One-Minute Intraday Bars

Hi All,

I just started using Alpaca's V2 streaming API, and I seem to have run into a subscription limit: I can only get one-minute bars for about 290 stocks, but I want to subscribe to 2,000 stock symbols.

The Alpaca documentation indicates that there is no limit on the number of bar subscriptions.

Before I invoke the event loop via stream.run(), I loop over all the stocks and subscribe to each one:
total_count = 0
for signal_record in signalCollection.find(query):
    my_symbol = signal_record['symbol']
    argStream.subscribe_bars(on_bar_message, my_symbol)  # one subscribe call per symbol
    print(f"{my_symbol}={total_count}")
    total_count += 1
print(f"total records processed {total_count}")

I suspect I might be running into a buffer limit. What is the correct way to chunk/batch the subscriptions?
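A minimal sketch of batching, assuming the SDK's subscribe_bars accepts several symbols in one call (a later post in this thread calls conn.subscribe_bars(on_bar_message, *stock_tuple)). The helper names chunks and subscribe_in_batches, the batch size of 200, and the pause between batches are hypothetical choices for illustration, not documented Alpaca limits.

```python
import time
from typing import Callable, Iterable, Iterator, List, Sequence


def chunks(symbols: Sequence[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of at most `size` symbols."""
    if size < 1:
        raise ValueError("size must be >= 1")
    for start in range(0, len(symbols), size):
        yield list(symbols[start:start + size])


def subscribe_in_batches(stream, handler: Callable, symbols: Iterable[str],
                         batch_size: int = 200, pause: float = 1.0) -> int:
    """Subscribe to bars for `symbols` in batches; return the number of calls made.

    `stream` is anything with a subscribe_bars(handler, *symbols) method,
    e.g. an alpaca_trade_api Stream (assumed). batch_size and pause are
    illustrative tuning knobs, not Alpaca-documented values.
    """
    unique = list(dict.fromkeys(symbols))  # drop duplicates, keep first-seen order
    calls = 0
    for batch in chunks(unique, batch_size):
        if calls and pause:
            time.sleep(pause)  # wait between batches, not before the first one
        stream.subscribe_bars(handler, *batch)  # one call covers the whole batch
        calls += 1
    return calls
```

With 2,000 symbols and the default batch_size of 200, this makes 10 subscribe_bars calls instead of 2,000.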

Thanks,
htomxx

Hi @htomxx - Mallory from the Alpaca team here.

What SDK or library are you using, and what error message are you getting? That will help me pinpoint what’s going on here and give you an idea of best practice for what you’re looking to do.

I am using version 2 of the real-time API, and I based my program on the example below.
I no longer subscribe in a single loop; instead I batch the subscriptions, following your example program.

I have two issues related to getting minute bars for approximately 2,000 symbols. I prefer the first method, which subscribes only to the symbols I need, over my second method.

I am using your subscription example, dynamic_subscription_example.py, on the master branch of alpacahq/alpaca-trade-api-python on GitHub.

I modified the main loop to subscribe to 200 symbols at a time:

while True:
    total_count = 0
    for ema_record in ema_symbols_collection.find({}):
        my_symbol = ema_record['symbol']
        conn.subscribe_bars(on_bar_message, my_symbol)
        # remove the symbol from the pending collection once it is subscribed
        ema_symbols_collection.delete_one({'_id': ema_record['_id']})
        time.sleep(1)
        total_count += 1
        if total_count >= 200:  # stop after 200 symbols in this batch
            break
    print(f"total records processed {total_count}")
    time.sleep(1)

The main difference between your example and my program is that my program never unsubscribes; it just keeps adding more symbols. I find the minute data coming back is not consistent…

The second method I used was to subscribe to all symbols using the '*' wildcard. The data coming back is much more consistent: the bars for all 2,300 symbols are for the same minute. I implemented a hash table of the wanted symbols and only process the symbols I am interested in. This is not the ideal solution; I would prefer finer-grained control, for which I will need guidance from Alpaca support.
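The hash-table filter described above can be sketched as a set lookup inside the bar handler. This assumes each bar object exposes its ticker as a symbol attribute; WANTED, process_bar, processed and replay are hypothetical names used only for illustration.

```python
import asyncio
from types import SimpleNamespace

# Hypothetical set of wanted symbols; membership tests on a set are O(1).
WANTED = frozenset({"AAPL", "MSFT", "LITE"})

processed = []  # stand-in for whatever the real program does with a bar


def process_bar(bar) -> None:
    """Placeholder for the real per-bar logic."""
    processed.append(bar.symbol)


async def on_bar_message(bar) -> None:
    """Handler for a '*' subscription: ignore bars for unwanted symbols."""
    if bar.symbol not in WANTED:  # assumption: bars carry a .symbol attribute
        return
    process_bar(bar)


async def replay(symbols) -> None:
    """Feed fake bars through the handler, e.g. for a quick offline check."""
    for sym in symbols:
        await on_bar_message(SimpleNamespace(symbol=sym))
```

For example, asyncio.run(replay(["AAPL", "TSLA", "LITE"])) leaves processed equal to ["AAPL", "LITE"].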

Please provide guidance on the two methods above. I am interested in the following:

1 - Using the incremental subscription method:
What is the guidance on batching the subscriptions?

2 - Using the wildcard subscription method: this seems to work, but I would prefer the more controlled subscriptions.

Is there a best-practice document that addresses a large number of minute bars? If so, please forward it to me.

I run into this message on occasion:
2021-06-11 10:56:38,741 WARNING websocket error, restarting connection: code = 1006 (connection closed abnormally [internal]), no reason.
I think this message is related to running in PyCharm for too long, so that the buffer has been exhausted. Please confirm whether this understanding is correct.

Also, if I do encounter the above error message, what are my recovery steps?

Thanks for your help
htomxx.

Hey @htomxx

What I’m going to recommend here is using the * wildcard method and unsubscribing from the symbols you aren’t interested in after the fact. This will help avoid the 1006 error, especially with loops that subscribe to batches of symbols: if you lose the connection, you’ll have to reconnect and subscribe all over again. It’s best to err on the side of caution with the wildcard method and unsubscribe accordingly.

Hi All,

Thanks Mallory for your input, I really appreciate the level of support here.

I will follow the wildcard subscription method, and unsubscribe any unwanted symbols.

I have more questions regarding intraday bars.

In my testing of intraday bars, I am seeing missing bars for stocks that had trading activity during that minute. I would like to understand the rules for how intraday bars are created.
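A generic way to picture the rules being asked about is to aggregate trades into one-minute bars: each trade updates the bar for the minute it falls in, and a minute with no trades produces no bar. The sketch below shows only that general technique, not Alpaca's implementation; a real feed may apply additional rules (for example, about which trades count toward a bar), so it cannot by itself explain missing bars. Trade and aggregate_minute_bars are hypothetical names.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Iterable, Tuple

Bar = Tuple[float, float, float, float, int]  # (open, high, low, close, volume)


@dataclass
class Trade:
    ts: datetime  # trade timestamp
    price: float
    size: int


def aggregate_minute_bars(trades: Iterable[Trade]) -> Dict[datetime, Bar]:
    """Bucket trades by minute into OHLCV bars keyed by the minute's start.

    A minute with no trades gets no entry at all, so no bar exists for it.
    """
    bars: Dict[datetime, list] = {}
    for tr in sorted(trades, key=lambda t: t.ts):  # process in time order
        minute = tr.ts.replace(second=0, microsecond=0)
        bar = bars.get(minute)
        if bar is None:
            bars[minute] = [tr.price, tr.price, tr.price, tr.price, tr.size]
        else:
            bar[1] = max(bar[1], tr.price)  # high
            bar[2] = min(bar[2], tr.price)  # low
            bar[3] = tr.price               # close = last trade of the minute
            bar[4] += tr.size               # volume
    return {m: tuple(v) for m, v in bars.items()}
```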

My questions are:
1 - What are the base conditions for generating an intraday bar?
a - I assume that if there is no trade activity, no intraday bar is sent.
b - If there was trade activity, an intraday bar will be sent.
2 - I am testing intraday one-minute bars, and bars that I expect to see are not showing up in the streaming API. Please provide guidance on how I can resolve this. I am verifying the market activity with the ThinkorSwim app.
3 - I tried unsubscribing from a list of symbols, and I get a KeyError on the unsubscribed symbol.
I cannot unsubscribe from unwanted bars; when I try, I get the following traceback:
Traceback (most recent call last):
  File "/home/htom/.local/share/JetBrains/Toolbox/apps/PyCharm-C/ch-0/202.7660.27/plugins/python-ce/helpers/pydev/pydevd.py", line 1448, in _exec
    pydev_imports.execfile(file, globals, locals) # execute the script
  File "/home/htom/.local/share/JetBrains/Toolbox/apps/PyCharm-C/ch-0/202.7660.27/plugins/python-ce/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/home/htom/PycharmProjects/DEV1POInjector/minuteMinder.py", line 282, in <module>
    conn.unsubscribe_bars(symbol)
  File "/home/htom/PycharmProjects/DEV1POInjector/venv/lib/python3.8/site-packages/alpaca_trade_api/stream.py", line 420, in unsubscribe_bars
    self._data_ws.unsubscribe_bars(symbols)
  File "/home/htom/PycharmProjects/DEV1POInjector/venv/lib/python3.8/site-packages/alpaca_trade_api/stream.py", line 177, in unsubscribe_bars
2021-06-14 12:33:14,158 INFO subscribed to trades: [], quotes: [] and bars: [’
’]
    del self._bar_handlers[symbol]
KeyError: 'LITE' <--------------------- error encountered.

I arranged the code to use a thread-safe data structure; I chose queue.Queue for this.
In my code I have:

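import time
from queue import Queue

unwanted_symbol = Queue()  # global, thread-safe queue of unwanted symbols


async def on_bar_message(message):
    my_s = message.symbol  # assumption: bar messages expose the ticker as .symbol
    if my_s not in my_symbols:  # my_symbols: the set of wanted symbols
        unwanted_symbol.put(my_s)  # on bar received, queue the symbol if it is unwanted
        return

# main event loop (conn.run() is assumed to run in a separate thread)
if __name__ == '__main__':
    while True:
        # a Queue object is always truthy, so test empty() instead of the queue itself
        while not unwanted_symbol.empty():
            symbol = unwanted_symbol.get()  # get symbol from unwanted queue
            conn.unsubscribe_bars(symbol)   # unsubscribe from the bar
        time.sleep(60)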

It appears I cannot unsubscribe correctly when I perform a wildcard subscription:
conn.subscribe_bars(on_bar_message, '*')

Please advise on the proper mechanism to unsubscribe.
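The traceback above ends in del self._bar_handlers[symbol], which suggests the SDK stores bar handlers in a dict keyed by the exact symbols passed to subscribe_bars. After conn.subscribe_bars(on_bar_message, '*') the only key would be '*', so unsubscribing LITE fails with KeyError; the same would happen if a symbol were queued twice and unsubscribed twice, which the queue-based code above allows. A minimal sketch, assuming that behaviour and assuming unsubscribe_bars accepts several symbols (the symbols parameter in the traceback suggests it does), is to track your own explicit subscriptions and only unsubscribe symbols you actually subscribed to. SubscriptionTracker is a hypothetical helper, not part of the SDK.

```python
import threading
from typing import Callable, Iterable, List, Set


class SubscriptionTracker:
    """Remember which symbols were explicitly subscribed, so unsubscribe
    calls are only made for symbols the SDK actually has a handler for."""

    def __init__(self, stream) -> None:
        self._stream = stream          # e.g. an alpaca_trade_api Stream (assumed)
        self._symbols: Set[str] = set()
        self._lock = threading.Lock()  # handler and main loop may be in different threads

    def subscribe(self, handler: Callable, symbols: Iterable[str]) -> None:
        """Subscribe only to symbols not already subscribed."""
        with self._lock:
            new = [s for s in dict.fromkeys(symbols) if s not in self._symbols]
            self._symbols.update(new)
        if new:
            self._stream.subscribe_bars(handler, *new)

    def unsubscribe(self, symbols: Iterable[str]) -> List[str]:
        """Unsubscribe only symbols we subscribed to; return those removed."""
        with self._lock:
            known = [s for s in dict.fromkeys(symbols) if s in self._symbols]
            self._symbols.difference_update(known)
        if known:
            self._stream.unsubscribe_bars(*known)
        return known
```

Unknown or already-removed symbols are silently skipped instead of reaching the SDK, so duplicates in the unwanted-symbol queue become harmless.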

Many Thanks,
htomxx

Hi Alpaca Support,

I figured out a proper method to subscribe to one-minute bars, so I don’t need to use the wildcard. The problem I am encountering is that there is a limit of 300 symbols.

How do I subscribe to more than 300 symbols?

Thanks,
Howard

Hey Howard @htomxx

Are you getting a response with a pagination key/page token, or not?

Hi Mallory,

I got the subscription working correctly, and I now have one-minute bars on 1,100 symbols.

The API and wrapper are working very well.

There is no pagination, I am using the streaming market data one minute bars.

I am now running into error 1006. Please advise where I should put the retry/resubscribe logic.

Thanks,
Howard

Hi Mallory/Alpaca Support,

My attempt at reconnecting after a 1006 error did not work.

It seems conn.run() never raises an exception that would let the retry block execute. My code is as follows:

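import time

from alpaca_trade_api.common import URL
from alpaca_trade_api.stream import Stream

import config  # my own module holding the keys and URLs

# on_bar_message and stock_tuple are defined elsewhere in my program


def run_stream(conn):
    try:
        conn.run()
    except Exception as e:  # never executes
        print(f'Exception from websocket connection: {e}')
    finally:  # retry logic
        conn.subscribe_bars(on_bar_message, *stock_tuple)
        print("Trying to re-establish connection")
        time.sleep(3)
        run_stream(conn)


if __name__ == '__main__':
    conn = Stream(config.ALPACA_API_KEY,
                  config.ALPACA_SECRET_KEY,
                  base_url=URL(config.ALPACA_WEB_SOCKET),
                  data_feed='sip')
    run_stream(conn)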

Please advise on how I can detect a 1006 error, and how I should re-subscribe and reopen the stream.
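A minimal sketch of an iterative alternative to the recursive run_stream above. It assumes, as the _run_forever excerpt later in this thread shows, that conn.run() only raises (ConnectionError) once the SDK has used up its internal APCA_RETRY_MAX retries, so an outer loop only fires after that. run_with_reconnect, its parameters, and the backoff values are hypothetical. A loop avoids the recursion in run_stream, which adds a stack frame on every reconnect; the injectable sleep makes it easy to test.

```python
import logging
import time
from typing import Callable, Optional

log = logging.getLogger(__name__)


def run_with_reconnect(run: Callable[[], None],
                       resubscribe: Callable[[], None],
                       max_attempts: Optional[int] = None,
                       base_wait: float = 3.0,
                       max_wait: float = 60.0,
                       sleep: Callable[[float], None] = time.sleep) -> int:
    """Call run() until it returns normally; retry on exceptions.

    run         -- e.g. conn.run (blocks while the stream is healthy)
    resubscribe -- e.g. lambda: conn.subscribe_bars(on_bar_message, *stock_tuple)
    Re-raises after max_attempts failures (None means retry forever).
    Returns the number of failures that were retried before a clean exit.
    """
    failures = 0
    while True:
        try:
            run()
            return failures       # stream stopped cleanly: do not reconnect
        except Exception as exc:  # e.g. ConnectionError after repeated 1006 errors
            failures += 1
            if max_attempts is not None and failures >= max_attempts:
                raise
            # exponential backoff: 3s, 6s, 12s, ... capped at max_wait
            wait = min(max_wait, base_wait * 2 ** (failures - 1))
            log.warning("stream failed (%s); retry %d in %.0fs", exc, failures, wait)
            sleep(wait)
            resubscribe()
```

For example: run_with_reconnect(conn.run, lambda: conn.subscribe_bars(on_bar_message, *stock_tuple)). Unlike the finally block in run_stream, this stops when conn.run() returns normally. A later post in this thread reports that the SDK restores subscriptions on reconnect by itself, in which case resubscribe can simply be lambda: None.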

Thanks,
htomxx

Hi Mallory/Alpaca Support,

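Here is some more information on the problem. There are two log messages, logged at 13:37:17,866 and 13:37:18,452 (the number after the comma is milliseconds, not an error code); the second reports code 1006.
The first log message is: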
2021-06-16 13:37:17,866 ERROR Error in data transfer
and the second error is:
2021-06-16 13:37:18,452 WARNING websocket error, restarting connection: code = 1006 (connection closed abnormally [internal]), no reason

Please find the entire trace below.
2021-06-16 13:37:17,866 ERROR Error in data transfer
Traceback (most recent call last):
  File "/home/htom/PycharmProjects/DEV1POInjector/venv/lib/python3.8/site-packages/websockets/protocol.py", line 827, in transfer_data
    message = await self.read_message()
  File "/home/htom/PycharmProjects/DEV1POInjector/venv/lib/python3.8/site-packages/websockets/protocol.py", line 895, in read_message
    frame = await self.read_data_frame(max_size=self.max_size)
  File "/home/htom/PycharmProjects/DEV1POInjector/venv/lib/python3.8/site-packages/websockets/protocol.py", line 995, in read_data_frame
    await self.pong(frame.data)
  File "/home/htom/PycharmProjects/DEV1POInjector/venv/lib/python3.8/site-packages/websockets/protocol.py", line 766, in pong
    await self.ensure_open()
  File "/home/htom/PycharmProjects/DEV1POInjector/venv/lib/python3.8/site-packages/websockets/protocol.py", line 803, in ensure_open
    raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedError: code = 1006 (connection closed abnormally [internal]), no reason
2021-06-16 13:37:18,452 WARNING websocket error, restarting connection: code = 1006 (connection closed abnormally [internal]), no reason

Thanks,
htomxx

Hi there,

We are investigating your issues and looking into what’s happening. I’ll keep you posted!

Hi Mallory,

I found a workaround: the problem is in stream.py in the Alpaca library. I added one line of code that raises the error (see the comment 'Code added by htomxx' below). I added it to the _run_forever method of the DataStream class, at line 226.
except websockets.WebSocketException as wse:
    retries += 1
    if retries > int(os.environ.get('APCA_RETRY_MAX', 3)):
        await self.close()
        self._running = False
        raise ConnectionError("max retries exceeded")
    if retries > 1:
        await asyncio.sleep(
            int(os.environ.get('APCA_RETRY_WAIT', 3)))
    log.warn('websocket error, restarting connection: ' +
             str(wse))
    raise ConnectionError("max retries exceeded")  # Code added by htomxx

The recursive retry logic now works correctly. I hope this gives you the insight you need to make an official release of the API. I will be more than happy to adopt your official fix when it becomes available.
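A possible alternative to editing the installed package, assuming your SDK version reads these variables exactly as in the _run_forever excerpt above: setting APCA_RETRY_MAX to 0 makes the first websocket error exceed the retry limit, so the SDK closes the connection and raises ConnectionError itself. The helper sdk_gives_up is hypothetical, not an SDK function; it only mirrors the excerpt's check so the arithmetic is easy to see.

```python
import os
from typing import Mapping


def sdk_gives_up(retries: int, environ: Mapping[str, str] = os.environ) -> bool:
    """Mirror of the check in the excerpt:
    retries > int(os.environ.get('APCA_RETRY_MAX', 3))
    """
    return retries > int(environ.get("APCA_RETRY_MAX", 3))


# With the default of 3, the SDK retries internally while its counter is 1 to 3
# and raises ConnectionError out of conn.run() when the counter reaches 4.
# Setting the limit to 0 makes the first error raise, like the one-line patch.
# Set it before starting the stream (e.g. before run_stream(conn)).
os.environ["APCA_RETRY_MAX"] = "0"
```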

Thanks,
htomxx

Hey @htomxx – Thank you for catching this and making an adjustment!

We’ve passed this on to our Python SDK team to take a look.

Hi Mallory,

I am keeping statistics on the number of reconnects in my process. Since market open, my software has averaged one reconnect every 2 minutes. Please relay this to your API team, as I am sure this is not a desirable retry interval. I am subscribing to 2,300 symbols, and the API is working fine apart from the reconnect frequency.
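For the reconnect statistics mentioned above, a minimal sketch of a rolling counter, assuming you call record() each time your retry logic reconnects. ReconnectStats is a hypothetical helper; timestamps can be passed in explicitly so it is easy to test. For scale: at one reconnect every 2 minutes, a 390-minute regular session (9:30 to 16:00 ET) would see roughly 195 reconnects.

```python
import time
from collections import deque
from typing import Deque, Optional


class ReconnectStats:
    """Count reconnects and report the rate over a sliding time window."""

    def __init__(self, window_s: float = 3600.0) -> None:
        self.window_s = window_s
        self.total = 0                   # reconnects since start
        self._times: Deque[float] = deque()

    def record(self, now: Optional[float] = None) -> None:
        """Call once per reconnect."""
        now = time.monotonic() if now is None else now
        self.total += 1
        self._times.append(now)
        self._trim(now)

    def _trim(self, now: float) -> None:
        while self._times and now - self._times[0] > self.window_s:
            self._times.popleft()        # drop reconnects older than the window

    def per_minute(self, now: Optional[float] = None) -> float:
        """Reconnects per minute averaged over the full window length."""
        now = time.monotonic() if now is None else now
        self._trim(now)
        return len(self._times) / (self.window_s / 60.0)
```

Because the rate is averaged over the full window, it understates the rate until the process has been running for at least one window.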

The great thing is I don’t need to re-subscribe before I reconnect. The software automatically handles it.

Thanks,
htomxx