Polygon bar streaming timing

Happy Saturday everyone!

I thought I'd share a quick tip today. Do you know when a minute bar is generated? The answer is below.

As you know, you can subscribe to real-time minute bar updates from Polygon if you have access. In Python, it looks like this:

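import alpaca_trade_api as alpaca  # assumed import; the original snippet omitted it

def main():
    stream = alpaca.StreamConn()

    # Handle every message that arrives on a minute-bar (AM) channel
    @stream.on(r'^AM')
    async def on_bars(conn, channel, data):
        print(f'received bar {data}')

    # Connect, subscribe to SPY minute bars, and listen
    stream.run(['AM.SPY'])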

Polygon generates each minute bar about 4 seconds after the minute ends, meaning the bar for 09:30 is pushed around 09:31:04. Why not 09:31:00? Because a minute bar needs to see all ticks between 09:30:00 and 09:31:00, plus any correction ticks that have to be reconciled. Generating bar data from ticks sounds easy, but the logic gets detailed when it comes to US equity markets.

This is good to know when you backtest: if you rely on this bar event, make sure you don't assume your order can go out before 09:31:04.
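For backtests, the timing rule above can be turned into a small helper. This is only a sketch: the 4-second figure is the delay described in this post, and the names BAR_PUBLISH_DELAY and earliest_order_time are made up for illustration, so adjust the delay to whatever you actually observe.

```python
from datetime import datetime, timedelta

# Assumption: bars arrive roughly 4 seconds after the minute closes,
# as described in the post. Tune this to your own measurements.
BAR_PUBLISH_DELAY = timedelta(seconds=4)
BAR_LENGTH = timedelta(minutes=1)

def earliest_order_time(bar_start: datetime) -> datetime:
    """Earliest time a strategy driven by the bar event could send an order."""
    return bar_start + BAR_LENGTH + BAR_PUBLISH_DELAY

bar_start = datetime(2020, 1, 6, 9, 30, 0)
print(earliest_order_time(bar_start))  # 2020-01-06 09:31:04
```

In a backtest, any fill simulated for the 09:30 bar should use prices at or after this timestamp, not the bar's own close.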


I need to start streaming tickers on demand, as they are requested through a GUI over time. How do I subscribe to more symbols after stream.run() has been called with the initial symbols?

Could you post an example of this? I would prefer to code it without having to know much about asyncio.


You can call stream.subscribe() with additional symbols from inside the message handler. To remove symbols, you can call stream.unsubscribe().
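A rough sketch of what that could look like is below. It does not use the real SDK: FakeStream is a hypothetical stand-in that only records channel names, and the bar is a plain dict rather than the object the SDK passes in. The assumption carried over from this thread is that subscribe() and unsubscribe() are coroutines that take a list of channel names.

```python
import asyncio

class FakeStream:
    """Hypothetical stand-in for the streaming connection; not the Alpaca SDK."""

    def __init__(self):
        self.channels = set()

    async def subscribe(self, channels):
        self.channels.update(channels)

    async def unsubscribe(self, channels):
        self.channels.difference_update(channels)

async def on_bars(conn, channel, data):
    # Inside the handler we are already on the event loop,
    # so subscribe/unsubscribe can simply be awaited.
    if data['symbol'] == 'SPY':
        await conn.subscribe(['AM.AAPL'])
        await conn.unsubscribe(['AM.SPY'])

async def demo():
    conn = FakeStream()
    await conn.subscribe(['AM.SPY'])
    await on_bars(conn, 'AM.SPY', {'symbol': 'SPY'})
    return sorted(conn.channels)

result = asyncio.run(demo())
print(result)  # ['AM.AAPL']
```

With the real connection, the same two awaits would go inside the handler registered with @stream.on(...).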

Thank you Hitoshi. I was specifically looking for code to do that - as there is none in the example pieces.

Here is my answer to this question. After spawning a thread that executes stream.run(), you need to call subscribe like so:

asyncio.run(conn.subscribe(['AM.SPY', …]))
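One thing to be aware of: asyncio.run() creates a new event loop each time it is called, which is not the loop that stream.run() is driving in the other thread. If that causes trouble, an alternative worth trying is asyncio.run_coroutine_threadsafe(), which hands a coroutine to a loop that is already running in another thread. The sketch below shows only the threading pattern. Its subscribe() coroutine is a hypothetical stand-in, not the real connection, and with the SDK you would need a reference to the loop the stream is running on.

```python
import asyncio
import threading

subscribed = []

async def subscribe(channels):
    # Hypothetical stand-in for conn.subscribe(): records channels instead of sending them.
    subscribed.extend(channels)
    return len(subscribed)

# Run an event loop in a background thread, similar to running the stream in its own thread.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

# From the main (for example GUI) thread: schedule the coroutine on the running loop
# and block until it has finished.
future = asyncio.run_coroutine_threadsafe(subscribe(['AM.SPY']), loop)
count = future.result(timeout=5)
print(count, subscribed)  # 1 ['AM.SPY']

# Stop the loop from outside its thread, then clean up.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
```

The loop never has to be stopped and restarted just to add a subscription, which keeps the GUI thread and the streaming thread decoupled.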

Another thing I figured out: in the example code, run(['AM.*', …]) subscribes to all tickers (thousands of them). To get a single symbol you need to name it in the channel, like so: 'AM.SPY'


I have a follow-up question. The run() command (1) connects to Polygon, (2) starts the forever-listening loop, and (3) subscribes to real-time channels.

However, it does so only if you specify a ticker for at least one channel, say trades 'T.SPY' or minute bars 'AM.SPY'. If run() is called without any channel subscription, it does not connect to Polygon.

In my code, I want to connect to Polygon and start listening, but subscribe to tickers later in the code. Is there a connect method that does just that, or does the Polygon API need a subscription request at the time of connection?

(As it is, I pass an arbitrary ticker to run() so that it connects.)


It’s the _ensure_ws() method of polygon.StreamConn.

This is how I did it:

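import asyncio
import threading

class AsyncEventLooper():
    def __init__(self):
        pass

    # at class level: run the given event loop forever in the calling thread
    @staticmethod
    def classasyncioLooper(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()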

================================================

Declare this def in your streaming class:

def subscribeSymbols(self, symbol):
    channels = self.getChannels(symbol)
    if channels is not None:
        # self.conn is the streaming connection; its loop runs in another thread,
        # so ask it to stop in a thread-safe way and wait until it has stopped
        self.conn.loop.call_soon_threadsafe(self.conn.loop.stop)
        while self.conn.loop.is_running():
            pass
        self.conn.loop.run_until_complete(asyncio.gather(self.conn.subscribe(channels)))
        # t needs to be declared at global level, otherwise the thread will stop after the end of the def
        t = threading.Thread(target=AsyncEventLooper.classasyncioLooper, args=(self.conn.loop,))
        t.start()