I am developing in a Jupyter Notebook and wrote the following code to start a streaming client asynchronously and append its data to an existing DataFrame.
I first pull the latest historical data, then concatenate the results from the streaming client onto it.
My issue is that calling the following method results in a TimeoutError: stock_data_stream_client.stop()
Furthermore, if I try to restart the streaming client via stock_data_stream_client.run(), I get a "connection limit exceeded" error, which refers to the fact that we can only have 1 streaming client at a time. Does anyone know why I am receiving these errors despite calling the "stop" method?
import asyncio
from collections import deque
import pandas as pd
from alpaca.data.live import StockDataStream

# api_key, secret_key, stream_data_wss and df are defined in earlier cells
stock_data_stream_client = StockDataStream(api_key, secret_key, url_override = stream_data_wss)

async def stock_data_stream_handler(data):
    global df
    # print(type(data))
    # Build a one-row DataFrame from the incoming bar
    new_entry = pd.DataFrame([{
        "symbol": data.symbol,
        "open": data.open,
        "high": data.high,
        "low": data.low,
        "close": data.close,
        "volume": data.volume,
        "trade_count": data.trade_count,
        "vwap": data.vwap,
        "timestamp": data.timestamp
    }])
    new_entry = new_entry.reset_index().set_index(["symbol"]).add_suffix("_1min")
    new_entry["timestamp"] = new_entry["timestamp_1min"]
    new_entry = new_entry.reset_index().set_index("timestamp")
    print("Adding new entry.")
    df = pd.concat([df, new_entry], ignore_index=False)

symbols = ["TQQQ"]
# stock_data_stream_client.subscribe_quotes(stock_data_stream_handler, *symbols)
# stock_data_stream_client.subscribe_trades(stock_data_stream_handler, *symbols)
stock_data_stream_client.subscribe_bars(stock_data_stream_handler, *symbols)

async def start_stream():
    await asyncio.sleep(1) # Allow other tasks to initiate
    stock_data_stream_client.run()

task = asyncio.create_task(start_stream())
print("Run other cells.")
This is the traceback I get when I run the stop function:
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
Cell In[31], line 1
----> 1 stock_data_stream_client.stop()

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\alpaca\data\live\websocket.py:376, in DataStream.stop(self)
    374 """Stops the websocket connection."""
    375 if self._loop.is_running():
--> 376     asyncio.run_coroutine_threadsafe(self.stop_ws(), self._loop).result(
    377         timeout=5
    378     )

File ~\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py:458, in Future.result(self, timeout)
    456     return self.__get_result()
    457 else:
--> 458     raise TimeoutError()
    459 finally:
    460     # Break a reference cycle with the exception in self._exception
    461     self = None

TimeoutError:
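
For reference, the blocking pattern shown in the traceback (asyncio.run_coroutine_threadsafe(...).result(timeout=...)) can be reproduced with plain asyncio and no Alpaca code. The sketch below is only an illustration: stop_ws here is a hypothetical stand-in coroutine, not the library's implementation, and the timeout is shortened to keep the demo quick. It assumes the situation in the notebook is that stop() gets called from the same thread that runs the client's event loop. I have not confirmed that this is what happens in my setup.

```python
import asyncio
import concurrent.futures
import threading


async def stop_ws():
    """Hypothetical stand-in for a client's shutdown coroutine."""
    return "stopped"


def blocking_stop(loop, timeout=0.5):
    """Schedule stop_ws() on `loop`, then block the calling thread on the result.

    This mirrors the call pattern visible in the traceback.
    """
    future = asyncio.run_coroutine_threadsafe(stop_ws(), loop)
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        return "timeout"


async def call_from_loop_thread():
    # Runs on the loop's own thread: blocking here prevents the loop from
    # ever executing stop_ws(), so the wait can only end by timing out.
    return blocking_stop(asyncio.get_running_loop())


def demo():
    # Run an event loop in a background thread.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()

    # Case 1: called from a different thread than the loop.
    from_other_thread = blocking_stop(loop)

    # Case 2: called from inside the loop's thread.
    from_loop_thread = asyncio.run_coroutine_threadsafe(
        call_from_loop_thread(), loop
    ).result(timeout=5)

    # Shut the background loop down cleanly.
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
    return from_other_thread, from_loop_thread


if __name__ == "__main__":
    print(demo())
```

In this sketch the call made from a separate thread returns normally, while the call made from the loop's own thread times out, because the loop cannot run the scheduled coroutine while its thread is blocked waiting for it. If the same thing is happening in the notebook, the websocket would never actually be closed, which might also explain the connection limit error on restart.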