How to subscribe to additional channels in a running websocket connection?

I’m having a bit of trouble integrating the Alpaca asyncio-based streaming API into a larger app that doesn’t use asyncio. I would like to encapsulate the API in a thread and use it as follows:

# start the alpaca streaming API thread
t = AlpacaStreamThread()
t.start()
time.sleep(10)
t.subscribe(['T.MSFT', 'AM.MSFT']) # user-triggered
time.sleep(10)
t.unsubscribe(['T.MSFT', 'AM.MSFT'])  # user-triggered
time.sleep(10) # etc etc

My AlpacaStreamThread implementation is below. Everything works great if I subscribe to the symbols at the beginning via the “initial_channels” argument of StreamConn.run. However, I only want to subscribe to ‘trade_updates’ and ‘account_updates’ at the start, and then add or remove market data subscriptions later on.

Can someone help me fix the subscribe/unsubscribe methods below? They should be non-async methods, unlike StreamConn’s internal subscribe/unsubscribe. I’ve tried a lot of (increasingly ugly) ways to get this to work, but I’m no expert on asyncio and would appreciate any help.

import asyncio
import time
from threading import Thread
from alpaca_trade_api import StreamConn
from alpaca_trade_api.common import URL

class AlpacaStreamThread(Thread):
    # handler for all incoming streaming messages
    async def on_data(self, conn, channel, data):
        print('channel', channel)
        print('data', data)

    # subscribes to additional channels - DOESN'T WORK
    def subscribe(self, channels):
        raise NotImplementedError  # ??? what goes here

    # unsubscribes from existing channels - DOESN'T WORK
    def unsubscribe(self, channels):
        raise NotImplementedError  # ??? what goes here

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.conn = StreamConn(
            key_id='XXX',
            secret_key='XXX',
            base_url=URL('wss://api.alpaca.markets'),
            data_url=URL('wss://data.alpaca.markets'),
            # use the polygon consolidated feed
            data_stream='polygon')
        self.conn.register(r'.*', self.on_data)
        self.conn.run(['trade_updates', 'account_updates'])

I believe I’ve figured out a reasonably clean way to do this, but I’m somewhat unhappy with it because it requires a third-party queue (janus) to bridge the synchronous and asynchronous routines. Here is the typical usage again, which now works great:

# start the alpaca streaming API thread
t = AlpacaStreamThread()
t.start()
time.sleep(10)
t.subscribe(['T.MSFT', 'AM.MSFT'])
time.sleep(10)
t.unsubscribe(['T.MSFT', 'AM.MSFT'])
time.sleep(30)

Any critique (especially pitfalls) of the implementation below is welcome. Hopefully the code helps others who might be struggling with this. Single-threaded asyncio can be a real pain to integrate with ‘traditionally’ threaded code!

import asyncio
import janus
import time
from threading import Thread
from alpaca_trade_api import StreamConn
from alpaca_trade_api.common import URL

class AlpacaStreamThread(Thread):
    # handler for all incoming streaming messages
    async def on_data(self, conn, channel, data):
        print('channel', channel)
        print('data', data)

    # monitors any synchronous subscribe/unsubscribe requests
    async def monitor_subs(self, async_q):
        while True:
            s_type, channels = await async_q.get()
            if s_type == 'S':
                await self.conn.subscribe(channels)
            elif s_type == 'U':
                await self.conn.unsubscribe(channels)

    # initialize janus async-aware queue
    async def init_queue(self):
        # queue must be initialized in running event loop
        self.subs_queue = janus.Queue()

    # subscribes to additional channels (synchronous)
    def subscribe(self, channels):
        if isinstance(channels, str):
            channels = [channels]
        self.subs_queue.sync_q.put(('S', channels))

    # unsubscribes from existing channels (synchronous)
    def unsubscribe(self, channels):
        if isinstance(channels, str):
            channels = [channels]
        self.subs_queue.sync_q.put(('U', channels))

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.conn = StreamConn(
            key_id='xxx',
            secret_key='xxx',
            base_url=URL('wss://api.alpaca.markets'),
            data_url=URL('wss://data.alpaca.markets'),
            # use the polygon consolidated feed
            data_stream='polygon')
        self.conn.register(r'.*', self.on_data)

        initial_subs = ['trade_updates', 'account_updates']
        init_cors = asyncio.gather(
            self.conn.subscribe(initial_subs),
            self.init_queue())

        # run async initialization of account subscriptions
        # and janus async-aware queue
        self.loop.run_until_complete(init_cors)

        # receive asynchronous webservice response messages
        # and wait on the subscription request queue simultaneously
        async_tasks = asyncio.gather(
            self.monitor_subs(self.subs_queue.async_q),
            self.conn.consume())
        self.loop.run_until_complete(async_tasks)
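
On the janus dependency: one possible stdlib-only alternative is asyncio.run_coroutine_threadsafe, which schedules a coroutine on a loop running in another thread and returns a concurrent.futures.Future. The sketch below is not tested against Alpaca. FakeConn is a hypothetical stand-in for StreamConn that only mimics the async subscribe/unsubscribe calls, StreamThread and its _submit/stop helpers are names made up for illustration, and run_forever stands in for running conn.consume(). It also uses a threading.Event so that a subscribe call made right after start() waits for the loop instead of failing, a race the sleep(10) in the usage above otherwise hides.

```python
import asyncio
import threading


class FakeConn:
    """Hypothetical stand-in for StreamConn (async subscribe/unsubscribe only)."""

    def __init__(self):
        self.channels = set()

    async def subscribe(self, channels):
        self.channels.update(channels)

    async def unsubscribe(self, channels):
        self.channels.difference_update(channels)


class StreamThread(threading.Thread):
    def __init__(self, conn):
        super().__init__(daemon=True)
        self.conn = conn
        self.loop = asyncio.new_event_loop()
        self.ready = threading.Event()

    def run(self):
        asyncio.set_event_loop(self.loop)
        # flag readiness from inside the loop, i.e. once it is really running
        self.loop.call_soon(self.ready.set)
        # assumption: the real version would run conn.consume() here instead
        self.loop.run_forever()
        self.loop.close()

    def _submit(self, coro, timeout=5):
        # block the calling (non-asyncio) thread until the loop is up
        if not self.ready.wait(timeout):
            coro.close()
            raise RuntimeError('event loop did not start in time')
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        # wait for the coroutine to finish and re-raise any exception
        return future.result(timeout)

    # subscribes to additional channels (synchronous)
    def subscribe(self, channels):
        if isinstance(channels, str):
            channels = [channels]
        self._submit(self.conn.subscribe(channels))

    # unsubscribes from existing channels (synchronous)
    def unsubscribe(self, channels):
        if isinstance(channels, str):
            channels = [channels]
        self._submit(self.conn.unsubscribe(channels))

    def stop(self):
        # loop.stop() must be scheduled from outside via call_soon_threadsafe
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.join()
```

Usage would look the same as before (t.start(), t.subscribe([...]), t.unsubscribe([...])), plus t.stop() for shutdown. One trade-off compared with the queue approach: each call blocks the caller until the coroutine has finished, which surfaces errors immediately but means a slow websocket round trip stalls the calling thread for up to the timeout.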