UPDATE for those who may be facing the same problem as me.
Based on page 43 of the UTP Specifications there are specific trade conditions that need to be excluded for accurate high, low, open, close, and volume calculations. This is critical for those building their own bars from trade data.
def aggregate_to_5min_bars(df, price_column='price', size_column='size', conditions_column='conditions'):
# Define conditions to exclude for high/low, open/close, and volume
excluded_conditions_high_low = {'C', 'H', 'I', 'M', 'N', 'Q', 'R', 'T', 'U', 'V', 'W', '7'}
excluded_conditions_volume = {'M', 'Q', '9'}
excluded_conditions_open_close = {'C', 'G', 'H', 'I', 'M', 'N', 'P', 'Q', 'R', 'T', 'U', 'V', 'W', 'Z', '4', '7'}
# Ensure the DataFrame index, which is the timestamp, is sorted
df.sort_index(inplace=True)
# Filtering logic: Exclude trades for high and low calculations if any condition matches the exclusion list
df_high_low = df[~df[conditions_column].apply(lambda x: any(cond in excluded_conditions_high_low for cond in x))]
# Exclude trades for volume calculations if any condition matches the exclusion list
df_volume = df[~df[conditions_column].apply(lambda x: any(cond in excluded_conditions_volume for cond in x))]
# Exclude trades for open and close calculations if any condition matches the exclusion list
df_open_close = df[~df[conditions_column].apply(lambda x: any(cond in excluded_conditions_open_close for cond in x))]
# Resample to 5-minute intervals for OHLC calculations
ohlc = df_open_close[price_column].resample('5T').ohlc()
ohlc['high'] = df_high_low[price_column].resample('5T').max()
ohlc['low'] = df_high_low[price_column].resample('5T').min()
# Open and Close prices use df_open_close which filters out specified conditions
ohlc['open'] = df_open_close[price_column].resample('5T').first()
ohlc['close'] = df_open_close[price_column].resample('5T').last()
# Volume calculation includes all trades except those with excluded conditions
ohlc['volume'] = df_volume[size_column].resample('5T').sum()
# Calculate VWAP for each bar using all trades
vwap = (df[price_column] * df[size_column]).resample('5T').sum() / ohlc['volume']
ohlc['VWAP'] = vwap.replace([np.inf, -np.inf], np.nan) # Replace infinite values with NaN
ohlc['trade_count'] = df.resample('5T').count()[price_column]
return ohlc
start_str = '2024-02-14T18:00:00-00:00'
end_str = '2024-02-14T19:00:00-00:00'
# Fetch trades
data_trades = api.get_trades(
'AAPL',
start=start_str,
end=end_str,
limit=None,
feed='sip'
).df
data_trades.head()
df_5m = aggregate_to_5min_bars(data_trades)
df_5m
After comparing aggregated data with Alpaca’s bars data, we can see that aggregated and downloaded bars are identical
start_str = '2024-02-14T18:00:00-00:00'
end_str = '2024-02-14T18:55:00-00:00'
data_bars = api.get_bars(
'AAPL',
TimeFrame(5, TimeFrameUnit.Minute),
start=start_str,
end=end_str,
adjustment='raw',
limit=None,
feed='sip'
).df
data_bars[[
'open',
'high',
'low',
'close',
'volume',
'vwap',
'trade_count'
]]