Skip to content

Commit

Permalink
Fix bug in Pipeline.run() executing Components in a wrong and unexpec…
Browse files Browse the repository at this point in the history
…ted order (#8021)

* Fix bug in Pipeline.run() executing Components in a wrong and unexpected order

* Update haystack/core/pipeline/base.py

Co-authored-by: Madeesh Kannan <[email protected]>

---------

Co-authored-by: Madeesh Kannan <[email protected]>
  • Loading branch information
silvanocerza and shadeMe committed Jul 12, 2024
1 parent 94b8068 commit 0411cd9
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 9 deletions.
12 changes: 12 additions & 0 deletions haystack/core/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,18 @@ def _init_inputs_state(self, data: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[

def _init_run_queue(self, pipeline_inputs: Dict[str, Any]) -> List[Tuple[str, Component]]:
run_queue: List[Tuple[str, Component]] = []

# HACK: Quick workaround for the issue of execution order not being
# well-defined (NB - https://github.com/deepset-ai/haystack/issues/7985).
# We should fix the original execution logic instead.
if networkx.is_directed_acyclic_graph(self.graph):
# If the Pipeline is a directed acyclic graph (it has no loops), a topological
# sort gives a valid execution order: every Component comes after all of
# its senders. So use that to get the run order.
for node in networkx.topological_sort(self.graph):
run_queue.append((node, self.graph.nodes[node]["instance"]))
return run_queue

for node_name in self.graph.nodes:
component = self.graph.nodes[node_name]["instance"]

Expand Down
4 changes: 4 additions & 0 deletions releasenotes/notes/fix-execution-order-1121cedd9c68c560.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
Fix bug in Pipeline.run() executing Components in a wrong and unexpected order
1 change: 1 addition & 0 deletions test/core/pipeline/features/pipeline_run.feature
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Feature: Pipeline running
| that has a component with default inputs that doesn't receive anything from its sender |
| that has a component with default inputs that doesn't receive anything from its sender but receives input from user |
| that has a loop and a component with default inputs that doesn't receive anything from its sender but receives input from user |
| that has multiple components with only default inputs and are added in a different order from the order of execution |

Scenario Outline: Running a bad Pipeline
Given a pipeline <kind>
Expand Down
114 changes: 110 additions & 4 deletions test/core/pipeline/features/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ def pipeline_that_has_three_branches_that_dont_merge():
PipelineRunData(
inputs={"add_one": {"value": 1}},
expected_outputs={"add_one_again": {"result": 6}, "add_ten": {"result": 12}, "double": {"value": 4}},
expected_run_order=["add_one", "repeat", "double", "add_ten", "add_three", "add_one_again"],
expected_run_order=["add_one", "repeat", "add_ten", "double", "add_three", "add_one_again"],
)
],
)
Expand All @@ -448,7 +448,7 @@ def pipeline_that_has_two_branches_that_merge():
PipelineRunData(
inputs={"first_addition": {"value": 1}, "third_addition": {"value": 1}},
expected_outputs={"fourth_addition": {"result": 3}},
expected_run_order=["first_addition", "second_addition", "third_addition", "diff", "fourth_addition"],
expected_run_order=["first_addition", "third_addition", "second_addition", "diff", "fourth_addition"],
)
],
)
Expand Down Expand Up @@ -633,7 +633,7 @@ def run(self, messages: List[ChatMessage]):
},
"mm2": {"merged_message": "Fake message"},
},
expected_run_order=["prompt_builder", "mm1", "llm", "mm2"],
expected_run_order=["prompt_builder", "llm", "mm1", "mm2"],
)
],
)
Expand Down Expand Up @@ -1021,7 +1021,7 @@ def pipeline_that_has_multiple_branches_of_different_lengths_that_merge_into_a_c
PipelineRunData(
inputs={"first_addition": {"value": 1}, "third_addition": {"value": 1}},
expected_outputs={"fourth_addition": {"result": 12}},
expected_run_order=["first_addition", "second_addition", "third_addition", "sum", "fourth_addition"],
expected_run_order=["first_addition", "third_addition", "second_addition", "sum", "fourth_addition"],
)
],
)
Expand Down Expand Up @@ -1376,3 +1376,109 @@ def run(self, prompt: str):
)
],
)


