Restarting Bar Streaming Results in Error

I am developing in Jupyter Notebook and wrote the following code to start a streaming client asynchronously and write to an existing dataframe.

I perform a data pull of the latest historical data and concatenate to it with results from the streaming client.

My issue is calling the following method results in a Timeout Error: stock_data_stream_client.stop()

Furthermore, if I try to restart the streaming client via stock_data_stream_client.run(), I get a connection limit exceeded which refers to the fact that we can only 1 have streaming client at a time. Anyone know why I am receiving these errors despite calling the “stop” method?

import asyncio
from collections import deque
import pandas as pd

stock_data_stream_client = StockDataStream(api_key, secret_key, url_override = stream_data_wss)

async def stock_data_stream_handler(data):
    global df
    # print(type(data))
    new_entry = pd.DataFrame([{
        "symbol": data.symbol,
        "open": data.open,
        "high": data.high,
        "low": data.low,
        "close": data.close,
        "volume": data.volume,
        "trade_count": data.trade_count,
        "vwap": data.vwap,
        "timestamp": data.timestamp
    }])

    new_entry = new_entry.reset_index().set_index(["symbol"]).add_suffix("_1min")
    new_entry["timestamp"] = new_entry["timestamp_1min"]
    new_entry = new_entry.reset_index().set_index("timestamp")
    print("Adding new entry.")
    df = pd.concat([df, new_entry], ignore_index=False)

  symbols = ["TQQQ"]
  
  # stock_data_stream_client.subscribe_quotes(stock_data_stream_handler, *symbols)
  # stock_data_stream_client.subscribe_trades(stock_data_stream_handler, *symbols)
  stock_data_stream_client.subscribe_bars(stock_data_stream_handler, *symbols)
  
  async def start_stream():
      await asyncio.sleep(1) # Allow other tasks to initiate
      stock_data_stream_client.run()
  
  task = asyncio.create_task(start_stream())
  
  print("Run other cells.")

This is when I run the stop function.

---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
Cell In[31], line 1
----> 1 stock_data_stream_client.stop()

File ~\AppData\Local\Programs\Python\Python311\Lib\site-packages\alpaca\data\live\websocket.py:376, in DataStream.stop(self)
    374 """Stops the websocket connection."""
    375 if self._loop.is_running():
--> 376     asyncio.run_coroutine_threadsafe(self.stop_ws(), self._loop).result(
    377         timeout=5
    378     )

File ~\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\_base.py:458, in Future.result(self, timeout)
    456             return self.__get_result()
    457         else:
--> 458             raise TimeoutError()
    459 finally:
    460     # Break a reference cycle with the exception in self._exception
    461     self = None

TimeoutError: