hello , might be impossible but worth a shot.
i have a trading alogorythem with very specific conditions.
i need to listen to multiple streams separately from each other .
i try to to that by opening a new process for each stream in my python . my code is as follows
def to_df(msg):
global levels , in_position ,api df =pd.DataFrame() a_json = json.loads(msg) candle_open = a_json['o'] candle_close = a_json['c'] candle_high = a_json['h'] candle_low = a_json['l'] candle_ts = a_json['t'] candle_exch = a_json['x'] print(f"open : {candle_open} , close : {candle_close} , high : {candle_high} , low : {candle_low} AT EXCHANGE : {candle_exch}") if candle_exch == "FTX": timestamps.append(candle_ts) opens.append(candle_open) closes.append(candle_close) highs.append(candle_high) lows.append(candle_low) df['Datetime'] = timestamps df['Open'] = opens df['High'] = highs df['Low'] = lows df['Close'] = closes if len(df) > 31: df["roc_thin"] = talib.ROCP(df['Close'], timeperiod = 5 ) df["roc_sma_5"] = talib.SMA(df['roc_thin'], timeperiod = 5) df["roc_sma_30"] = talib.SMA(df['roc_thin'], timeperiod = 30) df["roc_roc_5"] = talib.ROCP(df['roc_sma_5'], timeperiod = 1) df['rsi'] = talib.RSI(df['Close'], timeperiod=14) print(df) if len(api.list_positions()) == 0 and len(api.list_orders()) == 0: if df["roc_sma_5"][-1] > df["roc_sma_15"][-1]: if df['rsi'][-1] < 30: candle_df = get_pattern_df(df) if '_Bull' in df['candlestick_pattern'][-1]: best_candle_rating=candle_rankings.get(df['candlestick_pattern'][-1],100) candle_rating = df['pattern_val'][-1] """ if candle_rating > 7 and best_candle_rating < 60: return True elif candle_rating > 5 and best_candle_rating < 40: return True """ if candle_rating > 3 and best_candle_rating < 20: stock_amnt = stock_amnt_order(closes[-1]) api.submit_order(symbol=symbol,qty=stock_amnt,side='buy',type='market',time_in_force='gtc') elif candle_rating > 6 and best_candle_rating < 40: stock_amnt = stock_amnt_order(closes[-1]) api.submit_order(symbol=symbol,qty=stock_amnt,side='buy',type='market',time_in_force='gtc') elif candle_rating > 7 and best_candle_rating < 60: stock_amnt = stock_amnt_order(closes[-1]) api.submit_order(symbol=symbol,qty=stock_amnt,side='buy',type='market',time_in_force='gtc') elif len(api.list_positions()) >= 1: roc_5 = df['roc_sma_5'][-1] roc_15 = df['roc_sma_15'][-1] roc_roc_5 = df["roc_roc_5"][-1] if roc_5 <= roc_15: stock_amnt = api.list_positions()[1].qty api.submit_order(symbol=symbol,qty=stock_amnt,side='sell',type='market',time_in_force='gtc',order_class='bracket') if roc_roc_5 < 0: stock_amnt = api.list_positions()[1].qty api.submit_order(symbol=symbol,qty=stock_amnt,side='sell',type='market',time_in_force='gtc',order_class='bracket')
def get_pattern_df(df):
def stock_amnt_order(close):
global api account = api.get_account() balance = account.buying_power amount = int(balance / close) -1 return amount
def on_open(ws):
print("opened") global symbol try: auth_data ={"action": "auth", "key": "my_key", "secret": "my_secret"} ws.send(json.dumps(auth_data)) except : print("already authenticated") listen_message = {"action":"subscribe","bars":[symbol]} ws.send(json.dumps(listen_message))
def on_message(ws, message):
global api message = message[1:-1] print(message) to_df(message)
def on_close(ws,var1,var2):
print("closed connection")
def set_symbol(sym):
global symbol symbol = sym
def get_symbol():
global symbol return symbol
def run_bot_inst(symbol):
global opens,timestamps,closes,highs,lows,levels,api opens = [] timestamps =[] closes = [] highs = [] lows=[] levels = [] api = tradeapi.REST(key_id = API_ID,secret_key = API_KEY,base_url = api_endpoint) symbol = symbol.strip('\n') set_symbol(symbol) socket = "wss://stream.data.alpaca.markets/v2/iex" ws = websocket.WebSocketApp(socket,on_open=on_open,on_message=on_message,on_close=on_close) ws.run_forever()
#file_path = ‘/input/’+symbols_file
file_path = ‘C:\Users\nolys\Desktop\results\symbols.txt’
Sym_file = open(file_path,“r”)
if name == ‘main’:
# start n worker processes with multiprocessing.Pool(processes=16) as pool: pool.map_async(run_bot_inst,iterable=Sym_file).get()
the problem is the the first process authenticates and gets data just fine ,the others fail as I am already authenticated.
to my question :
is there a way to make so that only one process authenticates for all the process , if not , is anybody familiar with a way to listen to streams on separate processes ?
any otherway around it would be highly appreciated as well