@given(
    "a pipeline that has multiple components with only default inputs and are added in a different order from the order of execution",
    target_fixture="pipeline_data",
)
def pipeline_that_has_multiple_components_with_only_default_inputs_and_are_added_in_a_different_order_from_the_order_of_execution():
    """Build a pipeline whose components are registered in a different order than they must run.

    The components are wired as
    prompt_builder1 -> spellchecker -> prompt_builder3 -> retriever -> ranker -> prompt_builder2 -> llm,
    with prompt_builder3 also feeding ranker.query and prompt_builder2.question.

    Returns a ``(pipeline, [PipelineRunData])`` tuple describing the inputs, the
    expected outputs and the expected run order.
    """
    spellcheck_template = """
You are a spellchecking system. Check the given query and fill in the corrected query.
Question: {{question}}
Corrected question:
"""
    answer_template = """
According to these documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Answer the given question: {{question}}
Answer:
"""
    replies_template = """
{% for ans in replies %}
{{ ans }}
{% endfor %}
"""

    @component
    class FakeRetriever:
        """Stand-in retriever: ignores its inputs and returns one fixed document."""

        @component.output_types(documents=List[Document])
        def run(
            self,
            query: str,
            filters: Optional[Dict[str, Any]] = None,
            top_k: Optional[int] = None,
            scale_score: Optional[bool] = None,
        ):
            return {"documents": [Document(content="This is a document")]}

    @component
    class FakeRanker:
        """Stand-in ranker: passes the received documents through unchanged."""

        @component.output_types(documents=List[Document])
        def run(
            self,
            query: str,
            documents: List[Document],
            top_k: Optional[int] = None,
            scale_score: Optional[bool] = None,
            calibration_factor: Optional[float] = None,
            score_threshold: Optional[float] = None,
        ):
            return {"documents": documents}

    @component
    class FakeGenerator:
        """Stand-in generator: always returns the same reply and metadata."""

        @component.output_types(replies=List[str], meta=Dict[str, Any])
        def run(self, prompt: str, generation_kwargs: Optional[Dict[str, Any]] = None):
            return {"replies": ["This is a reply"], "meta": {"meta_key": "meta_value"}}

    pipeline = Pipeline()

    # Registration order is deliberately NOT the execution order: that mismatch
    # is exactly what this scenario exercises, so keep the sequence as is.
    registrations = [
        ("retriever", FakeRetriever()),
        ("ranker", FakeRanker()),
        ("prompt_builder2", PromptBuilder(template=answer_template)),
        ("prompt_builder1", PromptBuilder(template=spellcheck_template)),
        ("prompt_builder3", PromptBuilder(template=replies_template)),
        ("llm", FakeGenerator()),
        ("spellchecker", FakeGenerator()),
    ]
    for registered_name, registered_instance in registrations:
        pipeline.add_component(name=registered_name, instance=registered_instance)

    # (sender, receiver) pairs, connected in this exact sequence.
    wiring = [
        ("prompt_builder1", "spellchecker"),
        ("spellchecker.replies", "prompt_builder3"),
        ("prompt_builder3", "retriever.query"),
        ("prompt_builder3", "ranker.query"),
        ("retriever.documents", "ranker.documents"),
        ("ranker.documents", "prompt_builder2.documents"),
        ("prompt_builder3", "prompt_builder2.question"),
        ("prompt_builder2", "llm"),
    ]
    for sender, receiver in wiring:
        pipeline.connect(sender, receiver)

    run_data = PipelineRunData(
        inputs={"prompt_builder1": {"question": "Wha i Acromegaly?"}},
        expected_outputs={
            "llm": {"replies": ["This is a reply"], "meta": {"meta_key": "meta_value"}},
            "spellchecker": {"meta": {"meta_key": "meta_value"}},
        },
        expected_run_order=[
            "prompt_builder1",
            "spellchecker",
            "prompt_builder3",
            "retriever",
            "ranker",
            "prompt_builder2",
            "llm",
        ],
    )
    return (pipeline, [run_data])
11 changes: 6 additions & 5 deletions test/core/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,12 +840,13 @@ def test__init_run_queue(self):

data = {"yet_another_with_single_input": {"in": 1}}
run_queue = pipe._init_run_queue(data)
assert len(run_queue) == 5
assert run_queue[0][0] == "with_variadic"
assert run_queue[1][0] == "with_no_inputs"
assert run_queue[2][0] == "with_single_input"
assert run_queue[3][0] == "yet_another_with_single_input"
assert len(run_queue) == 6
assert run_queue[0][0] == "with_no_inputs"
assert run_queue[1][0] == "with_single_input"
assert run_queue[2][0] == "yet_another_with_single_input"
assert run_queue[3][0] == "another_with_single_input"
assert run_queue[4][0] == "with_multiple_inputs"
assert run_queue[5][0] == "with_variadic"

def test__init_inputs_state(self):
pipe = Pipeline()
Expand Down

0 comments on commit 0411cd9

Please sign in to comment.