Skip to content

Commit

Permalink
#535 refactoring pipeline batching implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolay-r committed Jan 18, 2024
1 parent 1eff4fe commit af1a351
Showing 1 changed file with 6 additions and 14 deletions.
20 changes: 6 additions & 14 deletions arekit/common/pipeline/batching.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,15 @@ def run(pipeline, pipeline_ctx, src_key=None):
assert (isinstance(item, BasePipelineItem))

# Handle the content of the batch or batch itself.
content = item.get_source(pipeline_ctx, call_func=item.SupportBatching,
force_key=src_key if ind == 0 else None)
content = item.get_source(pipeline_ctx, call_func=False, force_key=src_key if ind == 0 else None)
handled_batch = [item._src_func(i) if item._src_func is not None else i for i in content]

if item.SupportBatching:
handled_batch = content
batch_result = list(item.apply(input_data=handled_batch, pipeline_ctx=pipeline_ctx))
else:
handled_batch = [item._src_func(i) if item._src_func is not None else i for i in content]

# At present, each batch represent a list of contents.
assert(isinstance(handled_batch, list))

batch_result = []
input_data_iter = [handled_batch] if item.SupportBatching else handled_batch
for input_data in input_data_iter:
item_result = item.apply(input_data=input_data, pipeline_ctx=pipeline_ctx)
batch_result.append(item_result)
batch_result = [item.apply(input_data=input_data, pipeline_ctx=pipeline_ctx)
for input_data in handled_batch]

pipeline_ctx.update(param=item.ResultKey, value=batch_result, is_new_key=False)

return pipeline_ctx
return pipeline_ctx

0 comments on commit af1a351

Please sign in to comment.