Skip to content

Commit

Permalink
#546 support
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolay-r committed Jan 8, 2024
1 parent 479e8dd commit cf13a66
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions arekit/common/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
class BasePipelineLauncher:

@staticmethod
def run(pipeline, pipeline_ctx, src_key=None):
def run(pipeline, pipeline_ctx, src_key=None, has_input=True):
assert(isinstance(pipeline, list))
assert(isinstance(pipeline_ctx, PipelineContext))
assert(isinstance(src_key, str) or src_key is None)

for ind, item in enumerate(filter(lambda itm: itm is not None, pipeline)):
assert(isinstance(item, BasePipelineItem))
input_data = item.get_source(pipeline_ctx, force_key=src_key if src_key is not None and ind == 0 else None)
do_force_key = src_key is not None and ind == 0
input_data = item.get_source(pipeline_ctx, force_key=src_key if do_force_key else None) \
if has_input and ind == 0 else None
item_result = item.apply(input_data=input_data, pipeline_ctx=pipeline_ctx)
pipeline_ctx.update(param=item.ResultKey, value=item_result, is_new_key=False)

Expand Down

0 comments on commit cf13a66

Please sign in to comment.