diff --git a/README.md b/README.md index f725e59..3582d18 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ For more examples, you may also find our [Blog](https://haystack.deepset.ai/blog | Evaluate a RAG pipeline using Ragas integration| Open In Colab| | Sparse Embedding Retrieval with Qdrant and FastEmbed| Open In Colab| | Extract Metadata Filters from a Query | Open In Colab| +| Run tasks concurrently within a custom component | Open In Colab| | Cohere for Multilingual QA (Haystack 1.x)| Open In Colab| | GPT-4 and Weaviate for Custom Documentation QA (Haystack 1.x)| Open In Colab| | Whisper Transcriber and Weaviate for YouTube video QA (Haystack 1.x)| Open In Colab| diff --git a/notebooks/concurrent_tasks.ipynb b/notebooks/concurrent_tasks.ipynb new file mode 100644 index 0000000..19dea07 --- /dev/null +++ b/notebooks/concurrent_tasks.ipynb @@ -0,0 +1,1249 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "id": "e1HZgoMLMdXm" + }, + "source": [ + "# Run tasks concurrently within a custom component\n", + "\n", + "\n", + " \"Open\n", + "\n", + "\n", + "The execution logic of Haystack Pipelines is synchronous. Components, even if they belong to parallel branches, run one after the other.\n", + "\n", + "This has several advantages, including ease of debugging and the ability to handle complex workflows. In many cases/applications, this execution logic works well.\n", + "\n", + "Sometimes you may want to run some components concurrently.\n", + "This can be useful for component that perform I/O-bound tasks, where most of the time is spent waiting for input/output operations. Consider, for example, waiting for a response from LLM API clients or database clients.\n", + "\n", + "In this cookbook, we show how you can wrap multiple components into one that will run them concurrently in different threads." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "colab_type": "toc", + "id": "UYtqwCeHSUyV" + }, + "source": [ + ">[Run tasks concurrently using custom components](#scrollTo=e1HZgoMLMdXm)\n", + "\n", + ">>[Preparation](#scrollTo=_hUHkv5eOjsC)\n", + "\n", + ">>[Concurrent Generators](#scrollTo=sEy_ltiXOtsz)\n", + "\n", + ">>>[Baseline: Pipeline with Generators running one after another](#scrollTo=8d2T7u_tPb5a)\n", + "\n", + ">>>[Optimisation: Pipeline with Generators running concurrently](#scrollTo=uuQIEraiRDwS)\n", + "\n", + ">>>>[ConcurrentGenerators component](#scrollTo=tXfYVs6RRSGO)\n", + "\n", + ">>>>[Pipeline with ConcurrentGenerators component](#scrollTo=-mG5f6gzRg93)\n", + "\n", + ">>[Concurrent Retrievers](#scrollTo=iDrLj-GYSihL)\n", + "\n", + ">>>[Baseline: Pipeline with Retrievers running one after another](#scrollTo=Grag46TJUdfo)\n", + "\n", + ">>>[Optimisation: Pipeline with Retrievers running concurrently](#scrollTo=MXpYIrJ6U5_e)\n", + "\n", + ">>>>[ConcurrentRetrievers component](#scrollTo=vDujbLZxVK2B)\n", + "\n", + ">>>>[Pipeline with ConcurrentRetrievers component](#scrollTo=Q0KgQbPyVUfY)\n", + "\n", + ">>[Conclusions](#scrollTo=ORRniqskV3n7)\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "_hUHkv5eOjsC" + }, + "source": [ + "## Preparation" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/" + }, + "id": "y83PSWgPeMZh", + "outputId": "c4312fed-450d-492b-c07d-dd2efe232886" + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Collecting haystack-ai\n", + " Downloading haystack_ai-2.1.0-py3-none-any.whl (318 kB)\n", + "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m318.9/318.9 kB\u001b[0m \u001b[31m3.6 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n", + "\u001b[?25hCollecting cohere-haystack\n", + " Downloading cohere_haystack-1.0.0-py3-none-any.whl (22 kB)\n", + "Collecting elasticsearch-haystack\n", + " Downloading elasticsearch_haystack-0.4.0-py3-none-any.whl (17 kB)\n", + "Collecting boilerpy3 (from haystack-ai)\n", + " Downloading boilerpy3-1.0.7-py3-none-any.whl (22 kB)\n", + "Collecting haystack-bm25 (from haystack-ai)\n", + " Downloading haystack_bm25-1.0.2-py2.py3-none-any.whl (8.8 kB)\n", + "Requirement already satisfied: jinja2 in /usr/local/lib/python3.10/dist-packages (from haystack-ai) (3.1.3)\n", + "Collecting lazy-imports (from haystack-ai)\n", + " Downloading lazy_imports-0.3.1-py3-none-any.whl (12 kB)\n", + "Requirement already satisfied: more-itertools in /usr/local/lib/python3.10/dist-packages (from haystack-ai) (10.1.0)\n", + "Requirement already satisfied: networkx in /usr/local/lib/python3.10/dist-packages (from haystack-ai) (3.3)\n", + "Requirement already satisfied: numpy in /usr/local/lib/python3.10/dist-packages (from haystack-ai) (1.25.2)\n", + "Collecting openai>=1.1.0 (from haystack-ai)\n", + " Downloading openai-1.27.0-py3-none-any.whl (314 kB)\n", + "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m314.1/314.1 kB\u001b[0m \u001b[31m24.0 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n", + "\u001b[?25hRequirement already satisfied: pandas in /usr/local/lib/python3.10/dist-packages (from haystack-ai) (2.0.3)\n", + "Collecting posthog (from haystack-ai)\n", + " Downloading posthog-3.5.0-py2.py3-none-any.whl (41 kB)\n", + "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m41.3/41.3 kB\u001b[0m \u001b[31m4.3 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n", + "\u001b[?25hRequirement already satisfied: python-dateutil in /usr/local/lib/python3.10/dist-packages (from haystack-ai) (2.8.2)\n", + "Requirement already satisfied: pyyaml in /usr/local/lib/python3.10/dist-packages (from haystack-ai) (6.0.1)\n", + "Requirement already satisfied: requests in /usr/local/lib/python3.10/dist-packages (from haystack-ai) (2.31.0)\n", + "Requirement already satisfied: tenacity in /usr/local/lib/python3.10/dist-packages (from haystack-ai) (8.2.3)\n", + "Requirement already satisfied: tqdm in /usr/local/lib/python3.10/dist-packages (from haystack-ai) (4.66.4)\n", + "Requirement already satisfied: typing-extensions>=4.7 in /usr/local/lib/python3.10/dist-packages (from haystack-ai) (4.11.0)\n", + "Collecting cohere==5.* (from cohere-haystack)\n", + " Downloading cohere-5.3.5-py3-none-any.whl (156 kB)\n", + "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m156.3/156.3 kB\u001b[0m \u001b[31m10.1 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n", + "\u001b[?25hCollecting fastavro<2.0.0,>=1.9.4 (from cohere==5.*->cohere-haystack)\n", + " Downloading fastavro-1.9.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)\n", + "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m3.1/3.1 MB\u001b[0m \u001b[31m37.1 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n", + "\u001b[?25hCollecting httpx>=0.21.2 (from cohere==5.*->cohere-haystack)\n", + " Downloading httpx-0.27.0-py3-none-any.whl (75 kB)\n", + "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m75.6/75.6 kB\u001b[0m \u001b[31m4.6 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n", + "\u001b[?25hCollecting httpx-sse<0.5.0,>=0.4.0 (from cohere==5.*->cohere-haystack)\n", + " Downloading httpx_sse-0.4.0-py3-none-any.whl (7.8 kB)\n", + "Requirement already satisfied: pydantic>=1.9.2 in /usr/local/lib/python3.10/dist-packages (from cohere==5.*->cohere-haystack) (2.7.1)\n", + "Requirement already satisfied: tokenizers<0.20,>=0.19 in /usr/local/lib/python3.10/dist-packages (from cohere==5.*->cohere-haystack) (0.19.1)\n", + "Collecting types-requests<3.0.0,>=2.0.0 (from cohere==5.*->cohere-haystack)\n", + " Downloading types_requests-2.31.0.20240406-py3-none-any.whl (15 kB)\n", + "Collecting elasticsearch<9,>=8 (from elasticsearch-haystack)\n", + " Downloading elasticsearch-8.13.1-py3-none-any.whl (477 kB)\n", + "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m477.5/477.5 kB\u001b[0m \u001b[31m10.0 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n", + "\u001b[?25hCollecting elastic-transport<9,>=8.13 (from elasticsearch<9,>=8->elasticsearch-haystack)\n", + " Downloading elastic_transport-8.13.0-py3-none-any.whl (64 kB)\n", + "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m64.3/64.3 kB\u001b[0m \u001b[31m3.0 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n", + "\u001b[?25hRequirement already satisfied: anyio<5,>=3.5.0 in /usr/local/lib/python3.10/dist-packages (from openai>=1.1.0->haystack-ai) (3.7.1)\n", + "Requirement already satisfied: distro<2,>=1.7.0 in /usr/lib/python3/dist-packages (from openai>=1.1.0->haystack-ai) (1.7.0)\n", + "Requirement already satisfied: sniffio in /usr/local/lib/python3.10/dist-packages (from openai>=1.1.0->haystack-ai) (1.3.1)\n", + "Requirement already satisfied: charset-normalizer<4,>=2 in /usr/local/lib/python3.10/dist-packages (from requests->haystack-ai) (3.3.2)\n", + "Requirement already satisfied: idna<4,>=2.5 in /usr/local/lib/python3.10/dist-packages (from requests->haystack-ai) (3.7)\n", + "Requirement already satisfied: urllib3<3,>=1.21.1 in /usr/local/lib/python3.10/dist-packages (from requests->haystack-ai) (2.0.7)\n", + "Requirement already satisfied: certifi>=2017.4.17 in /usr/local/lib/python3.10/dist-packages (from requests->haystack-ai) (2024.2.2)\n", + "Requirement already satisfied: MarkupSafe>=2.0 in /usr/local/lib/python3.10/dist-packages (from jinja2->haystack-ai) (2.1.5)\n", + "Requirement already satisfied: pytz>=2020.1 in /usr/local/lib/python3.10/dist-packages (from pandas->haystack-ai) (2023.4)\n", + "Requirement already satisfied: tzdata>=2022.1 in /usr/local/lib/python3.10/dist-packages (from pandas->haystack-ai) (2024.1)\n", + "Requirement already satisfied: six>=1.5 in /usr/local/lib/python3.10/dist-packages (from python-dateutil->haystack-ai) (1.16.0)\n", + "Collecting monotonic>=1.5 (from posthog->haystack-ai)\n", + " Downloading monotonic-1.6-py2.py3-none-any.whl (8.2 kB)\n", + "Collecting backoff>=1.10.0 (from posthog->haystack-ai)\n", + " Downloading backoff-2.2.1-py3-none-any.whl (15 kB)\n", + "Requirement already satisfied: exceptiongroup in /usr/local/lib/python3.10/dist-packages (from anyio<5,>=3.5.0->openai>=1.1.0->haystack-ai) (1.2.1)\n", + "Collecting httpcore==1.* (from httpx>=0.21.2->cohere==5.*->cohere-haystack)\n", + " Downloading httpcore-1.0.5-py3-none-any.whl (77 kB)\n", + "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m77.9/77.9 kB\u001b[0m \u001b[31m5.3 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n", + "\u001b[?25hCollecting h11<0.15,>=0.13 (from httpcore==1.*->httpx>=0.21.2->cohere==5.*->cohere-haystack)\n", + " Downloading h11-0.14.0-py3-none-any.whl (58 kB)\n", + "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m58.3/58.3 kB\u001b[0m \u001b[31m4.3 MB/s\u001b[0m eta \u001b[36m0:00:00\u001b[0m\n", + "\u001b[?25hRequirement already satisfied: annotated-types>=0.4.0 in /usr/local/lib/python3.10/dist-packages (from pydantic>=1.9.2->cohere==5.*->cohere-haystack) (0.6.0)\n", + "Requirement already satisfied: pydantic-core==2.18.2 in /usr/local/lib/python3.10/dist-packages (from pydantic>=1.9.2->cohere==5.*->cohere-haystack) (2.18.2)\n", + "Requirement already satisfied: huggingface-hub<1.0,>=0.16.4 in /usr/local/lib/python3.10/dist-packages (from tokenizers<0.20,>=0.19->cohere==5.*->cohere-haystack) (0.20.3)\n", + "Requirement already satisfied: filelock in /usr/local/lib/python3.10/dist-packages (from huggingface-hub<1.0,>=0.16.4->tokenizers<0.20,>=0.19->cohere==5.*->cohere-haystack) (3.14.0)\n", + "Requirement already satisfied: fsspec>=2023.5.0 in /usr/local/lib/python3.10/dist-packages (from huggingface-hub<1.0,>=0.16.4->tokenizers<0.20,>=0.19->cohere==5.*->cohere-haystack) (2023.6.0)\n", + "Requirement already satisfied: packaging>=20.9 in /usr/local/lib/python3.10/dist-packages (from huggingface-hub<1.0,>=0.16.4->tokenizers<0.20,>=0.19->cohere==5.*->cohere-haystack) (24.0)\n", + "Installing collected packages: monotonic, types-requests, lazy-imports, httpx-sse, haystack-bm25, h11, fastavro, elastic-transport, boilerpy3, backoff, posthog, httpcore, elasticsearch, httpx, openai, cohere, haystack-ai, elasticsearch-haystack, cohere-haystack\n", + "Successfully installed backoff-2.2.1 boilerpy3-1.0.7 cohere-5.3.5 cohere-haystack-1.0.0 elastic-transport-8.13.0 elasticsearch-8.13.1 elasticsearch-haystack-0.4.0 fastavro-1.9.4 h11-0.14.0 haystack-ai-2.1.0 haystack-bm25-1.0.2 httpcore-1.0.5 httpx-0.27.0 httpx-sse-0.4.0 lazy-imports-0.3.1 monotonic-1.6 openai-1.27.0 posthog-3.5.0 types-requests-2.31.0.20240406\n" + ] + } + ], + "source": [ + "! pip install haystack-ai cohere-haystack elasticsearch-haystack" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": { + "id": "g_WBZEUUPGt-" + }, + "outputs": [], + "source": [ + "import os\n", + "from rich import print\n", + "import time" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": { + "id": "IurPbpyRQ-2x" + }, + "outputs": [], + "source": [ + "# this is only needed in Juptyer, where an asyncio event loop is already running\n", + "\n", + "import nest_asyncio\n", + "nest_asyncio.apply()" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "sEy_ltiXOtsz" + }, + "source": [ + "## Concurrent Generators\n", + "\n", + "Use case: we want to send the same prompt to different generators and aggregate the results." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "id": "1YhPdIH_p6ZU" + }, + "outputs": [], + "source": [ + "os.environ[\"OPENAI_API_KEY\"]=\"your OpenAI API key\"\n", + "os.environ[\"COHERE_API_KEY\"]=\"your Cohere API key\"" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": { + "id": "KZ2Nhk8Mp0bc" + }, + "outputs": [], + "source": [ + "# an example Document to summarize\n", + "\n", + "from haystack import Document\n", + "\n", + "\n", + "text=\"\"\"\n", + "The giant panda (Ailuropoda melanoleuca), also known as the panda bear or simply panda, is a bear species endemic to China. It is characterised by its black-and-white coat and rotund body. The name \"giant panda\" is sometimes used to distinguish it from the red panda, a neighboring musteloid. Adult individuals average 100 to 115 kg (220 to 254 lb), and are typically 1.2 to 1.9 m (3 ft 11 in to 6 ft 3 in) long. The species is sexually dimorphic, as males are typically 10 to 20% larger. The fur is white, with black patches around the eyes, ears, legs and shoulders. A thumb is visible on the bear's forepaw, which helps in holding bamboo in place for feeding. Giant pandas have adapted larger molars and expanded temporal fossa to meet their dietary requirements.\n", + "\n", + "The giant panda is exclusively found in six mountainous regions in a few provinces. It is also found in elevations of up to 3,000 m (9,800 ft). Its diet consists almost entirely of bamboo, making the bear mostly herbivorous, despite being classified in the order Carnivora. The shoot is an important energy source, as it contains starch and is 32% protein, hence pandas evolved the ability to effectively digest starch. They are solitary, only gathering in times of mating. Females rear cubs for an average of 18 to 24 months. Potential predators of sub-adult pandas include leopards. Giant pandas heavily rely on olfactory communication to communicate with one another; scent marks are used as chemical cues and on landmarks like rocks or trees. Giant pandas live long lives, with the oldest known individual dying at 38.\n", + "\n", + "As a result of farming, deforestation, and other development, the giant panda has been driven out of the lowland areas where it once lived, and it is a conservation-reliant vulnerable species. A 2007 report showed 239 pandas living in captivity inside China and another 27 outside the country. Some reports also show that the number of giant pandas in the wild is on the rise. By March 2015, the wild giant panda population had increased to 1,864 individuals. In 2016, it was reclassified on the IUCN Red List from \"endangered\" to \"vulnerable\", affirming decade-long efforts to save the panda. In July 2021, Chinese authorities also reclassified the giant panda as vulnerable. The giant panda has often served as China's national symbol, appeared on Chinese Gold Panda coins since 1982 and as one of the five Fuwa mascots of the 2008 Summer Olympics held in Beijing.\n", + "\"\"\"\n", + "\n", + "documents = [Document(content=text)]" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "8d2T7u_tPb5a" + }, + "source": [ + "### Baseline: Pipeline with Generators running one after another" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 514 + }, + "id": "9a6Ncqjaoo60", + "outputId": "4dcc4092-0137-49d7-8dea-000378aa9965" + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:haystack_integrations.components.generators.cohere.generator:The 'generate' API is marked as Legacy and is no longer maintained by Cohere. We recommend to use the CohereChatGenerator instead.\n" + ] + }, + { + "data": { + "text/html": [ + "
{\n",
+              "    'second_generator': {\n",
+              "        'replies': [\n",
+              "            ' The giant panda is a black and white bear species endemic to China. Scientists have struggled to \n",
+              "classify the species due to its unique, exclusive diet of bamboo. The giant panda lives in mountainous regions, is \n",
+              "mostly herbivorous, and is a solitary animal. Due to deforestation and development, the giant panda is now a \n",
+              "vulnerable species, and relies on conservation efforts for its survival. '\n",
+              "        ],\n",
+              "        'meta': [{'finish_reason': 'COMPLETE'}]\n",
+              "    },\n",
+              "    'first_generator': {\n",
+              "        'replies': [\n",
+              "            'The giant panda is a bear species endemic to China, known for its black-and-white coat and reliance on\n",
+              "bamboo in its diet. Despite being classified as carnivores, they are mostly herbivorous. Giant pandas live in \n",
+              "mountainous regions and are solitary animals, only coming together during mating. They are a conservation-reliant \n",
+              "vulnerable species due to farming, deforestation, and other development. Efforts to save the giant panda have been \n",
+              "successful, with the wild population increasing in recent years. It is also a symbol of China and has been featured\n",
+              "on Chinese coins and Olympic mascots.'\n",
+              "        ],\n",
+              "        'meta': [\n",
+              "            {\n",
+              "                'model': 'gpt-3.5-turbo-0125',\n",
+              "                'index': 0,\n",
+              "                'finish_reason': 'stop',\n",
+              "                'usage': {'completion_tokens': 117, 'prompt_tokens': 585, 'total_tokens': 702}\n",
+              "            }\n",
+              "        ]\n",
+              "    }\n",
+              "}\n",
+              "
\n" + ], + "text/plain": [ + "\u001b[1m{\u001b[0m\n", + " \u001b[32m'second_generator'\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[32m'replies'\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m' The giant panda is a black and white bear species endemic to China. Scientists have struggled to \u001b[0m\n", + "\u001b[32mclassify the species due to its unique, exclusive diet of bamboo. The giant panda lives in mountainous regions, is \u001b[0m\n", + "\u001b[32mmostly herbivorous, and is a solitary animal. Due to deforestation and development, the giant panda is now a \u001b[0m\n", + "\u001b[32mvulnerable species, and relies on conservation efforts for its survival. '\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[32m'meta'\u001b[0m: \u001b[1m[\u001b[0m\u001b[1m{\u001b[0m\u001b[32m'finish_reason'\u001b[0m: \u001b[32m'COMPLETE'\u001b[0m\u001b[1m}\u001b[0m\u001b[1m]\u001b[0m\n", + " \u001b[1m}\u001b[0m,\n", + " \u001b[32m'first_generator'\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[32m'replies'\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m'The giant panda is a bear species endemic to China, known for its black-and-white coat and reliance on\u001b[0m\n", + "\u001b[32mbamboo in its diet. Despite being classified as carnivores, they are mostly herbivorous. Giant pandas live in \u001b[0m\n", + "\u001b[32mmountainous regions and are solitary animals, only coming together during mating. They are a conservation-reliant \u001b[0m\n", + "\u001b[32mvulnerable species due to farming, deforestation, and other development. Efforts to save the giant panda have been \u001b[0m\n", + "\u001b[32msuccessful, with the wild population increasing in recent years. It is also a symbol of China and has been featured\u001b[0m\n", + "\u001b[32mon Chinese coins and Olympic mascots.'\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[32m'meta'\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[1m{\u001b[0m\n", + " \u001b[32m'model'\u001b[0m: \u001b[32m'gpt-3.5-turbo-0125'\u001b[0m,\n", + " \u001b[32m'index'\u001b[0m: \u001b[1;36m0\u001b[0m,\n", + " \u001b[32m'finish_reason'\u001b[0m: \u001b[32m'stop'\u001b[0m,\n", + " \u001b[32m'usage'\u001b[0m: \u001b[1m{\u001b[0m\u001b[32m'completion_tokens'\u001b[0m: \u001b[1;36m117\u001b[0m, \u001b[32m'prompt_tokens'\u001b[0m: \u001b[1;36m585\u001b[0m, \u001b[32m'total_tokens'\u001b[0m: \u001b[1;36m702\u001b[0m\u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m]\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + "\u001b[1m}\u001b[0m\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
Time taken for 5 non concurrent calls: 28.85969638824463 seconds\n",
+              "
\n" + ], + "text/plain": [ + "Time taken for \u001b[1;36m5\u001b[0m non concurrent calls: \u001b[1;36m28.85969638824463\u001b[0m seconds\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "from haystack import Pipeline\n", + "\n", + "from haystack.components.generators import OpenAIGenerator\n", + "from haystack_integrations.components.generators.cohere import CohereGenerator\n", + "from haystack.components.builders import PromptBuilder\n", + "from haystack.components.others import Multiplexer\n", + "\n", + "documents = [Document(content=text)]\n", + "\n", + "\n", + "template = \"\"\"Write a short summary of the following text.\n", + "Text:\n", + "{% for doc in documents %}\n", + " {{ doc.content }}\n", + "{% endfor %}\n", + "\"\"\"\n", + "\n", + "pipeline = Pipeline()\n", + "pipeline.add_component(\"prompt_builder\", PromptBuilder(template))\n", + "pipeline.add_component(\"multiplexer\", Multiplexer(str))\n", + "pipeline.add_component(\"first_generator\", OpenAIGenerator())\n", + "pipeline.add_component(\"second_generator\", CohereGenerator())\n", + "\n", + "pipeline.connect(\"prompt_builder\", \"multiplexer\")\n", + "pipeline.connect(\"multiplexer.value\", \"first_generator\")\n", + "pipeline.connect(\"multiplexer.value\", \"second_generator\")\n", + "\n", + "start = time.time()\n", + "n=5\n", + "for i in range(n):\n", + " result = pipeline.run({\"prompt_builder\": {\"documents\": documents}})\n", + "end = time.time()\n", + "print(result)\n", + "print(f\"Time taken for {n} non concurrent calls: {end - start} seconds\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "uuQIEraiRDwS" + }, + "source": [ + "### Optimisation: Pipeline with Generators running concurrently\n", + "\n", + "We wrap multiple generators into a single component that internally will run them concurrently in different threads. To simplify the threads orchestration, we use the `to_thread` function from `asyncio`. The component itself will stay synchronous, exposing the usual `run` method." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "tXfYVs6RRSGO" + }, + "source": [ + "#### ConcurrentGenerators component\n", + "\n", + "- This component expects a list of generators and names as initialization parameters.\n", + "\n", + "- The `_arun` method is an async method, where concurrent execution takes place. It creates a thread for each generator and waits for all threads to finish. Then the results are collected, aggregated, and returned.\n", + "\n", + "- The `run` method is synchronous (as expected by the Haystack pipeline) and calls the `_arun` method using `asyncio.run`.\n", + "\n", + "\n", + "Learn about creating custom components [in our documentation](https://docs.haystack.deepset.ai/docs/custom-components)." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": { + "id": "BisvhZlRqN7p" + }, + "outputs": [], + "source": [ + "from haystack.core.component import Component\n", + "from haystack.components.generators import OpenAIGenerator\n", + "from haystack.components.builders import PromptBuilder\n", + "from typing import List, Dict, Any, Optional\n", + "from haystack import component, default_from_dict, default_to_dict\n", + "from haystack.utils.type_serialization import deserialize_type\n", + "import asyncio\n", + "\n", + "\n", + "@component\n", + "class ConcurrentGenerators:\n", + " def __init__(self, generators: List[Component], names: Optional[List[str]] = None):\n", + " self.generators = generators\n", + " if names is None:\n", + " names = [f\"generator_{i}\" for i in range(len(generators))]\n", + " self.names = names\n", + "\n", + "\t\t\t # we set the output types here so that the results are not too nested\n", + " output_types = {k: Dict[str, Any] for k in names}\n", + " component.set_output_types(self, **output_types)\n", + "\n", + " def warm_up(self):\n", + " \"\"\"Warm up the generators.\"\"\"\n", + " for generator in self.generators:\n", + " if hasattr(generator, \"warm_up\"):\n", + " generator.warm_up()\n", + "\n", + " async def _arun(self, **kwargs):\n", + " \"\"\"\n", + " Asynchrounous method to run the generators concurrently.\n", + " \"\"\"\n", + "\n", + " # the generators run in separate threads\n", + " results = await asyncio.gather(\n", + " *[asyncio.to_thread(generator.run, **kwargs) for generator in self.generators]\n", + " )\n", + "\n", + " organized_results = {}\n", + " for generator_name, res_ in zip(self.names, results):\n", + " organized_results[generator_name] = res_\n", + " return organized_results\n", + "\n", + " def run(self, prompt: str):\n", + " \"\"\"\n", + " Synchronous run method that can be integrated into a classic synchronous pipeline.\n", + " \"\"\"\n", + " results = asyncio.run(self._arun(prompt=prompt))\n", + " return {\"results\": results}\n", + "\n", + "\n", + " def to_dict(self):\n", + " generators = [generator.to_dict() for generator in self.generators]\n", + " return default_to_dict(self, generators=generators, names=self.names)\n", + "\n", + " @classmethod\n", + " def from_dict(cls, data: Dict[str, Any]) -> \"ConcurrentGenerators\":\n", + " init_params = data.get(\"init_parameters\", {})\n", + "\n", + " # Deserialize the generators\n", + " generators = []\n", + " serialized_generators = init_params[\"generators\"]\n", + " for serialized_generator in serialized_generators:\n", + " generator_class = deserialize_type(serialized_generator[\"type\"])\n", + " generator = generator_class.from_dict(serialized_generator)\n", + " generators.append(generator)\n", + "\n", + " data[\"init_parameters\"][\"generators\"] = generators\n", + " return default_from_dict(cls, data)" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "-mG5f6gzRg93" + }, + "source": [ + "#### Pipeline with ConcurrentGenerators component" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 594 + }, + "id": "fW8ghvbMsPqE", + "outputId": "c7b2d926-7a9e-42bc-b7fa-e268b315f274" + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:haystack_integrations.components.generators.cohere.generator:The 'generate' API is marked as Legacy and is no longer maintained by Cohere. We recommend to use the CohereChatGenerator instead.\n" + ] + }, + { + "data": { + "text/html": [ + "
{\n",
+              "    'concurrent_generators': {\n",
+              "        'results': {\n",
+              "            'openai': {\n",
+              "                'replies': [\n",
+              "                    'The text provides information about the giant panda, a bear species endemic to China known for\n",
+              "its black-and-white coat and rotund body. The species primarily feeds on bamboo and is classified as herbivorous \n",
+              "despite being in the order Carnivora. Due to farming, deforestation, and development, the giant panda is a \n",
+              "conservation-reliant vulnerable species. Efforts have been made to save the panda, resulting in an increase in the \n",
+              "wild population. The giant panda is a national symbol of China and has been featured on various coins and mascots, \n",
+              "including the 2008 Beijing Summer Olympics.'\n",
+              "                ],\n",
+              "                'meta': [\n",
+              "                    {\n",
+              "                        'model': 'gpt-3.5-turbo-0125',\n",
+              "                        'index': 0,\n",
+              "                        'finish_reason': 'stop',\n",
+              "                        'usage': {'completion_tokens': 119, 'prompt_tokens': 585, 'total_tokens': 704}\n",
+              "                    }\n",
+              "                ]\n",
+              "            },\n",
+              "            'cohere': {\n",
+              "                'replies': [\n",
+              "                    \" The giant panda is a vulnerable species found only in China. Despite its dietary \n",
+              "classification, the giant panda is herbivorous, feeding mostly on bamboo. They are heavy climbers and typically \n",
+              "live alone or in pairs. \\nThe giant panda has been China's national symbol and over the years, its \n",
+              "conservation-reliant status has been a focal point for Chinese authorities. The giant panda was reclassified as \n",
+              "vulnerable in 2016, and again in 2021. \"\n",
+              "                ],\n",
+              "                'meta': [{'finish_reason': 'COMPLETE'}]\n",
+              "            }\n",
+              "        }\n",
+              "    }\n",
+              "}\n",
+              "
\n" + ], + "text/plain": [ + "\u001b[1m{\u001b[0m\n", + " \u001b[32m'concurrent_generators'\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[32m'results'\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[32m'openai'\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[32m'replies'\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m'The text provides information about the giant panda, a bear species endemic to China known for\u001b[0m\n", + "\u001b[32mits black-and-white coat and rotund body. The species primarily feeds on bamboo and is classified as herbivorous \u001b[0m\n", + "\u001b[32mdespite being in the order Carnivora. Due to farming, deforestation, and development, the giant panda is a \u001b[0m\n", + "\u001b[32mconservation-reliant vulnerable species. Efforts have been made to save the panda, resulting in an increase in the \u001b[0m\n", + "\u001b[32mwild population. The giant panda is a national symbol of China and has been featured on various coins and mascots, \u001b[0m\n", + "\u001b[32mincluding the 2008 Beijing Summer Olympics.'\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[32m'meta'\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[1m{\u001b[0m\n", + " \u001b[32m'model'\u001b[0m: \u001b[32m'gpt-3.5-turbo-0125'\u001b[0m,\n", + " \u001b[32m'index'\u001b[0m: \u001b[1;36m0\u001b[0m,\n", + " \u001b[32m'finish_reason'\u001b[0m: \u001b[32m'stop'\u001b[0m,\n", + " \u001b[32m'usage'\u001b[0m: \u001b[1m{\u001b[0m\u001b[32m'completion_tokens'\u001b[0m: \u001b[1;36m119\u001b[0m, \u001b[32m'prompt_tokens'\u001b[0m: \u001b[1;36m585\u001b[0m, \u001b[32m'total_tokens'\u001b[0m: \u001b[1;36m704\u001b[0m\u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m]\u001b[0m\n", + " \u001b[1m}\u001b[0m,\n", + " \u001b[32m'cohere'\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[32m'replies'\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\" The giant panda is a vulnerable species found only in China. Despite its dietary \u001b[0m\n", + "\u001b[32mclassification, the giant panda is herbivorous, feeding mostly on bamboo. They are heavy climbers and typically \u001b[0m\n", + "\u001b[32mlive alone or in pairs. \\nThe giant panda has been China's national symbol and over the years, its \u001b[0m\n", + "\u001b[32mconservation-reliant status has been a focal point for Chinese authorities. The giant panda was reclassified as \u001b[0m\n", + "\u001b[32mvulnerable in 2016, and again in 2021. \"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[32m'meta'\u001b[0m: \u001b[1m[\u001b[0m\u001b[1m{\u001b[0m\u001b[32m'finish_reason'\u001b[0m: \u001b[32m'COMPLETE'\u001b[0m\u001b[1m}\u001b[0m\u001b[1m]\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + "\u001b[1m}\u001b[0m\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
Time taken for 5 concurrent calls: 17.31975769996643 seconds\n",
+              "
\n" + ], + "text/plain": [ + "Time taken for \u001b[1;36m5\u001b[0m concurrent calls: \u001b[1;36m17.31975769996643\u001b[0m seconds\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "pipeline = Pipeline()\n", + "pipeline.add_component(\"prompt_builder\", PromptBuilder(template))\n", + "pipeline.add_component(\"concurrent_generators\", ConcurrentGenerators(\n", + " generators=[OpenAIGenerator(), CohereGenerator()],\n", + " names=[\"openai\", \"cohere\"]))\n", + "\n", + "pipeline.connect(\"prompt_builder\", \"concurrent_generators\")\n", + "\n", + "start = time.time()\n", + "n=5\n", + "for i in range(n):\n", + " result = pipeline.run({\"prompt_builder\": {\"documents\": documents}})\n", + "end = time.time()\n", + "print(result)\n", + "print(f\"Time taken for {n} concurrent calls: {end - start} seconds\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "Tht17qr7SBPi" + }, + "source": [ + "Nice!\n", + "Our approach does the trick." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "iDrLj-GYSihL" + }, + "source": [ + "## Concurrent Retrievers\n", + "\n", + "Use case: we want to send a user query to different retrievers concurrently. For example, we could send a user query to a keyword-based retriever and a semantic one, and join both sets of results.\n", + "\n", + "⚠️ Some Document Stores have Hybrid Retrievers that internally send a single batch query to the DB. This solution is generally more efficient than the following approach." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We first generate some Documents with random `content` and `embedding`.\n", + "The documents are written in `ElasticsearchDocumentStore`." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 257 + }, + "id": "BOHkxLDjv2Ed", + "outputId": "de4fa16f-f497-4077-aca3-88b3967e8757" + }, + "outputs": [ + { + "data": { + "text/html": [ + "
1000\n",
+              "
\n" + ], + "text/plain": [ + "\u001b[1;36m1000\u001b[0m\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
[\n",
+              "    Document(id=c02b93cdfbc7afa9d8475f20500e340d43e1dbbd0b8e17458817d886f5ff3d68, content: 'The capital of Italy is\n",
+              "Rome 0', score: 1.0, embedding: vector of size 768),\n",
+              "    Document(id=49679c985b4ab56bddebd77190836cfe772eb537121f87dc5ec67313931a3db0, content: 'The capital of Italy is\n",
+              "Rome 1', score: 1.0, embedding: vector of size 768),\n",
+              "    Document(id=76f7ac287e02cf5972db85446efcc230b7a4f0d8988fc325ec15b1c4d8188eb5, content: 'The capital of Spain is\n",
+              "Madrid 2', score: 1.0, embedding: vector of size 768),\n",
+              "    Document(id=e44a482bfde5acf6636f9046f28634175dcd3f8021a7a7297e3aeb81f3aeef23, content: 'The capital of France \n",
+              "is Paris 3', score: 1.0, embedding: vector of size 768),\n",
+              "    Document(id=3c14dad24490e08e8dad5b9ac53a7180181eb80f1656d31b08a3dc314a3983be, content: 'The capital of Italy is\n",
+              "Rome 4', score: 1.0, embedding: vector of size 768),\n",
+              "    Document(id=51aaaae5e07b07b3b53b07bf2cd159432ce0c4b2178a79f41999b6cb7c1f2af1, content: 'The capital of Italy is\n",
+              "Rome 5', score: 1.0, embedding: vector of size 768)\n",
+              "]\n",
+              "
\n" + ], + "text/plain": [ + "\u001b[1m[\u001b[0m\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35mc02b93cdfbc7afa9d8475f20500e340d43e1dbbd0b8e17458817d886f5ff3d68\u001b[0m, content: \u001b[32m'The capital of Italy is\u001b[0m\n", + "\u001b[32mRome 0'\u001b[0m, score: \u001b[1;36m1.0\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m49679c985b4ab56bddebd77190836cfe772eb537121f87dc5ec67313931a3db0\u001b[0m, content: \u001b[32m'The capital of Italy is\u001b[0m\n", + "\u001b[32mRome 1'\u001b[0m, score: \u001b[1;36m1.0\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m76f7ac287e02cf5972db85446efcc230b7a4f0d8988fc325ec15b1c4d8188eb5\u001b[0m, content: \u001b[32m'The capital of Spain is\u001b[0m\n", + "\u001b[32mMadrid 2'\u001b[0m, score: \u001b[1;36m1.0\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35me44a482bfde5acf6636f9046f28634175dcd3f8021a7a7297e3aeb81f3aeef23\u001b[0m, content: \u001b[32m'The capital of France \u001b[0m\n", + "\u001b[32mis Paris 3'\u001b[0m, score: \u001b[1;36m1.0\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m3c14dad24490e08e8dad5b9ac53a7180181eb80f1656d31b08a3dc314a3983be\u001b[0m, content: \u001b[32m'The capital of Italy is\u001b[0m\n", + "\u001b[32mRome 4'\u001b[0m, score: \u001b[1;36m1.0\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m51aaaae5e07b07b3b53b07bf2cd159432ce0c4b2178a79f41999b6cb7c1f2af1\u001b[0m, content: \u001b[32m'The capital of Italy is\u001b[0m\n", + "\u001b[32mRome 5'\u001b[0m, score: \u001b[1;36m1.0\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m\n", + "\u001b[1m]\u001b[0m\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore\n", + "\n", + "from haystack import Document\n", + "import numpy as np\n", + "import random\n", + "\n", + "contents = [\"The capital of Germany is Berlin\", \"The capital of France is Paris\", \"The capital of Spain is Madrid\", \"The capital of Italy is Rome\"]\n", + "\n", + "documents =[]\n", + "for i in range(1_000):\n", + " doc = Document(\n", + " content=random.choice(contents)+f\" {i}\",\n", + " embedding=np.random.rand(768).tolist()\n", + " )\n", + " documents.append(doc)\n", + "\n", + "\n", + "document_store = ElasticsearchDocumentStore(cloud_id=\"your Elastic Cloud ID\",\n", + " api_key=\"your Elastic API key\")\n", + "document_store.write_documents(documents)\n", + "\n", + "print(document_store.count_documents())\n", + "print(document_store.filter_documents()[:6])" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "Grag46TJUdfo" + }, + "source": [ + "### Baseline: Pipeline with Retrievers running one after another" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 769 + }, + "id": "Qkax5zb4uEnG", + "outputId": "149bd5bc-7d56-4cf4-e56b-75f772b157df" + }, + "outputs": [ + { + "data": { + "text/html": [ + "
{\n",
+              "    'document_joiner': {\n",
+              "        'documents': [\n",
+              "            Document(id=76f7ac287e02cf5972db85446efcc230b7a4f0d8988fc325ec15b1c4d8188eb5, content: 'The capital of \n",
+              "Spain is Madrid 2', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=81a55c519cd78fbe2eedf0d632cf2e256301ad42201b8be9b134eed7d5f8add6, content: 'The capital of \n",
+              "Spain is Madrid 10', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=ea817c00aee30178ebbfeb70f6e2a8853931c20ea47a1977227531ba175fc336, content: 'The capital of \n",
+              "Spain is Madrid 12', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=c43c8845ec4831d3321f94db7b256c86d1c4a4a3b077dd7acb0e826c65be7a20, content: 'The capital of \n",
+              "Spain is Madrid 13', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=b1da49017ec07aeed322e43c714e2f06e8048990af53c6b240c4a910f8588d75, content: 'The capital of \n",
+              "Spain is Madrid 14', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=e463016f308ca7446c7b1c2086fe0f5a7faf7fd23d1a036a1b4d7b384ca2ee0b, content: 'The capital of \n",
+              "Spain is Madrid 16', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=97d3185e7aee8f0c5fda8f10a4664ce9d41169d80cba822004016a7772df5a1f, content: 'The capital of \n",
+              "Spain is Madrid 18', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=5c41d72973050af64e932b8930982158cb2447bcba45d83c95f9b2a6bfa448f6, content: 'The capital of \n",
+              "Spain is Madrid 22', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=18b8a234a362a7f23ec85ca2490ca076f3a685ac2f54dcc51fc0dc08ce01ec48, content: 'The capital of \n",
+              "Spain is Madrid 28', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=894416844ee654aadc7b65a59e31d305ab2ff8f163d92cc8d55cc83087ab7cce, content: 'The capital of \n",
+              "Spain is Madrid 37', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=fb2394dc1f7ad21a80b143c423de21a4ce53ce785de1236331a28615dd3ba3e2, content: 'The capital of \n",
+              "Spain is Madrid 194', score: 0.47077772, embedding: vector of size 768),\n",
+              "            Document(id=f8de88083f17f5011d81b5ac47366c3a31bb2a00d11fd67d0e9244f8579401c8, content: 'The capital of \n",
+              "Italy is Rome 141', score: 0.47066435, embedding: vector of size 768),\n",
+              "            Document(id=a5b9809f86b2260ff622077d366ac2072d9d2f9c6603db62111578cbc53738c6, content: 'The capital of \n",
+              "Spain is Madrid 538', score: 0.470319925, embedding: vector of size 768),\n",
+              "            Document(id=b3331a607a68e5cdf00857b31d5e48ad2dd1b3d3d63a32748938d381e6540146, content: 'The capital of \n",
+              "Spain is Madrid 118', score: 0.470275225, embedding: vector of size 768),\n",
+              "            Document(id=3f510b7d9f4b8e8837396ecad9457ae169d38396d5385bff6eb6dd5fbcb3c75e, content: 'The capital of \n",
+              "Italy is Rome 603', score: 0.4701038, embedding: vector of size 768),\n",
+              "            Document(id=66118fdd45b5b2f3fff2aa13d28b706efe4d3d239d11d7a335485d1f02da2eaf, content: 'The capital of \n",
+              "Italy is Rome 612', score: 0.4700622, embedding: vector of size 768),\n",
+              "            Document(id=651529b836953ffd40a8620b2a34390fd627fbf3880ea740233a2f1974cfdda9, content: 'The capital of \n",
+              "Italy is Rome 489', score: 0.47003627, embedding: vector of size 768),\n",
+              "            Document(id=6713ff1da7fece0bf2b16526aedd7f4044cf75a9be4928a85fade9c2446cded2, content: 'The capital of \n",
+              "Germany is Berlin 735', score: 0.469990015, embedding: vector of size 768),\n",
+              "            Document(id=56cdccc03a89a1140bc3d58afb1fd945460e9ddb78858b893f96c3da9f3f6318, content: 'The capital of \n",
+              "France is Paris 726', score: 0.4698249, embedding: vector of size 768),\n",
+              "            Document(id=1bfa297e32fb9d38ec2ecc59409937b910ce3cc9dab110ca40964fe00ce5dd5f, content: 'The capital of \n",
+              "Spain is Madrid 977', score: 0.46978408, embedding: vector of size 768)\n",
+              "        ]\n",
+              "    }\n",
+              "}\n",
+              "
\n" + ], + "text/plain": [ + "\u001b[1m{\u001b[0m\n", + " \u001b[32m'document_joiner'\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[32m'documents'\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m76f7ac287e02cf5972db85446efcc230b7a4f0d8988fc325ec15b1c4d8188eb5\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 2'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m81a55c519cd78fbe2eedf0d632cf2e256301ad42201b8be9b134eed7d5f8add6\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 10'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35mea817c00aee30178ebbfeb70f6e2a8853931c20ea47a1977227531ba175fc336\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 12'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35mc43c8845ec4831d3321f94db7b256c86d1c4a4a3b077dd7acb0e826c65be7a20\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 13'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35mb1da49017ec07aeed322e43c714e2f06e8048990af53c6b240c4a910f8588d75\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 14'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35me463016f308ca7446c7b1c2086fe0f5a7faf7fd23d1a036a1b4d7b384ca2ee0b\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 16'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m97d3185e7aee8f0c5fda8f10a4664ce9d41169d80cba822004016a7772df5a1f\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 18'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m5c41d72973050af64e932b8930982158cb2447bcba45d83c95f9b2a6bfa448f6\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 22'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m18b8a234a362a7f23ec85ca2490ca076f3a685ac2f54dcc51fc0dc08ce01ec48\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 28'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m894416844ee654aadc7b65a59e31d305ab2ff8f163d92cc8d55cc83087ab7cce\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 37'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35mfb2394dc1f7ad21a80b143c423de21a4ce53ce785de1236331a28615dd3ba3e2\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 194'\u001b[0m, score: \u001b[1;36m0.47077772\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35mf8de88083f17f5011d81b5ac47366c3a31bb2a00d11fd67d0e9244f8579401c8\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mItaly is Rome 141'\u001b[0m, score: \u001b[1;36m0.47066435\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35ma5b9809f86b2260ff622077d366ac2072d9d2f9c6603db62111578cbc53738c6\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 538'\u001b[0m, score: \u001b[1;36m0.470319925\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35mb3331a607a68e5cdf00857b31d5e48ad2dd1b3d3d63a32748938d381e6540146\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 118'\u001b[0m, score: \u001b[1;36m0.470275225\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m3f510b7d9f4b8e8837396ecad9457ae169d38396d5385bff6eb6dd5fbcb3c75e\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mItaly is Rome 603'\u001b[0m, score: \u001b[1;36m0.4701038\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m66118fdd45b5b2f3fff2aa13d28b706efe4d3d239d11d7a335485d1f02da2eaf\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mItaly is Rome 612'\u001b[0m, score: \u001b[1;36m0.4700622\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m651529b836953ffd40a8620b2a34390fd627fbf3880ea740233a2f1974cfdda9\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mItaly is Rome 489'\u001b[0m, score: \u001b[1;36m0.47003627\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m6713ff1da7fece0bf2b16526aedd7f4044cf75a9be4928a85fade9c2446cded2\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mGermany is Berlin 735'\u001b[0m, score: \u001b[1;36m0.469990015\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m56cdccc03a89a1140bc3d58afb1fd945460e9ddb78858b893f96c3da9f3f6318\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mFrance is Paris 726'\u001b[0m, score: \u001b[1;36m0.4698249\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m1bfa297e32fb9d38ec2ecc59409937b910ce3cc9dab110ca40964fe00ce5dd5f\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 977'\u001b[0m, score: \u001b[1;36m0.46978408\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m\n", + " \u001b[1m]\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + "\u001b[1m}\u001b[0m\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
Time taken for 100 non concurrent calls: 21.74193024635315 seconds\n",
+              "
\n" + ], + "text/plain": [ + "Time taken for \u001b[1;36m100\u001b[0m non concurrent calls: \u001b[1;36m21.74193024635315\u001b[0m seconds\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "from haystack_integrations.components.retrievers.elasticsearch import ElasticsearchBM25Retriever, ElasticsearchEmbeddingRetriever\n", + "from haystack import Pipeline\n", + "from haystack.components.joiners import DocumentJoiner\n", + "\n", + "\n", + "\n", + "embedding_retriever = ElasticsearchEmbeddingRetriever(document_store=document_store)\n", + "bm25_retriever = ElasticsearchBM25Retriever(document_store=document_store)\n", + "document_joiner = DocumentJoiner(join_mode=\"merge\")\n", + "\n", + "hybrid_retrieval = Pipeline()\n", + "hybrid_retrieval.add_component(\"embedding_retriever\", embedding_retriever)\n", + "hybrid_retrieval.add_component(\"bm25_retriever\", bm25_retriever)\n", + "hybrid_retrieval.add_component(\"document_joiner\", document_joiner)\n", + "\n", + "hybrid_retrieval.connect(\"bm25_retriever\", \"document_joiner\")\n", + "hybrid_retrieval.connect(\"embedding_retriever\", \"document_joiner\")\n", + "\n", + "query = \"Madrid\"\n", + "query_embedding = [0.1] * 768\n", + "\n", + "\n", + "start = time.time()\n", + "n = 100\n", + "for i in range(n):\n", + " result = hybrid_retrieval.run(\n", + " {\"bm25_retriever\": {\"query\": query}, \"embedding_retriever\": {\"query_embedding\": query_embedding}}\n", + " )\n", + "\n", + "end = time.time()\n", + "print(result)\n", + "print(f\"Time taken for {n} non concurrent calls: {end - start} seconds\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "MXpYIrJ6U5_e" + }, + "source": [ + "### Optimisation: Pipeline with Retrievers running concurrently\n", + "\n", + "We wrap multiple retrievers into a single component that internally will run them concurrently in different threads. To simplify the threads orchestration, we use the `to_thread` function from `asyncio`. The component itself will stay synchronous, exposing the usual `run` method." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "vDujbLZxVK2B" + }, + "source": [ + "#### ConcurrentRetrievers component\n", + "\n", + "- This component expects a list of retrievers and names as initialization parameters.\n", + "\n", + "- The `_arun` method is an async method, where concurrent execution takes place. It creates a thread for each retriever and waits for all threads to finish. Then the results are collected, aggregated, and returned.\n", + "\n", + "- The `run` method is synchronous (as expected by the Haystack pipeline) and calls the `_arun` method using `asyncio.run`.\n", + "\n", + "- Note: since different types of retrievers accept different query parameters in their `run` method (`query`, `query_embedding`, `sparse_query_embedding`), `inspect.signature` is used to determine if the retriever accepts a specific parameter." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": { + "id": "YoDnJCt4Exd_" + }, + "outputs": [], + "source": [ + "from haystack import Pipeline\n", + "from haystack.components.joiners import DocumentJoiner\n", + "from haystack.dataclasses import SparseEmbedding\n", + "from haystack import component, Document\n", + "from haystack.core.component import Component\n", + "from typing import List, Optional, Dict, Any\n", + "import asyncio\n", + "import inspect\n", + "\n", + "\n", + "@component\n", + "class ConcurrentRetrievers:\n", + " def __init__(self, retrievers: List[Component], names: Optional[List[str]] = None):\n", + " self.retrievers = retrievers\n", + " if names is None:\n", + " names = [f\"retriever_{i}\" for i in range(len(retrievers))]\n", + " self.names = names\n", + "\n", + " output_types = {k: List[Document] for k in names}\n", + " component.set_output_types(self, **output_types)\n", + "\n", + " async def _arun(self, **kwargs):\n", + " \"\"\"\n", + " Asynchrounous method to run the retrievers concurrently.\n", + " \"\"\"\n", + "\n", + " coroutines = []\n", + " for retriever in self.retrievers:\n", + " retriever_params = inspect.signature(getattr(retriever, \"run\")).parameters\n", + " selected_params = {\"top_k\": kwargs.get(\"top_k\"), \"filters\": kwargs.get(\"filters\")}\n", + " # each retriever accepts different parameters (keyword/BM25 retriever, embedding retriever, hybrid retriever)\n", + " for query_param in [\"query\", \"query_embedding\", \"sparse_query_embedding\"]:\n", + " if query_param in retriever_params:\n", + " selected_params[query_param] = kwargs.get(query_param)\n", + " # the retrievers run in separate threads\n", + " coroutines.append(asyncio.to_thread(retriever.run, **selected_params))\n", + "\n", + " results = await asyncio.gather(*coroutines)\n", + "\n", + " organized_results = {}\n", + " for retriever_name, res_ in zip(self.names, results):\n", + " organized_results[retriever_name] = res_[\"documents\"]\n", + " return organized_results\n", + "\n", + " def run(\n", + " self,\n", + " query: Optional[str] = None,\n", + " query_embedding: Optional[List[float]] = None,\n", + " sparse_query_embedding: Optional[SparseEmbedding] = None,\n", + " filters: Optional[Dict[str, Any]] = None,\n", + " top_k: Optional[int] = None,\n", + " ):\n", + " \"\"\"\n", + " Synchronous run method that can be integrated into a classic synchronous pipeline.\n", + " \"\"\"\n", + " results = asyncio.run(\n", + " self._arun(\n", + " query=query,\n", + " query_embedding=query_embedding,\n", + " sparse_query_embedding=sparse_query_embedding,\n", + " filters=filters,\n", + " top_k=top_k,\n", + " )\n", + " )\n", + " return results\n", + "\n", + "# serialization/deserialization methods can be implemented like\n", + "# in ConcurrentGenerators component..." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "Q0KgQbPyVUfY" + }, + "source": [ + "#### Pipeline with ConcurrentRetrievers component" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": { + "colab": { + "base_uri": "https://localhost:8080/", + "height": 769 + }, + "id": "RHRjgeJJGLtH", + "outputId": "6144cd2f-9039-417d-a8ae-0cea21426ed0" + }, + "outputs": [ + { + "data": { + "text/html": [ + "
{\n",
+              "    'document_joiner': {\n",
+              "        'documents': [\n",
+              "            Document(id=76f7ac287e02cf5972db85446efcc230b7a4f0d8988fc325ec15b1c4d8188eb5, content: 'The capital of \n",
+              "Spain is Madrid 2', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=81a55c519cd78fbe2eedf0d632cf2e256301ad42201b8be9b134eed7d5f8add6, content: 'The capital of \n",
+              "Spain is Madrid 10', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=ea817c00aee30178ebbfeb70f6e2a8853931c20ea47a1977227531ba175fc336, content: 'The capital of \n",
+              "Spain is Madrid 12', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=c43c8845ec4831d3321f94db7b256c86d1c4a4a3b077dd7acb0e826c65be7a20, content: 'The capital of \n",
+              "Spain is Madrid 13', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=b1da49017ec07aeed322e43c714e2f06e8048990af53c6b240c4a910f8588d75, content: 'The capital of \n",
+              "Spain is Madrid 14', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=e463016f308ca7446c7b1c2086fe0f5a7faf7fd23d1a036a1b4d7b384ca2ee0b, content: 'The capital of \n",
+              "Spain is Madrid 16', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=97d3185e7aee8f0c5fda8f10a4664ce9d41169d80cba822004016a7772df5a1f, content: 'The capital of \n",
+              "Spain is Madrid 18', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=5c41d72973050af64e932b8930982158cb2447bcba45d83c95f9b2a6bfa448f6, content: 'The capital of \n",
+              "Spain is Madrid 22', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=18b8a234a362a7f23ec85ca2490ca076f3a685ac2f54dcc51fc0dc08ce01ec48, content: 'The capital of \n",
+              "Spain is Madrid 28', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=894416844ee654aadc7b65a59e31d305ab2ff8f163d92cc8d55cc83087ab7cce, content: 'The capital of \n",
+              "Spain is Madrid 37', score: 0.6654567, embedding: vector of size 768),\n",
+              "            Document(id=fb2394dc1f7ad21a80b143c423de21a4ce53ce785de1236331a28615dd3ba3e2, content: 'The capital of \n",
+              "Spain is Madrid 194', score: 0.47077772, embedding: vector of size 768),\n",
+              "            Document(id=f8de88083f17f5011d81b5ac47366c3a31bb2a00d11fd67d0e9244f8579401c8, content: 'The capital of \n",
+              "Italy is Rome 141', score: 0.47066435, embedding: vector of size 768),\n",
+              "            Document(id=a5b9809f86b2260ff622077d366ac2072d9d2f9c6603db62111578cbc53738c6, content: 'The capital of \n",
+              "Spain is Madrid 538', score: 0.470319925, embedding: vector of size 768),\n",
+              "            Document(id=b3331a607a68e5cdf00857b31d5e48ad2dd1b3d3d63a32748938d381e6540146, content: 'The capital of \n",
+              "Spain is Madrid 118', score: 0.470275225, embedding: vector of size 768),\n",
+              "            Document(id=3f510b7d9f4b8e8837396ecad9457ae169d38396d5385bff6eb6dd5fbcb3c75e, content: 'The capital of \n",
+              "Italy is Rome 603', score: 0.4701038, embedding: vector of size 768),\n",
+              "            Document(id=66118fdd45b5b2f3fff2aa13d28b706efe4d3d239d11d7a335485d1f02da2eaf, content: 'The capital of \n",
+              "Italy is Rome 612', score: 0.4700622, embedding: vector of size 768),\n",
+              "            Document(id=651529b836953ffd40a8620b2a34390fd627fbf3880ea740233a2f1974cfdda9, content: 'The capital of \n",
+              "Italy is Rome 489', score: 0.47003627, embedding: vector of size 768),\n",
+              "            Document(id=6713ff1da7fece0bf2b16526aedd7f4044cf75a9be4928a85fade9c2446cded2, content: 'The capital of \n",
+              "Germany is Berlin 735', score: 0.469990015, embedding: vector of size 768),\n",
+              "            Document(id=56cdccc03a89a1140bc3d58afb1fd945460e9ddb78858b893f96c3da9f3f6318, content: 'The capital of \n",
+              "France is Paris 726', score: 0.4698249, embedding: vector of size 768),\n",
+              "            Document(id=1bfa297e32fb9d38ec2ecc59409937b910ce3cc9dab110ca40964fe00ce5dd5f, content: 'The capital of \n",
+              "Spain is Madrid 977', score: 0.46978408, embedding: vector of size 768)\n",
+              "        ]\n",
+              "    }\n",
+              "}\n",
+              "
\n" + ], + "text/plain": [ + "\u001b[1m{\u001b[0m\n", + " \u001b[32m'document_joiner'\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[32m'documents'\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m76f7ac287e02cf5972db85446efcc230b7a4f0d8988fc325ec15b1c4d8188eb5\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 2'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m81a55c519cd78fbe2eedf0d632cf2e256301ad42201b8be9b134eed7d5f8add6\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 10'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35mea817c00aee30178ebbfeb70f6e2a8853931c20ea47a1977227531ba175fc336\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 12'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35mc43c8845ec4831d3321f94db7b256c86d1c4a4a3b077dd7acb0e826c65be7a20\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 13'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35mb1da49017ec07aeed322e43c714e2f06e8048990af53c6b240c4a910f8588d75\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 14'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35me463016f308ca7446c7b1c2086fe0f5a7faf7fd23d1a036a1b4d7b384ca2ee0b\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 16'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m97d3185e7aee8f0c5fda8f10a4664ce9d41169d80cba822004016a7772df5a1f\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 18'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m5c41d72973050af64e932b8930982158cb2447bcba45d83c95f9b2a6bfa448f6\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 22'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m18b8a234a362a7f23ec85ca2490ca076f3a685ac2f54dcc51fc0dc08ce01ec48\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 28'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m894416844ee654aadc7b65a59e31d305ab2ff8f163d92cc8d55cc83087ab7cce\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 37'\u001b[0m, score: \u001b[1;36m0.6654567\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35mfb2394dc1f7ad21a80b143c423de21a4ce53ce785de1236331a28615dd3ba3e2\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 194'\u001b[0m, score: \u001b[1;36m0.47077772\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35mf8de88083f17f5011d81b5ac47366c3a31bb2a00d11fd67d0e9244f8579401c8\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mItaly is Rome 141'\u001b[0m, score: \u001b[1;36m0.47066435\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35ma5b9809f86b2260ff622077d366ac2072d9d2f9c6603db62111578cbc53738c6\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 538'\u001b[0m, score: \u001b[1;36m0.470319925\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35mb3331a607a68e5cdf00857b31d5e48ad2dd1b3d3d63a32748938d381e6540146\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 118'\u001b[0m, score: \u001b[1;36m0.470275225\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m3f510b7d9f4b8e8837396ecad9457ae169d38396d5385bff6eb6dd5fbcb3c75e\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mItaly is Rome 603'\u001b[0m, score: \u001b[1;36m0.4701038\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m66118fdd45b5b2f3fff2aa13d28b706efe4d3d239d11d7a335485d1f02da2eaf\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mItaly is Rome 612'\u001b[0m, score: \u001b[1;36m0.4700622\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m651529b836953ffd40a8620b2a34390fd627fbf3880ea740233a2f1974cfdda9\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mItaly is Rome 489'\u001b[0m, score: \u001b[1;36m0.47003627\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m6713ff1da7fece0bf2b16526aedd7f4044cf75a9be4928a85fade9c2446cded2\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mGermany is Berlin 735'\u001b[0m, score: \u001b[1;36m0.469990015\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m56cdccc03a89a1140bc3d58afb1fd945460e9ddb78858b893f96c3da9f3f6318\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mFrance is Paris 726'\u001b[0m, score: \u001b[1;36m0.4698249\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m,\n", + " \u001b[1;35mDocument\u001b[0m\u001b[1m(\u001b[0m\u001b[33mid\u001b[0m=\u001b[35m1bfa297e32fb9d38ec2ecc59409937b910ce3cc9dab110ca40964fe00ce5dd5f\u001b[0m, content: \u001b[32m'The capital of \u001b[0m\n", + "\u001b[32mSpain is Madrid 977'\u001b[0m, score: \u001b[1;36m0.46978408\u001b[0m, embedding: vector of size \u001b[1;36m768\u001b[0m\u001b[1m)\u001b[0m\n", + " \u001b[1m]\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + "\u001b[1m}\u001b[0m\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
Time taken for 100 concurrent calls: 11.726269483566284 seconds\n",
+              "
\n" + ], + "text/plain": [ + "Time taken for \u001b[1;36m100\u001b[0m concurrent calls: \u001b[1;36m11.726269483566284\u001b[0m seconds\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "embedding_retriever = ElasticsearchEmbeddingRetriever(document_store=document_store)\n", + "bm25_retriever = ElasticsearchBM25Retriever(document_store=document_store)\n", + "document_joiner = DocumentJoiner(join_mode=\"merge\")\n", + "\n", + "hybrid_retrieval = Pipeline()\n", + "hybrid_retrieval.add_component(\n", + " \"concurrent_retrievers\",\n", + " ConcurrentRetrievers(\n", + " retrievers=[embedding_retriever, bm25_retriever],\n", + " names=[\"embedding_retriever\", \"bm25_retriever\"],\n", + " ),\n", + ")\n", + "hybrid_retrieval.add_component(\"document_joiner\", document_joiner)\n", + "\n", + "hybrid_retrieval.connect(\"concurrent_retrievers.embedding_retriever\", \"document_joiner\")\n", + "hybrid_retrieval.connect(\"concurrent_retrievers.bm25_retriever\", \"document_joiner\")\n", + "\n", + "query = \"Madrid\"\n", + "query_embedding = [0.1] * 768\n", + "\n", + "start = time.time()\n", + "n = 100\n", + "for i in range(n):\n", + " result = hybrid_retrieval.run(\n", + " {\"concurrent_retrievers\": {\"query\": query, \"query_embedding\": query_embedding}}\n", + " )\n", + "\n", + "end = time.time()\n", + "print(result)\n", + "print(f\"Time taken for {n} concurrent calls: {end - start} seconds\")" + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "ORRniqskV3n7" + }, + "source": [ + "## Conclusions\n", + "\n", + "- The proposed approach has proved effective for I/O bound tasks.\n", + "- A similar approach can be implemented using multiprocessing for CPU-bound tasks, such as local ML inference.\n", + "- When implementing components like these, special attention must be paid to input and output to make them truly usable in pipelines." + ] + }, + { + "cell_type": "markdown", + "metadata": { + "id": "X8-UD2UVZqQY" + }, + "source": [ + "*Notebook by [Stefano Fiorucci](https://github.com/anakin87)*" + ] + } + ], + "metadata": { + "colab": { + "provenance": [] + }, + "kernelspec": { + "display_name": "Python 3", + "name": "python3" + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +}