Refactor communication between Pipeline Components (#1321)
oryx1729 committed Sep 10, 2021
1 parent 3e6def7 commit 9dd7c74
Showing 51 changed files with 8,445 additions and 8,319 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -512,7 +512,7 @@ A minimal Open-Domain QA Pipeline:
p = Pipeline()
p.add_node(component=retriever, name="ESRetriever1", inputs=["Query"])
p.add_node(component=reader, name="QAReader", inputs=["ESRetriever1"])
res = p.run(query="What did Einstein work on?", top_k_retriever=1)
res = p.run(query="What did Einstein work on?", params={"retriever": {"top_k": 1}})

```
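Under the `params` interface introduced by this commit, run-time arguments can also target several nodes at once. A minimal sketch, reusing the node names from the snippet above (the exact keys each node accepts depend on its `run()` signature):

```python
# Sketch: per-node parameters keyed by node name.
res = p.run(
    query="What did Einstein work on?",
    params={"ESRetriever1": {"top_k": 3}, "QAReader": {"top_k": 5}},
)
```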
You can **draw the DAG** to better inspect what you are building:
53 changes: 27 additions & 26 deletions docs/_src/usage/usage/pipelines.md
@@ -21,7 +21,7 @@ from haystack import Pipeline
p = Pipeline()
p.add_node(component=retriever, name="ESRetriever1", inputs=["Query"])
p.add_node(component=reader, name="QAReader", inputs=["ESRetriever1"])
res = p.run(query="What did Einstein work on?", top_k_retriever=1)
res = p.run(query="What did Einstein work on?")
```

You can **draw the DAG** to better inspect what you are building:
@@ -32,16 +32,16 @@ p.draw(path="custom_pipe.png")

### Arguments

Whatever keyword arguments are passed into the `Pipeline.run()` method will be passed on to each node in the pipeline.
For example, in the code snippet below, all nodes will receive `query`, `top_k_retriever` and `top_k_reader` as arguments,
even if they don't use those arguments. It is therefore very important when defining custom nodes that their
keyword argument names do not clash with those of the other nodes in your pipeline.
Each node in a Pipeline defines the arguments its `run()` method accepts. The Pipeline class takes care of passing the relevant
arguments to each node. In addition to mandatory inputs like `query`, `run()` accepts optional node parameters via
the `params` argument. For instance, `params={"top_k": 5}` will set the `top_k` of all nodes to 5. To
target params at a specific node, the node name can be explicitly specified, as in `params={"Retriever": {"top_k": 5}}`.


```python
res = pipeline.run(
query="What did Einstein work on?",
top_k_retriever=1,
top_k_reader=5
params={"Retriever": {"top_k": 5}, "Reader": {"top_k": 3}}
)
```
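Global and node-specific values can be combined in the same `params` dictionary: top-level keys apply to every node that accepts them, while nested, node-named entries override them. A sketch under the assumption that the pipeline contains nodes named "Retriever" and "Reader":

```python
# "top_k": 10 is applied to every node that accepts a top_k parameter;
# the nested entry overrides it for the node named "Reader" only.
res = pipeline.run(
    query="What did Einstein work on?",
    params={"top_k": 10, "Reader": {"top_k": 3}},
)
```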

@@ -95,38 +95,39 @@ For another example YAML config, check out [this file](https://github.com/deepse
### Multiple retrievers
You can now also use multiple Retrievers and join their results:
```python
from haystack import Pipeline

p = Pipeline()
p.add_node(component=es_retriever, name="ESRetriever", inputs=["Query"])
p.add_node(component=dpr_retriever, name="DPRRetriever", inputs=["Query"])
p.add_node(component=JoinDocuments(join_mode="concatenate"), name="JoinResults", inputs=["ESRetriever", "DPRRetriever"])
p.add_node(component=reader, name="QAReader", inputs=["JoinResults"])
res = p.run(query="What did Einstein work on?", top_k_retriever=1)
res = p.run(query="What did Einstein work on?", params={"ESRetriever": {"top_k": 1}, "DPRRetriever": {"top_k": 3}})
```
![image](https://user-images.githubusercontent.com/1563902/102451782-7bd80400-4039-11eb-9046-01b002a783f8.png)

### Custom nodes
You can easily build your own custom nodes. Just respect the following requirements:
It is easy to build custom nodes; a minimal sketch follows the list of requirements below. Just respect the following:

1. Add a method `run(self, **kwargs)` to your class. `**kwargs` will contain the output from the previous node in your graph.
2. Do whatever you want within `run()` (e.g. reformatting the query)
3. Return a tuple that contains your output data (for the next node) and the name of the outgoing edge, e.g. `(output_dict, "output_1")`
4. Add a class attribute `outgoing_edges = 1` that defines the number of output options from your node. You only need a higher number here if you have a decision node (see below).
1. Create a class that inherits from `BaseComponent`.
2. Add a `run()` method to your class with any parameters it needs to process the input. Ensure that the parameters are either passed via `params` to the pipeline or returned by the preceding nodes.
3. Do whatever you want within `run()` (e.g., reformatting the query).
4. Return a tuple that contains your output data (for the next node) and the name of the outgoing edge, e.g. `(output_dict, "output_1")`.
5. Add a class attribute `outgoing_edges = 1` that defines your node's number of output options. You only need a higher number here if you have a decision node (see below).
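A minimal sketch of such a node under the refactored interface (the class name and the string manipulation are illustrative; it assumes the returned `query` is forwarded to downstream nodes):

```python
from haystack import BaseComponent


class QueryCleaner(BaseComponent):
    # A plain node with a single outgoing edge.
    outgoing_edges = 1

    def run(self, query: str):  # type: ignore
        # Do whatever you want with the input, then return only the keys
        # you add or change, plus the name of the outgoing edge.
        output = {"query": query.strip()}
        return output, "output_1"
```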

### Decision nodes
Or you can add decision nodes where only one "branch" is executed afterwards. This allows you, for example, to classify an incoming query and route it to different modules depending on the result:
![image](https://user-images.githubusercontent.com/1563902/102452199-41229b80-403a-11eb-9365-7038697e7c3e.png)
```python
class QueryClassifier():
```python
from haystack import BaseComponent, Pipeline

class QueryClassifier(BaseComponent):
    outgoing_edges = 2

    def run(self, **kwargs):
        if "?" in kwargs["query"]:
            return (kwargs, "output_1")
    def run(self, query):
        if "?" in query:
            return {}, "output_1"

        else:
            return (kwargs, "output_2")
            return {}, "output_2"

pipe = Pipeline()
pipe.add_node(component=QueryClassifier(), name="QueryClassifier", inputs=["Query"])
@@ -135,7 +136,7 @@ Or you can add decision nodes where only one "branch" is executed afterwards. Th
pipe.add_node(component=JoinDocuments(join_mode="concatenate"), name="JoinResults",
              inputs=["ESRetriever", "DPRRetriever"])
pipe.add_node(component=reader, name="QAReader", inputs=["JoinResults"])
res = p.run(query="What did Einstein work on?", top_k_retriever=1)
res = p.run(query="What did Einstein work on?", params={"ESRetriever": {"top_k": 1}, "DPRRetriever": {"top_k": 3}})
```
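The `add_node` calls attaching the two retrievers are truncated above. As an illustration only (an assumption, not the exact lines from this diff), the usual wiring follows Haystack's `<node_name>.output_<n>` convention for selecting one of a decision node's outgoing edges:

```python
# Sketch: each retriever listens on one outgoing edge of the classifier.
pipe.add_node(component=es_retriever, name="ESRetriever", inputs=["QueryClassifier.output_1"])
pipe.add_node(component=dpr_retriever, name="DPRRetriever", inputs=["QueryClassifier.output_2"])
```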

### Evaluation nodes
@@ -152,19 +153,19 @@ from haystack.pipeline import DocumentSearchPipeline, ExtractiveQAPipeline, Pipe
# Extractive QA
qa_pipe = ExtractiveQAPipeline(reader=reader, retriever=retriever)
res = qa_pipe.run(query="When was Kant born?", top_k_retriever=3, top_k_reader=5)
res = qa_pipe.run(query="When was Kant born?", params={"retriever": {"top_k": 3}, "reader": {"top_k": 5}})
# Document Search
doc_pipe = DocumentSearchPipeline(retriever=retriever)
res = doc_pipe.run(query="Physics Einstein", top_k_retriever=1)
res = doc_pipe.run(query="Physics Einstein", params={"retriever": {"top_k": 3}})
# Generative QA
doc_pipe = GenerativeQAPipeline(generator=rag_generator, retriever=retriever)
res = doc_pipe.run(query="Physics Einstein", top_k_retriever=1)
res = doc_pipe.run(query="Physics Einstein", params={"retriever": {"top_k": 3}})
# FAQ based QA
doc_pipe = FAQPipeline(retriever=retriever)
res = doc_pipe.run(query="How can I change my address?", top_k_retriever=3)
res = doc_pipe.run(query="How can I change my address?", params={"retriever": {"top_k": 3}})
```
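These ready-made classes are convenience wrappers around `Pipeline`; roughly, `DocumentSearchPipeline(retriever=retriever)` corresponds to something like the sketch below (illustrative, not the exact implementation):

```python
from haystack import Pipeline

# Approximate equivalent of DocumentSearchPipeline(retriever=retriever).
p = Pipeline()
p.add_node(component=retriever, name="Retriever", inputs=["Query"])
res = p.run(query="Physics Einstein", params={"Retriever": {"top_k": 3}})
```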
So to migrate your QA system from the deprecated `Finder` to `ExtractiveQAPipeline` you'd need to:
8 changes: 2 additions & 6 deletions haystack/classifier/base.py
@@ -26,7 +26,7 @@ def predict(self, query: str, documents: List[Document], top_k: Optional[int] =
def predict_batch(self, query_doc_list: List[dict], top_k: Optional[int] = None, batch_size: Optional[int] = None):
pass

def run(self, query: str, documents: List[Document], top_k: Optional[int] = None, **kwargs): # type: ignore
def run(self, query: str, documents: List[Document], top_k: Optional[int] = None): # type: ignore
self.query_count += 1
if documents:
predict = self.timing(self.predict, "query_time")
@@ -36,11 +36,7 @@ def run(self, query: str, documents: List[Document], top_k: Optional[int] = None

document_ids = [doc.id for doc in results]
logger.debug(f"Retrieved documents with IDs: {document_ids}")
output = {
"query": query,
"documents": results,
**kwargs
}
output = {"documents": results}

return output, "output_1"

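With `**kwargs` removed, a node's `run()` now returns only the keys it adds or changes, and the `Pipeline` itself routes inputs such as `query` to the nodes whose `run()` signatures declare them. A hedged sketch of a downstream consumer under this contract (class and variable names are illustrative, not part of this diff):

```python
from typing import List

from haystack import BaseComponent, Document


class PrintTopDocument(BaseComponent):
    outgoing_edges = 1

    def run(self, query: str, documents: List[Document]):  # type: ignore
        # "documents" comes from the previous node's output dict;
        # "query" is supplied by the pipeline, not echoed by each node.
        if documents:
            print(query, documents[0].id)
        return {}, "output_1"
```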
9 changes: 4 additions & 5 deletions haystack/classifier/farm.py
@@ -31,13 +31,12 @@ class FARMClassifier(BaseClassifier):
retriever = ElasticsearchRetriever(document_store=document_store)
classifier = FARMClassifier(model_name_or_path="deepset/bert-base-german-cased-sentiment-Germeval17")
p = Pipeline()
p.add_node(component=retriever, name="ESRetriever", inputs=["Query"])
p.add_node(component=classifier, name="Classifier", inputs=["ESRetriever"])
p.add_node(component=retriever, name="Retriever", inputs=["Query"])
p.add_node(component=classifier, name="Classifier", inputs=["Retriever"])
res = p_extractive.run(
res = p.run(
query="Who is the father of Arya Stark?",
top_k_retriever=10,
top_k_reader=5
params={"Retriever": {"top_k": 10}, "Classifier": {"top_k": 5}}
)
print(res["documents"][0].to_dict()["meta"]["classification"]["label"])
15 changes: 10 additions & 5 deletions haystack/connector/crawler.py
@@ -145,10 +145,15 @@ def _write_to_files(self, urls: List[str], output_dir: Path, base_url: str = Non

return paths

def run(self, output_dir: Union[str, Path, None] = None, urls: Optional[List[str]] = None, # type: ignore
crawler_depth: Optional[int] = None, filter_urls: Optional[List] = None, # type: ignore
overwrite_existing_files: Optional[bool] = None, return_documents: Optional[bool] = False, # type: ignore
**kwargs) -> Tuple[Dict, str]: # type: ignore
def run( # type: ignore
self,
output_dir: Union[str, Path, None] = None,
urls: Optional[List[str]] = None,
crawler_depth: Optional[int] = None,
filter_urls: Optional[List] = None,
overwrite_existing_files: Optional[bool] = None,
return_documents: Optional[bool] = False,
) -> Tuple[Dict, str]:
"""
Method to be executed when the Crawler is used as a Node within a Haystack pipeline.
@@ -172,7 +177,7 @@ def run(self, output_dir: Union[str, Path, None] = None, urls: Optional[List[str
results = {"documents": crawled_data}
else:
results = {"paths": file_paths}
results.update(**kwargs)

return results, "output_1"

@staticmethod
4 changes: 2 additions & 2 deletions haystack/document_store/base.py
@@ -286,9 +286,9 @@ def delete_all_documents(self, index: Optional[str] = None, filters: Optional[Di
def delete_documents(self, index: Optional[str] = None, filters: Optional[Dict[str, List[str]]] = None):
pass

def run(self, documents: List[dict], index: Optional[str] = None, **kwargs): # type: ignore
def run(self, documents: List[dict], index: Optional[str] = None): # type: ignore
self.write_documents(documents=documents, index=index)
return kwargs, "output_1"
return {}, "output_1"

@abstractmethod
def get_documents_by_id(self, ids: List[str], index: Optional[str] = None,
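Under the same contract, the document store node now writes the incoming documents and returns an empty dict rather than echoing its inputs. A sketch of how it might sit at the end of an indexing pipeline (node names and the `converter`, `preprocessor`, `document_store` instances are assumptions, not part of this diff):

```python
from haystack import Pipeline

# Hypothetical indexing pipeline: converter -> preprocessor -> document store.
index_pipe = Pipeline()
index_pipe.add_node(component=converter, name="TextConverter", inputs=["File"])
index_pipe.add_node(component=preprocessor, name="PreProcessor", inputs=["TextConverter"])
index_pipe.add_node(component=document_store, name="DocumentStore", inputs=["PreProcessor"])
index_pipe.run(file_paths=["data/sample.txt"])
```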
54 changes: 28 additions & 26 deletions haystack/eval.py
@@ -5,15 +5,15 @@
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

from haystack import MultiLabel, Label
from haystack import MultiLabel, Label, BaseComponent, Document

from farm.evaluation.squad_evaluation import compute_f1 as calculate_f1_str
from farm.evaluation.squad_evaluation import compute_exact as calculate_em_str

logger = logging.getLogger(__name__)


class EvalDocuments:
class EvalDocuments(BaseComponent):
"""
This is a pipeline node that should be placed after a node that returns a List of Document, e.g., Retriever or
Ranker, in order to assess its performance. Performance metrics are stored in this class and updated as each
@@ -22,21 +22,22 @@ class EvalDocuments:
a look at our evaluation tutorial for more info about open vs closed domain eval (
https://haystack.deepset.ai/tutorials/evaluation).
"""
def __init__(self, debug: bool=False, open_domain: bool=True, top_k_eval_documents: int=10, name="EvalDocuments"):

outgoing_edges = 1

def __init__(self, debug: bool=False, open_domain: bool=True, top_k: int=10):
"""
:param open_domain: When True, a document is considered correctly retrieved so long as the answer string can be found within it.
When False, correct retrieval is evaluated based on document_id.
:param debug: When True, a record of each sample and its evaluation will be stored in EvalDocuments.log
:param top_k: calculate eval metrics for top k results, e.g., recall@k
"""
self.outgoing_edges = 1
self.init_counts()
self.no_answer_warning = False
self.debug = debug
self.log: List = []
self.open_domain = open_domain
self.top_k_eval_documents = top_k_eval_documents
self.name = name
self.top_k = top_k
self.too_few_docs_warning = False
self.top_k_used = 0

@@ -53,25 +54,25 @@ def init_counts(self):
self.reciprocal_rank_sum = 0.0
self.has_answer_reciprocal_rank_sum = 0.0

def run(self, documents, labels: dict, top_k_eval_documents: Optional[int]=None, **kwargs):
def run(self, documents: List[Document], labels: List[Label], top_k: Optional[int] = None): # type: ignore
"""Run this node on one sample and its labels"""
self.query_count += 1
retriever_labels = get_label(labels, kwargs["node_id"])
if not top_k_eval_documents:
top_k_eval_documents = self.top_k_eval_documents
retriever_labels = get_label(labels, self.name)
if not top_k:
top_k = self.top_k

if not self.top_k_used:
self.top_k_used = top_k_eval_documents
elif self.top_k_used != top_k_eval_documents:
self.top_k_used = top_k
elif self.top_k_used != top_k:
logger.warning(f"EvalDocuments was last run with top_k_eval_documents={self.top_k_used} but is "
f"being run again with top_k_eval_documents={self.top_k_eval_documents}. "
f"being run again with top_k={self.top_k}. "
f"The evaluation counter is being reset from this point so that the evaluation "
f"metrics are interpretable.")
self.init_counts()

if len(documents) < top_k_eval_documents and not self.too_few_docs_warning:
logger.warning(f"EvalDocuments is being provided less candidate documents than top_k_eval_documents "
f"(currently set to {top_k_eval_documents}).")
if len(documents) < top_k and not self.too_few_docs_warning:
logger.warning(f"EvalDocuments is being provided less candidate documents than top_k "
f"(currently set to {top_k}).")
self.too_few_docs_warning = True

# TODO retriever_labels is currently a Multilabel object but should eventually be a RetrieverLabel object
@@ -89,7 +90,7 @@ def run(self, documents, labels: dict, top_k_eval_documents: Optional[int]=None,
# If there are answer span annotations in the labels
else:
self.has_answer_count += 1
retrieved_reciprocal_rank = self.reciprocal_rank_retrieved(retriever_labels, documents, top_k_eval_documents)
retrieved_reciprocal_rank = self.reciprocal_rank_retrieved(retriever_labels, documents, top_k)
self.reciprocal_rank_sum += retrieved_reciprocal_rank
correct_retrieval = True if retrieved_reciprocal_rank > 0 else False
self.has_answer_correct += int(correct_retrieval)
@@ -101,11 +102,11 @@ def run(self, documents, labels: dict, top_k_eval_documents: Optional[int]=None,
self.recall = self.correct_retrieval_count / self.query_count
self.mean_reciprocal_rank = self.reciprocal_rank_sum / self.query_count

self.top_k_used = top_k_eval_documents
self.top_k_used = top_k

if self.debug:
self.log.append({"documents": documents, "labels": labels, "correct_retrieval": correct_retrieval, "retrieved_reciprocal_rank": retrieved_reciprocal_rank, **kwargs})
return {"documents": documents, "labels": labels, "correct_retrieval": correct_retrieval, "retrieved_reciprocal_rank": retrieved_reciprocal_rank, **kwargs}, "output_1"
self.log.append({"documents": documents, "labels": labels, "correct_retrieval": correct_retrieval, "retrieved_reciprocal_rank": retrieved_reciprocal_rank})
return {"correct_retrieval": correct_retrieval}, "output_1"

def is_correctly_retrieved(self, retriever_labels, predictions):
return self.reciprocal_rank_retrieved(retriever_labels, predictions) > 0
@@ -142,7 +143,7 @@ def print(self):
print(f"mean_reciprocal_rank@{self.top_k_used}: {self.mean_reciprocal_rank:.4f}")


class EvalAnswers:
class EvalAnswers(BaseComponent):
"""
This is a pipeline node that should be placed after a Reader in order to assess the performance of the Reader
individually or to assess the extractive QA performance of the whole pipeline. Performance metrics are stored in
@@ -152,6 +153,8 @@ class EvalAnswers:
open vs closed domain eval (https://haystack.deepset.ai/tutorials/evaluation).
"""

outgoing_edges = 1

def __init__(self,
skip_incorrect_retrieval: bool = True,
open_domain: bool = True,
@@ -174,7 +177,6 @@ def __init__(self,
- Large model for German only: "deepset/gbert-large-sts"
:param debug: When True, a record of each sample and its evaluation will be stored in EvalAnswers.log
"""
self.outgoing_edges = 1
self.log: List = []
self.debug = debug
self.skip_incorrect_retrieval = skip_incorrect_retrieval
@@ -203,14 +205,14 @@ def init_counts(self):
self.top_1_sas = 0.0
self.top_k_sas = 0.0

def run(self, labels, answers, **kwargs):
def run(self, labels: List[Label], answers: List[dict], correct_retrieval: bool): # type: ignore
"""Run this node on one sample and its labels"""
self.query_count += 1
predictions = answers
skip = self.skip_incorrect_retrieval and not kwargs.get("correct_retrieval")
skip = self.skip_incorrect_retrieval and not correct_retrieval
if predictions and not skip:
self.correct_retrieval_count += 1
multi_labels = get_label(labels, kwargs["node_id"])
multi_labels = get_label(labels, self.name)
# If this sample is impossible to answer and expects a no_answer response
if multi_labels.no_answer:
self.no_answer_count += 1
@@ -254,7 +256,7 @@ def run(self, labels, answers, **kwargs):
self.top_k_em_count += top_k_em
self.top_k_f1_sum += top_k_f1
self.update_has_answer_metrics()
return {**kwargs}, "output_1"
return {}, "output_1"

def evaluate_extraction(self, gold_labels, predictions):
if self.open_domain:
