Hello, I copied the pairs trading example from the "Pairs Trading - Strategy Deployment Guide" and changed the websocket stream from T (trades) to minute (AM) streaming. The script seems to work on the first loop, but one minute later, when data arrives from the websocket, it is not passed to the `while not trade_taken` loop. Please help me understand how to pass the data to `while not trade_taken`:
import os
import threading
import websocket
import websockets
from time import sleep

import alpaca_trade_api as tradeapi
import pandas as pd

base_url = 'https://paper-api.alpaca.markets'
data_url = 'wss://data.alpaca.markets'
trade_taken = False
count = 0
count1 = 0

# Credentials are read from the environment instead of being embedded in the
# source. Any key pair that was ever pasted into a public post must be treated
# as compromised and regenerated in the Alpaca dashboard.
# A missing variable raises KeyError here, before any network call is made.
API_KEY_ID = os.environ['APCA_API_KEY_ID']
API_SECRET_KEY = os.environ['APCA_API_SECRET_KEY']

# instantiate REST API
api = tradeapi.REST(
    API_KEY_ID,
    API_SECRET_KEY,
    base_url=base_url,
    api_version='v2',
)

# init WebSocket
conn = tradeapi.stream2.StreamConn(
    API_KEY_ID,
    API_SECRET_KEY,
    data_url=data_url,
    data_stream='alpacadatav1',
)


def wait_for_market_open():
    """Sleep until the market opens, if it is currently closed.

    Fetches the clock via ``api.get_clock()``. When ``clock.is_open`` is
    false, sleeps for the rounded number of seconds between
    ``clock.timestamp`` and ``clock.next_open``.

    Returns:
        The clock object fetched at the start of the call. It is NOT
        refreshed after sleeping, so ``is_open`` may be stale on return.
    """
    clock = api.get_clock()
    if not clock.is_open:
        time_to_open = clock.next_open - clock.timestamp
        # Clamp at zero: time.sleep() raises ValueError on a negative value.
        sleep_time = max(0, round(time_to_open.total_seconds()))
        print("Sleep time:", sleep_time)
        sleep(sleep_time)
    return clock


# define websocket callbacks
# Shared price frame (columns 'ma' and 'v'). It is created and rebound by the
# main loop; the callbacks below only overwrite its last row. It stays None
# until the main loop has built the first frame.
data_df = None


@conn.on(r'^AM.V$')
async def on_second_bars_V(conn, channel, bar):
    """Store the close of the latest V minute bar in the last row of data_df.

    Args:
        conn: stream connection passed in by the dispatcher (unused).
        channel: channel name the bar arrived on (unused).
        bar: bar object; only ``bar.close`` is read.
    """
    print('Visa: ', bar.close)
    # Take one snapshot of the global: the main thread may rebind data_df
    # between the None check and the write.
    frame = data_df
    if frame is not None:
        # A single .iloc write updates the frame itself. The former
        # `data_df.v[-1] = ...` was chained assignment, which may write to a
        # temporary copy and leave the shared frame unchanged.
        frame.iloc[-1, frame.columns.get_loc('v')] = bar.close
        print("DatadfV: ", frame.v.iloc[-1])


@conn.on(r'^AM.MA$')
async def on_second_bars_MA(conn, channel, bar):
    """Store the close of the latest MA minute bar in the last row of data_df.

    Args:
        conn: stream connection passed in by the dispatcher (unused).
        channel: channel name the bar arrived on (unused).
        bar: bar object; only ``bar.close`` is read.
    """
    print('MA: ', bar.close)
    frame = data_df
    if frame is not None:
        frame.iloc[-1, frame.columns.get_loc('ma')] = bar.close
        print("DatadfMA: ", frame.ma.iloc[-1])


# start WebSocket in a thread
# Minute-bar ("AM") channels for the two legs of the pair.
streams = ['AM.V', 'AM.MA']
# Run the stream on a daemon thread so the blocking conn.run() call does not
# stall the main loop and does not keep the process alive on exit.
ws_thread = threading.Thread(target=conn.run, daemon=True, args=(streams,))
ws_thread.start()

