Websockets don't pass data to while loop

Hello, copied this pairs trading example Pairs Trading - Strategy Deployment Guide and changed websockets from T to Minute streaming. Script seems to be working the first loop, but then in 1 minute when data arrives from Websocket it does not pass it to while not trade_taken. Please help me to understand how to pass data to while not trade_taken:

import threading

import websocket
import websockets
from time import sleep

import alpaca_trade_api as tradeapi
import pandas as pd

base_url = ‘https://paper-api.alpaca.markets
data_url = ‘wss://data.alpaca.markets’
trade_taken = False
count = 0
count1 = 0

instantiate REST API

api = tradeapi.REST(‘PK5XL4LYUFJNDGXU8BNW’,
‘CvIaAkLxB2AcXZRwbsgV6hXy3grTLLRjVo6SHYMx’, base_url=base_url, api_version=‘v2’)

init WebSocket

conn = tradeapi.stream2.StreamConn(
‘PK5XL4LYUFJNDGXU8BNW’, ‘CvIaAkLxB2AcXZRwbsgV6hXy3grTLLRjVo6SHYMx’, data_url=data_url, data_stream=‘alpacadatav1’
)

def wait_for_market_open():
clock = api.get_clock()
if not clock.is_open:
time_to_open = clock.next_open - clock.timestamp
sleep_time = round(time_to_open.total_seconds())
print(“Sleep time:”, sleep_time)
sleep(sleep_time)
return clock

define websocket callbacks

data_df = None

@conn.on(r’^AM.V$')
async def on_second_bars_V(conn, channel, bar):
print('Visa: ', bar.close)
if data_df is not None:
data_df.v[-1] = bar.close
print("DatadfV: ", data_df.v[-1])

@conn.on(r’^AM.MA$')
async def on_second_bars_MA(conn, channel, bar):
print('MA: ', bar.close)
if data_df is not None:
data_df.ma[-1] = bar.close
print("DatadfMA: ", data_df.ma[-1])

start WebSocket in a thread

streams = [‘AM.V’, ‘AM.MA’]
ws_thread = threading.Thread(target=conn.run, daemon=True, args=(streams,))
ws_thread.start()

main script executes forever

while True:
count1 += 1
print("Main Count1: ", count1)
clock = wait_for_market_open()
ma = api.get_barset(‘MA’, ‘1Min’, limit=26)
v = api.get_barset(‘V’, ‘1Min’, limit=26)
# print(ma)
# print(v)
data_df = pd.concat(
[ma.df.MA.close, v.df.V.close],
axis=1,
join=‘inner’,
keys=[‘ma’, ‘v’],
)
# print(“DataDF”, data_df)
data_df.v[-1] = 0
data_df.ma[-1] = 0
# print(data_df.v)

spread_df = data_df.pct_change()
# print(spread_df)
spread_df = spread_df[:-1]
spread_df['spread'] = spread_df.ma - spread_df.v
max_divergence = spread_df.spread.tail(20).abs().max()
# print("Max Divergence:", max_divergence)

# Check if websockets are streaming
while data_df.v[-1] == 0 or data_df.ma[-1] == 0:
    print("Data is not coming in?", data_df.v[-1])
    sleep(1)

# Not in trade. Looking for arbitrage diviations
while not trade_taken:
    # check for trade opportunities
    spread_df = data_df.pct_change()[:2]
    print("Spread_df: ", spread_df)
    # count += 1
    # print(count)
    # print("Spr ", spread_df)
    # print(".................")
    spread_df['spread'] = spread_df.ma - spread_df.v
    print("Max Divergence1:", max_divergence)
    print("abs(spread_df.spread[-1]: ", abs(spread_df.spread[-1]))
    print("abs(spread_df.spread[0]: ", abs(spread_df.spread[0]))
    # print("abs(spread_df.spread[1]: ", abs(spread_df.spread[1]))
    if abs(spread_df.spread[-1]) > max_divergence:
        print("Inside the trade")
        # there is a trade opportunuty - calculate position sizing
        acct = api.get_account()
        print("acct", acct)
        acct_size = float(acct.equity)
        print("Acc Size:", acct_size)
        ma_size = round(data_df.v[-1])
        v_size = round(data_df.ma[-1])

        if spread_df.spread[-1] < 0:
            # ma - v is negative -> Long ma short v
            long_ma = True
            ma_side = 'buy'
            v_side = 'sell'

        else:
            # ma - v is positive -> Short ma long v
            long_ma = False
            ma_side = 'sell'
            v_side = 'buy'

        # submit order
        api.submit_order(
            symbol='MA',
            qty=ma_size,
            side=ma_side,
            time_in_force='gtc',
            type='market',
        )

        api.submit_order(
            symbol='V',
            qty=v_size,
            side=v_side,
            time_in_force='gtc',
            type='market',
        )
        print("trade taken")
        trade_taken = True
        stop_loss = (max_divergence + 10) * -1
        print("StopLoss:", stop_loss)
        take_profit = max_divergence + 3
        print("TakeProfit:", take_profit)
        break

    sleep(1)

    # # check if the market is still open
    if pd.Timestamp.now(tz='America/Los_Angeles') > clock.next_close:
        trade_taken = False
        break

# In a trade - check for exit
while trade_taken:
    print("In trade")
    pnl = data_df.ma[-1] * ma_size - data_df.v[-1] * v_size
    print("PNL:", pnl)
    if not long_ma:
        pnl *= -1  # inverse the p&l calculation

    if pnl < stop_loss or pnl > take_profit:
        # Either stop or take profit hit - close trade
        api.close_position('MA')
        api.close_position('V')
        trade_taken = False
        break

    if pd.Timestamp.now(tz='America/Los_Angeles') > clock.next_close:
        break

Personally I find this example to to unnecessarily complicated. Try using the technique put forth by parttimelarry at this YouTube video: Part Time Larry (Web Sockets on Alpaca)

Part Time Larry uses a much simpler structure where you set up a simple listener that receives the stream as a message. What I did notice though, is that when the message comes in you need to assign it to another variable in order to work with the data because the socket reserves the message variable for additional incoming messages.

Thank you. Will check!

I have tried using the code Part Time Larry demonstrated. Unfortunately, I can’t get the stream.

I am able to run both v1 and v2 through the Command Line without a problem. However, my PyCharm IDE won’t stream.

Here is the code I am using from Stream.py:

import config
import websocket
import json

def on_open(ws):
print(“opened”)
auth_data = {
“action”: “authenticate”,
“data”: {“action”: “auth”, “key”: “", “secret”: "**”}
}

ws.send(json.dumps(auth_data))

listen_message = {"action": "subscribe", "trades": ["AAPL"], "quotes": ["AMD", "CLDR"], "bars": ["*"]}

ws.send(json.dumps(listen_message))

def on_message(ws,message):
print(“received a message”)
print(message)

def on_close(ws):
print(“closed connection”)

socket = “wss://stream.data.alpaca.markets/v2/iex”

ws = websocket.WebSocketApp(socket, on_open=on_open, on_message=on_message, on_close=on_close)

ws.run_forever()

I receive the following message in the terminal:

“Process finished with exit code 0”

But no stream. Should I be looking somewhere else for the stream or should it pop up in the terminal? I have the same issue with v1 and v2.
"