I’m having a bit of trouble integrating the Alpaca asyncio-based streaming API into a larger app that doesn’t use asyncio. I would like to encapsulate the API into a thread and use it as follows:
# Desired usage: run the Alpaca streaming API on its own thread and drive
# it from ordinary (non-asyncio) code.
MSFT_CHANNELS = ('T.MSFT', 'AM.MSFT')

t = AlpacaStreamThread()
t.start()
time.sleep(10)

# user-triggered: add market data channels while the stream is running
t.subscribe(list(MSFT_CHANNELS))
time.sleep(10)

# user-triggered: drop those channels again
t.unsubscribe(list(MSFT_CHANNELS))
time.sleep(10)  # etc etc
My AlpacaStreamThread implementation is below. Everything works great if I subscribe to the symbols at the beginning using the StreamConn.run “initial_channel” args. However, I only want to subscribe to ‘trade_updates’ and ‘account_updates’ at the start; I want to add or remove market data subscriptions later on.
Can someone help me fix the subscribe/unsubscribe methods below? They should be non-async methods, unlike the StreamConn’s internal subscribe/unsubscribe. I’ve tried a lot of (increasingly ugly) ways to get this to work, but I’m no expert on asyncio and would appreciate any help.
import asyncio
import time
from threading import Thread
from alpaca_trade_api import StreamConn
from alpaca_trade_api.common import URL
# Thread subclass that creates its own asyncio event loop and runs an Alpaca
# StreamConn on it.  This is the draft from the question: the bodies of
# subscribe() and unsubscribe() are still `???` placeholders, so the class is
# not runnable as written.
class AlpacaStreamThread(Thread):
    # handler for all incoming streaming messages
    # (registered in run() for every channel via the r'.*' pattern);
    # it only prints the channel and the payload
    async def on_data(self, conn, channel, data):
        print('channel', channel)
        print('data', data)

    # subscribes to additional channels - DOESN'T WORK
    # Intended to be a plain (non-async) method callable from other threads.
    def subscribe(self, channels):
        ???

    # unsubscribes to existing channels - DOESN'T WORK
    # Intended to be a plain (non-async) method callable from other threads.
    def unsubscribe(self, channels):
        ???

    # Thread entry point: builds the event loop and the connection, then
    # hands control to StreamConn.run().
    def run(self):
        # create a fresh loop and make it the current loop for this thread
        # before StreamConn is constructed
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.conn = StreamConn(
            key_id='XXX',
            secret_key='XXX',
            base_url=URL('wss://api.alpaca.markets'),
            data_url=URL('wss://data.alpaca.markets'),
            # use the polygon consolidated feed
            data_stream='polygon')
        # route every channel to on_data
        self.conn.register(r'.*', self.on_data)
        # only the account-level channels are requested at start-up
        # NOTE(review): presumably run() blocks until the stream ends - confirm
        self.conn.run(['trade_updates', 'account_updates'])
I believe I’ve figured out a reasonably clean way to do this, but I’m somewhat unhappy with it because it requires a third-party queue (janus) to interface between the synchronous and asynchronous routines. Here is the typical usage again, which now works great:
# Typical usage: run the Alpaca streaming API on its own thread and drive
# it from ordinary (non-asyncio) code.
MSFT_CHANNELS = ('T.MSFT', 'AM.MSFT')

t = AlpacaStreamThread()
t.start()
time.sleep(10)

# add market data channels while the stream is running
t.subscribe(list(MSFT_CHANNELS))
time.sleep(10)

# drop those channels again
t.unsubscribe(list(MSFT_CHANNELS))
time.sleep(30)
Any critique (especially pitfalls) of the below implementation is welcome. Hopefully the code helps others that might be struggling with this. Single-threaded asyncio can be a real pain to integrate with ‘traditionally’ threaded code!
import asyncio
import threading
import time
from threading import Thread

import janus
from alpaca_trade_api import StreamConn
from alpaca_trade_api.common import URL
class AlpacaStreamThread(Thread):
    """Run the asyncio-based Alpaca ``StreamConn`` on a dedicated thread.

    The thread owns its own event loop.  Synchronous callers on other
    threads use :meth:`subscribe` / :meth:`unsubscribe`, which push requests
    onto a janus queue; :meth:`monitor_subs` drains that queue inside the
    event loop and forwards each request to the connection.

    Args:
        *args: Forwarded unchanged to ``threading.Thread``.
        key_id: Alpaca API key id (defaults to the ``'xxx'`` placeholder).
        secret_key: Alpaca API secret key (defaults to the ``'xxx'``
            placeholder).
        **kwargs: Forwarded unchanged to ``threading.Thread``.
    """

    def __init__(self, *args, key_id='xxx', secret_key='xxx', **kwargs):
        super().__init__(*args, **kwargs)
        self._key_id = key_id
        self._secret_key = secret_key
        # Set once the worker thread has created ``subs_queue``; cleared
        # again when the stream stops.  Lets callers avoid racing start-up.
        self._queue_ready = threading.Event()

    # handler for all incoming streaming messages
    async def on_data(self, conn, channel, data):
        """Print every message received on any registered channel."""
        print('channel', channel)
        print('data', data)

    # monitors any synchronous subscribe/unsubscribe requests
    async def monitor_subs(self, async_q):
        """Forward queued ``(s_type, channels)`` requests to the connection.

        ``'S'`` requests subscribe, ``'U'`` requests unsubscribe; any other
        request type is ignored.  Runs until cancelled or until the
        connection call raises.
        """
        # NOTE(review): an exception raised by conn.subscribe/unsubscribe
        # propagates out of this coroutine and ends the stream - confirm
        # that is the desired behaviour for bad channel names.
        while True:
            s_type, channels = await async_q.get()
            if s_type == 'S':
                await self.conn.subscribe(channels)
            elif s_type == 'U':
                await self.conn.unsubscribe(channels)

    # initialize janus async-aware queue
    async def init_queue(self):
        """Create the sync/async bridge queue and mark it ready for use."""
        # queue must be initialized in running event loop
        self.subs_queue = janus.Queue()
        self._queue_ready.set()

    def _enqueue(self, s_type, channels):
        """Normalise *channels* to a list and queue one request.

        Blocks until the worker thread has created the queue.

        Raises:
            RuntimeError: If the thread has not been started or has already
                stopped, so the request could never be processed.
        """
        if isinstance(channels, str):
            channels = [channels]
        # Poll with a short timeout so a thread that dies during start-up
        # cannot leave the caller blocked forever.
        while not self._queue_ready.is_set():
            if not self.is_alive():
                raise RuntimeError(
                    'AlpacaStreamThread is not running; '
                    'call start() before subscribing or unsubscribing')
            self._queue_ready.wait(timeout=0.1)
        self.subs_queue.sync_q.put((s_type, channels))

    # subscribes to additional channels (synchronous)
    def subscribe(self, channels):
        """Request a subscription to *channels* (a name or list of names).

        Raises:
            RuntimeError: If the stream thread is not running.
        """
        self._enqueue('S', channels)

    # unsubscribes from existing channels (synchronous)
    def unsubscribe(self, channels):
        """Request removal of *channels* (a name or list of names).

        Raises:
            RuntimeError: If the stream thread is not running.
        """
        self._enqueue('U', channels)

    async def _stream(self, initial_subs):
        """Initialise the stream, then consume messages and requests."""
        # run async initialization of account subscriptions
        # and janus async-aware queue
        await asyncio.gather(
            self.conn.subscribe(initial_subs),
            self.init_queue())
        # receive asynchronous webservice response messages
        # and wait on the subscription request queue simultaneously
        await asyncio.gather(
            self.monitor_subs(self.subs_queue.async_q),
            self.conn.consume())

    def run(self):
        """Thread entry point: build the loop and connection, then stream."""
        # The loop is made current for this thread before StreamConn is
        # constructed, as in the original ordering.
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.conn = StreamConn(
            key_id=self._key_id,
            secret_key=self._secret_key,
            base_url=URL('wss://api.alpaca.markets'),
            data_url=URL('wss://data.alpaca.markets'),
            # use the polygon consolidated feed
            data_stream='polygon')
        self.conn.register(r'.*', self.on_data)
        initial_subs = ['trade_updates', 'account_updates']
        try:
            # A single coroutine keeps every gather() call inside the
            # running loop instead of relying on the implicit current loop.
            self.loop.run_until_complete(self._stream(initial_subs))
        finally:
            # Once streaming stops, later subscribe()/unsubscribe() calls
            # fail loudly instead of queueing requests nobody will read.
            self._queue_ready.clear()