# main script executes forever
# Main trading loop. Each outer iteration:
#   1. waits for the market to open,
#   2. seeds data_df with recent minute closes for MA and V,
#   3. waits until the websocket callbacks have filled the newest row,
#   4. looks for an entry while flat, then manages the exit while in a trade.
while True:
    count1 += 1
    print("Main Count1: ", count1)
    clock = wait_for_market_open()

    ma = api.get_barset('MA', '1Min', limit=26)
    v = api.get_barset('V', '1Min', limit=26)
    # Rebinding the module-level data_df is what hands the frame to the
    # websocket callbacks; they overwrite its last row in place.
    data_df = pd.concat(
        [ma.df.MA.close, v.df.V.close],
        axis=1,
        join='inner',
        keys=['ma', 'v'],
    )
    # Zero the newest row as a "no live data yet" sentinel. A single .iloc
    # write is used because chained `data_df.v[-1] = 0` may hit a copy.
    data_df.iloc[-1, :] = 0

    # Largest absolute MA-minus-V return spread over the last 20 seeded bars.
    # The final row is dropped because it only holds the sentinel.
    history = data_df.pct_change().iloc[:-1]
    max_divergence = (history.ma - history.v).tail(20).abs().max()

    # Check if websockets are streaming: wait until both callbacks have
    # replaced the sentinel (with minute bars this can take up to a minute).
    while data_df.v.iloc[-1] == 0 or data_df.ma.iloc[-1] == 0:
        print("Data is not coming in?", data_df.v.iloc[-1])
        sleep(1)

    # Not in a trade: look for a divergence larger than the recent maximum.
    while not trade_taken:
        # Use the LAST two rows. The previous `[:2]` slice took the two
        # oldest rows, so the spread never changed when live bars arrived.
        # NOTE(review): only the last row is ever updated by the callbacks,
        # so the row before it stays at the seeded bar until the outer loop
        # re-fetches history - consider appending a row per new minute.
        spread_df = data_df.pct_change().tail(2).copy()
        print("Spread_df: ", spread_df)
        spread_df['spread'] = spread_df.ma - spread_df.v
        latest_spread = spread_df.spread.iloc[-1]
        print("Max Divergence1:", max_divergence)
        print("abs(spread_df.spread[-1]: ", abs(latest_spread))
        print("abs(spread_df.spread[0]: ", abs(spread_df.spread.iloc[0]))

        if abs(latest_spread) > max_divergence:
            print("Inside the trade")
            # There is a trade opportunity - calculate position sizing.
            acct = api.get_account()
            print("acct", acct)
            acct_size = float(acct.equity)
            print("Acc Size:", acct_size)
            # Each leg's share count is the OTHER leg's price, so both legs
            # are worth about price_ma * price_v dollars.
            # NOTE(review): acct_size is only printed, never used for sizing.
            ma_size = round(data_df.v.iloc[-1])
            v_size = round(data_df.ma.iloc[-1])

            if latest_spread < 0:
                # ma - v is negative -> long ma, short v
                long_ma = True
                ma_side = 'buy'
                v_side = 'sell'
            else:
                # ma - v is positive -> short ma, long v
                long_ma = False
                ma_side = 'sell'
                v_side = 'buy'

            # Submit both legs as market orders.
            api.submit_order(
                symbol='MA',
                qty=ma_size,
                side=ma_side,
                time_in_force='gtc',
                type='market',
            )
            api.submit_order(
                symbol='V',
                qty=v_size,
                side=v_side,
                time_in_force='gtc',
                type='market',
            )
            print("trade taken")
            trade_taken = True
            stop_loss = (max_divergence + 10) * -1
            print("StopLoss:", stop_loss)
            take_profit = max_divergence + 3
            print("TakeProfit:", take_profit)
            break

        sleep(1)

        # Check if the market is still open; if not, restart the outer loop.
        if pd.Timestamp.now(tz='America/Los_Angeles') > clock.next_close:
            trade_taken = False
            break

    # In a trade: watch the value difference between the legs for an exit.
    while trade_taken:
        print("In trade")
        pnl = data_df.ma.iloc[-1] * ma_size - data_df.v.iloc[-1] * v_size
        print("PNL:", pnl)
        if not long_ma:
            pnl *= -1  # inverse the p&l calculation for the short-ma case

        if pnl < stop_loss or pnl > take_profit:
            # Either stop or take profit hit - close both legs.
            api.close_position('MA')
            api.close_position('V')
            trade_taken = False
            break

        if pd.Timestamp.now(tz='America/Los_Angeles') > clock.next_close:
            # Market closed with the position still open: trade_taken stays
            # True so the exit check resumes after the next open.
            break

        # Poll once per second instead of spinning; prices change at most
        # once per minute bar.
        sleep(1)