Alpaca/Polygon Streams -> monitor, divert, filter

Python 3, Alpaca api v.044, polygon/alpaca async stream data best practices.

Please note: active alpaca trading account with polygon data streams for example code below.

Streams provide the opportunity to filter and process data in near real-time, however, however accurately monitoring quote and price information graphically has proven a challenge.

What are some Alpaca strategies to “divert” streams, while a graphics, filtering, or processing operation is being performed? What are some recommended procedures to extract samples of stream data to perform analysis or QC, without disrupting the incoming flow of information? Do you recommend using parallel processing procedures?

Please provide some sample code examples for:

  • Graphically monitoring stream data, while preserving the incoming information.
  • Pausing the stream data, or spawning a filtering or processing operation.
  • Async function examples for performing operations, besides edits/restarts.

Here are some simple, example python-3 code to try an monitor some alpaca/polygon quotes data graphically, using matplotlib. The goal is to accurately sample, display and monitor selected stream data, without disrupting the input flow. Please provide better and simpler examples.

Example 1 Tries to accumulate mean bid and ask price data from async stream, while graphically showing sample values.

import os
import time
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import style
import alpaca_trade_api as tradeapi

style.use(‘seaborn-darkgrid’)

symbol = ‘TQQQ’
nz = 2000

opts = {}
opts[‘key_id’] = os.environ[‘APCA_API_KEY_ID’]
opts[‘secret_key’] = os.environ[‘APCA_API_SECRET_KEY’]
opts[‘base_url’] = os.environ[‘APCA_API_BASE_URL’]

api = tradeapi.REST(**opts)
conn = tradeapi.stream2.StreamConn(**opts)
qc = ‘Q.%s’ % symbol

class Quote ():

def __init__(self):
   self.count = -1
   self.half = 0.50
   self.last = 0.0
   self.freeze = 0
   self.y  = np.zeros(1000000)
   self.x1 = np.zeros(nz)
   self.x1 = np.linspace( 1, nz, nz )
   self.y1 = np.zeros(nz)

def update ( self, data ):
    mean = ( data.bidprice + data.askprice ) * self.half
    if ( self.count == -1 ):
       self.last = self.y[self.count]
    if ( mean != self.last ):
       self.count += 1
       self.y[self.count] = mean
       print ( self.count, self.y[self.count] )
       if ( self.count >= nz ):
          kount = -1
          for i in range (self.count-nz, self.count):
             kount += 1
             self.y1[kount] = self.y[i]
       self.last = mean

quote = Quote()
plt.ion()

@conn.on(r’Q$’)
async def on_quote(conn, channel, data):
quote.update (data)
if ( quote.count >= nz and quote.freeze == 0 ):
quote.freeze = 1
plt.gca().cla()
plt.plot ( quote.x1, quote.y1 )
plt.draw()
plt.pause(0.001)
quote.freeze = 0

conn.run([qc])

Example 2: Tries to “roll” values in a small memory array, however the plotting disrupts the input data stream.

import os
import time
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import style
import alpaca_trade_api as tradeapi

style.use(‘seaborn-darkgrid’)

symbol = ‘TQQQ’
nz = 500

opts = {}
opts[‘key_id’] = os.environ[‘APCA_API_KEY_ID’]
opts[‘secret_key’] = os.environ[‘APCA_API_SECRET_KEY’]
opts[‘base_url’] = os.environ[‘APCA_API_BASE_URL’]

api = tradeapi.REST(**opts)
conn = tradeapi.stream2.StreamConn(**opts)
qc = ‘Q.%s’ % symbol

class Quote ():

def __init__(self):
   self.y = 0
   self.count = -1
   self.xs  = np.zeros(nz)
   self.ys  = np.zeros(nz)
   self.xs  = np.linspace ( 1, nz, nz )

def update ( self, data ):
    self.count += 1
    self.y = ( data.bidprice + data.askprice ) * 0.50
    if ( self.count < nz ): 
        self.ys[self.count] = (float(self.y))
    else:
        self.count = nz
        self.ys  = np.roll ( self.ys,  -1 )
        self.ys[nz-1]  = (float(self.y))

quote = Quote()
plt.ion()

@conn.on(r’Q$’)
async def on_quote(conn, channel, data):
quote.update (data)
print ( quote.count, quote.y)
if ( quote.count == nz ):
plt.gca().cla()
plt.plot ( quote.xs, quote.ys )
plt.draw()
plt.pause(0.001)

time.sleep(0.01)

conn.run([qc])

Ideally, I would like to graphically monitor an accurate filtering or processing operation on the Trade async data stream, similar to the image below:

Example code from Momentum-Trading-Example (algo.py) and example-hftish (tick_taker.py) provided valuable background information. Thanks very much to Hitoshi Harada and other gifted, Alpaca team members.

Thanks in advance for your continued support. Sorry to disturb you with this request.

Regards,

Joe O.

Not sure how to post example code - here is screen dump of formatting from example 1 of previous post.