How can I run pipeline_live with USEquityPricing data?

I use the import statement: from pipeline_live.data.alpaca.pricing import USEquityPricing. I attach the pipeline using

attach_pipeline(pipe, 'Stocks').

However, when I run a scheduled function with the line:

pipe_output = pipeline_output('Stocks'),

I get the following error:

AttributeError: type object 'USEquityPricing' has no attribute 'get_loader'
[2019-06-24 19:26:01.238885] WARNING: Executor: Continuing execution

I have run my algo with iex.pricing and polygon.pricing, yet get the same error. Any help is much appreciated, thanks!

Full Error:

[2019-06-24 19:26:01.021099] ERROR: Executor: type object 'USEquityPricing' has no attribute 'get_loader'
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/pylivetrader/executor/executor.py", line 67, in wrapper
    func(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/pylivetrader/executor/executor.py", line 88, in every_bar
    handle_data(algo, current_data, dt_to_use)
  File "/usr/local/lib/python3.6/dist-packages/pylivetrader/misc/events.py", line 218, in handle_data
    dt,
  File "/usr/local/lib/python3.6/dist-packages/pylivetrader/misc/events.py", line 237, in handle_data
    self.callback(context, data)
  File "HVLv1.py", line 86, in late_day_trade
    pipe_output = pipeline_output('Stocks')
  File "/usr/local/lib/python3.6/dist-packages/pylivetrader/misc/api_context.py", line 62, in wrapped
    return getattr(algorithm, f.__name__)(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/pylivetrader/algorithm.py", line 1077, in pipeline_output
    output = eng.run_pipeline(self._pipelines[name])
  File "/usr/local/lib/python3.6/dist-packages/pipeline_live/engine.py", line 77, in run_pipeline
    initial_workspace,
  File "/usr/local/lib/python3.6/dist-packages/pipeline_live/engine.py", line 216, in compute_chunk
    loader_groups = groupby(loader_group_key, graph.loadable_terms)
  File "/usr/local/lib/python3.6/dist-packages/toolz/itertoolz.py", line 93, in groupby
    d[key(item)](item)
  File "/usr/local/lib/python3.6/dist-packages/toolz/functoolz.py", line 596, in __call__
    return tuple(func(*args, **kwargs) for func in self.funcs)
  File "/usr/local/lib/python3.6/dist-packages/toolz/functoolz.py", line 596, in <genexpr>
    return tuple(func(*args, **kwargs) for func in self.funcs)
  File "/usr/local/lib/python3.6/dist-packages/pipeline_live/engine.py", line 214, in <lambda>
    lambda x: x.dataset.get_loader(), getitem(
AttributeError: type object 'USEquityPricing' has no attribute 'get_loader'
[2019-06-24 19:26:01.238885] WARNING: Executor: Continuing execution
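
Reading the traceback from the bottom up: the failing frame in pipeline_live/engine.py calls x.dataset.get_loader() for each loadable term in the pipeline, so the error suggests that at least one term is bound to a dataset class that has no get_loader. Below is a minimal sketch of a check for that condition. LiveDataset, PlainDataset, Column, and terms_missing_loader are hypothetical stand-ins so the snippet runs without zipline or pipeline_live installed; only the dataset attribute and the get_loader name are taken from the traceback.

```python
class LiveDataset:
    """Hypothetical stand-in for a dataset class that can supply a loader."""

    @classmethod
    def get_loader(cls):
        return "loader"


class PlainDataset:
    """Hypothetical stand-in for a dataset class without get_loader."""


class Column:
    """Hypothetical stand-in for a loadable term bound to a dataset."""

    def __init__(self, name, dataset):
        self.name = name
        self.dataset = dataset


def terms_missing_loader(terms):
    """Return the names of terms whose dataset has no callable get_loader."""
    return [
        term.name
        for term in terms
        if not callable(getattr(term.dataset, "get_loader", None))
    ]


terms = [Column("close", LiveDataset), Column("volume", PlainDataset)]
print(terms_missing_loader(terms))
```

In a real algorithm, a similar check could be run over the inputs of each factor in the pipeline. If one of them resolves to a USEquityPricing that was not imported from pipeline_live, that would be consistent with this error.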

Did you try with alpaca.pricing?

from pipeline_live.data.alpaca.pricing import USEquityPricing

Yes, I have tried:
from pipeline_live.data.alpaca.pricing import USEquityPricing.

I have also tried using the iex.pricing. The get_loader attribute error appears to be coming from the pipeline_output() function. I am not sure where else to look.

Could the problem be that I am attempting to use iex? – I could post my code if needed.

from pipeline_live.data.iex.factors import ( AnnualizedVolatility, RSI )

Ah yes, now that IEX requires an API key, you should get an IEX API key and use it.
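
A small pre-flight check can confirm that the keys are visible to the process before the algorithm starts. This is only a sketch and not part of pylivetrader or pipeline_live. The variable names are assumptions: IEX_TOKEN is the name the iexfinance library reads, and APCA_API_KEY_ID / APCA_API_SECRET_KEY are the names used by Alpaca's Python SDK. Adjust them if your setup differs.

```python
import os

# Assumed environment variable names; change them to match your setup.
REQUIRED_VARS = ("IEX_TOKEN", "APCA_API_KEY_ID", "APCA_API_SECRET_KEY")


def missing_env_vars(names, environ=None):
    """Return the names that are unset or empty in the given environment."""
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]


missing = missing_env_vars(REQUIRED_VARS)
if missing:
    print("Missing environment variables:", ", ".join(missing))
else:
    print("All expected environment variables are set.")
```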

Thanks, I will try that. I also think I have an APIv1 account. Do you think upgrading to an APIv2 account would have any benefit in this case?

For this issue in particular, the Alpaca API version doesn’t matter. That said, I would highly recommend moving to the new account type and the v2 API.

Hello @hitoshi

I am experiencing the same issue as above. It only happens when I use Alpaca API’s USEquityPricing. The algorithm I am using needs a lot of data so IEX isn’t cost-effective to use.

How can I fix this problem?

Are you using the latest version of pipeline-live? I’m trying to understand the error but the stack trace above doesn’t help much…
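
One way to answer the version question is to print the installed versions of the packages involved. The sketch below assumes the distributions are named pipeline-live, pylivetrader, and zipline; installed_version is a hypothetical helper, not something these libraries provide.

```python
def installed_version(package):
    """Return the installed version string, or None if the package is absent."""
    try:
        # Available on Python 3.8 and later.
        from importlib.metadata import PackageNotFoundError, version
    except ImportError:
        # Older interpreters (such as the Python 3.6 in the traceback)
        # can fall back to setuptools' pkg_resources.
        import pkg_resources

        try:
            return pkg_resources.get_distribution(package).version
        except pkg_resources.DistributionNotFound:
            return None
    try:
        return version(package)
    except PackageNotFoundError:
        return None


for name in ("pipeline-live", "pylivetrader", "zipline"):
    print(name, installed_version(name))
```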

@hitoshi

I am, and here’s part of the code I’m trying to run:
from pipeline_live.data.alpaca.pricing import USEquityPricing
price = USEquityPricing.close.latest
ShortAvg = SimpleMovingAverage( inputs=[USEquityPricing.close], window_length=3, mask=base_universe )
LongAvg = SimpleMovingAverage( inputs=[USEquityPricing.close], window_length=45, mask=base_universe )

Maybe it has something to do with SimpleMovingAverage?

Here is the start of an example of what I am trying to run. The last line in the snippet is what throws the error: AttributeError: type object 'USEquityPricing' has no attribute 'get_loader'

So the problem appears to be with pipe_output = pipeline_output('Stocks'). I could post the whole file if that would help with readability, but here is the relevant part.

    from pylivetrader.api import (
        attach_pipeline,
        date_rules,
        time_rules,
        order,
        order_target_percent,
        get_open_orders,
        cancel_order,
        pipeline_output,
        schedule_function,
        symbols,  # used in initialize() for context.backup_stocks
    )

    import numpy as np
    import pandas as pd
    #API imports for pipeline
    from pylivetrader.finance.execution import LimitOrder
    from zipline.pipeline import Pipeline
    from pipeline_live.data.alpaca.pricing import USEquityPricing
    from pipeline_live.data.polygon.filters import (
        IsPrimaryShareEmulation as IsPrimaryShare)
    from pipeline_live.data.alpaca.factors import (
        AnnualizedVolatility, RSI
    )

    import logbook
    log = logbook.Logger('algo')

    def record(*args, **kwargs):
        print('args={}, kwargs={}'.format(args, kwargs))

    def initialize (context): # runs once when script starts
        log.info("Welcome")

        #Variables
        context.idr_losers = pd.Series(([]))
        context.day_count = 0
        context.daily_message = "Day {}."
        context.open_orders = get_open_orders()
        context.backup_stocks = symbols('BAM')

        #Factor criteria
        close_price = USEquityPricing.close.latest
        vol = USEquityPricing.volume.latest
        ann_var = AnnualizedVolatility()
        rsi = RSI()
        
        #screening
        mask_custom = (IsPrimaryShare() & (vol > 1000000) & (close_price < 20) & (ann_var > 1) & (rsi < 30))
        stockBasket = USEquityPricing.close.latest.top(3000,  mask = mask_custom)
        
        #Column construction
        pipe_columns = {'close_price': close_price, 'volume': vol, 'ann_var': ann_var}
        
        #Creation of actual pipeline
        pipe = Pipeline(columns = pipe_columns, screen = stockBasket)
        attach_pipeline(pipe, 'Stocks')
        log.info(USEquityPricing.get_loader())

        #Schedule functions
        schedule_function(late_day_trade, date_rules.every_day(), time_rules.market_open(hours = 3, minutes = 0)) #offset open tells when to run a user defined function
        schedule_function(check_portfolio, date_rules.every_day(), time_rules.market_open(hours = 0, minutes = 1))
        schedule_function(morning_day_trade1, date_rules.every_day(), time_rules.market_open(hours = 0, minutes = 45))
        schedule_function(morning_day_trade2, date_rules.every_day(), time_rules.market_open(hours = 0, minutes = 55))
        schedule_function(check_portfolio, date_rules.every_day(), time_rules.market_open(hours = 0, minutes = 56))
                    
    def late_day_trade(context, data):
        #Get the pipeline output from attach pipeline
        pipe_output = pipeline_output('Stocks')

I think it may be. The only factor I have successfully gotten to run thus far is AverageDollarVolume. I imported it with from pipeline_live.data.alpaca.factors import AverageDollarVolume.

I think you need this line if you don’t already have it: from pipeline_live.data.alpaca.factors import SimpleMovingAverage

Also, after looking at the pipeline_live documentation, I realize that all factors in https://www.zipline.io/appendix.html can be used. Quantopian has some other factors that aren’t shared with zipline. For example, AnnualizedVolatility existed only on Quantopian, not in zipline. I will see if I can solve the issue with a custom factor.

Hmm, interesting. I’ll implement it and see if it works. I’ve been switching over to Debian to run the code more efficiently. Thanks for the suggestion @vfp1587!

The issue ended up being with the zipline factors I was using. I solved my problem by defining a custom factor for annualized volatility, which just meant copying the source code from zipline. Some of the factors in the source code, like AnnualizedVolatility, aren’t included in the list of built-in factors. I don’t know why. It looked like this:

import numpy as np

# API imports for pipeline
from pylivetrader.finance.execution import LimitOrder
from zipline.pipeline import Pipeline, CustomFactor
from pipeline_live.data.alpaca.pricing import USEquityPricing
# from pipeline_live.data.iex.pricing import USEquityPricing
from pipeline_live.data.polygon.filters import (IsPrimaryShareEmulation as IsPrimaryShare)
from pipeline_live.data.alpaca.factors import RSI, Returns


class AnnualizedVolatility(CustomFactor):
    # Daily returns come from the Returns factor imported from pipeline_live
    inputs = [Returns(window_length=2)]
    params = {'annualization_factor': 252.0}
    window_length = 252

    def compute(self, today, assets, out, returns, annualization_factor):
        # Standard deviation of daily returns per asset, scaled to an annual figure
        out[:] = np.nanstd(returns, axis=0) * (annualization_factor ** .5)
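
To sanity-check the factor, the same arithmetic can be reproduced by hand for a single asset: the population standard deviation of the daily returns (which is what np.nanstd computes by default, ignoring NaN values) multiplied by the square root of the annualization factor. The sketch below uses only the standard library. The helper name annualized_volatility and the sample returns are made up for illustration.

```python
import math
import statistics


def annualized_volatility(daily_returns, annualization_factor=252.0):
    """Population std of the non-NaN daily returns, scaled to a yearly figure."""
    clean = [r for r in daily_returns if not math.isnan(r)]
    return statistics.pstdev(clean) * math.sqrt(annualization_factor)


# Made-up daily returns for one asset, including a missing value.
sample = [0.01, -0.02, float("nan"), 0.015, 0.0]
print(annualized_volatility(sample))
```

If the pipeline output for an asset differs a lot from a hand calculation like this on the same returns, the inputs or the window length are worth a second look.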