[Question]: Build Async Document Summary Index ? #13762

Open
ihgumilar opened this issue May 28, 2024 · 9 comments
Labels
question Further information is requested

@ihgumilar

Question Validation

  • I have searched both the documentation and discord for an answer.

Question

Hi,

I am trying to build a document summary index following this code:

# default mode of building the index
response_synthesizer = get_response_synthesizer(
    response_mode="tree_summarize", use_async=True
)
doc_summary_index = DocumentSummaryIndex.from_documents(
    city_docs,
    llm=chatgpt,
    transformations=[splitter],
    response_synthesizer=response_synthesizer,
    show_progress=True,
)

https://docs.llamaindex.ai/en/v0.10.17/examples/index_structs/doc_summary/DocSummary.html

However, it takes some time. Is there a way to process it concurrently using async?

Thanks
IG

ihgumilar added the question label on May 28, 2024

dosubot bot commented May 28, 2024

Yes, you can process the document summary index concurrently using async to reduce the processing time. The LlamaIndex codebase includes an asynchronous method, abuild_index_from_nodes, that uses asyncio.gather to run tasks concurrently.

Here is the relevant part of the code that demonstrates this:

async def abuild_index_from_nodes(
    self,
    index_graph: IndexGraph,
    cur_node_ids: Dict[int, str],
    all_node_ids: Dict[int, str],
    level: int = 0,
) -> IndexGraph:
    """Consolidates chunks recursively, in a bottoms-up fashion."""
    if len(cur_node_ids) <= self.num_children:
        index_graph.root_nodes = cur_node_ids
        return index_graph

    indices, cur_nodes_chunks, text_chunks = self._prepare_node_and_text_chunks(
        cur_node_ids
    )

    with self._service_context.callback_manager.event(
        CBEventType.TREE, payload={EventPayload.CHUNKS: text_chunks}
    ) as event:
        text_chunks_progress = get_tqdm_iterable(
            text_chunks,
            show_progress=self._show_progress,
            desc="Generating summaries",
        )
        tasks = [
            self._service_context.llm.apredict(
                self.summary_prompt, context_str=text_chunk
            )
            for text_chunk in text_chunks_progress
        ]
        outputs: List[Tuple[str, str]] = await asyncio.gather(*tasks)
        summaries = [output[0] for output in outputs]
        self._service_context.llama_logger.add_log(
            {"summaries": summaries, "level": level}
        )

        event.on_end(payload={"summaries": summaries, "level": level})

    new_node_dict = self._construct_parent_nodes(
        index_graph, indices, cur_nodes_chunks, summaries
    )
    all_node_ids.update(new_node_dict)

    index_graph.root_nodes = new_node_dict

    if len(new_node_dict) <= self.num_children:
        return index_graph
    else:
        return await self.abuild_index_from_nodes(
            index_graph, new_node_dict, all_node_ids, level=level + 1
        )

This method uses asyncio.gather to run the summary generation tasks concurrently, which can help reduce the overall processing time [1].
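
For illustration, here is a small self-contained sketch of the same pattern; the summarize coroutine below is a hypothetical stand-in for the self._service_context.llm.apredict calls shown above:

import asyncio
from typing import List

# Hypothetical stand-in for an LLM call such as llm.apredict(...)
async def summarize(chunk: str) -> str:
    await asyncio.sleep(0.1)  # simulate the latency of one LLM request
    return f"summary of: {chunk[:20]}"

async def summarize_all(chunks: List[str]) -> List[str]:
    # All coroutines are scheduled together; asyncio.gather waits for them
    # to finish instead of issuing the requests one after another
    return await asyncio.gather(*(summarize(c) for c in chunks))

print(asyncio.run(summarize_all(["chunk one", "chunk two", "chunk three"])))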


@ihgumilar
Author

I don't see any class that is using DocumentSummaryIndex


dosubot bot commented May 28, 2024

The DocumentSummaryIndex class is defined in the llama_index.legacy.indices.document_summary.base module. You can import it using the following statement:

from llama_index.legacy.indices.document_summary.base import DocumentSummaryIndex
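
On recent llama-index 0.10.x installs, the equivalent top-level import should also work (assuming the standard core package layout):

from llama_index.core import DocumentSummaryIndex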

If you are looking for an alternative approach, you can use the IndexDocumentSummary class, which is defined in the llama_index.core.data_structs.document_summary module. This class provides a data structure for managing document summaries and their associated nodes.

Here is a brief overview of the IndexDocumentSummary class:

from dataclasses import dataclass, field
from typing import Dict, List
from llama_index.core.data_structs.data_structs import IndexStruct
from llama_index.core.data_structs.struct_type import IndexStructType
from llama_index.core.schema import BaseNode

@dataclass
class IndexDocumentSummary(IndexStruct):
    """A simple struct containing a mapping from summary node_id to doc node_ids.

    Also mapping vice versa.
    """

    summary_id_to_node_ids: Dict[str, List[str]] = field(default_factory=dict)
    node_id_to_summary_id: Dict[str, str] = field(default_factory=dict)
    doc_id_to_summary_id: Dict[str, str] = field(default_factory=dict)

    def add_summary_and_nodes(
        self,
        summary_node: BaseNode,
        nodes: List[BaseNode],
    ) -> str:
        """Add node and summary."""
        summary_id = summary_node.node_id
        ref_doc_id = summary_node.ref_doc_id
        if ref_doc_id is None:
            raise ValueError(
                "ref_doc_id of node cannot be None when building a document "
                "summary index"
            )
        self.doc_id_to_summary_id[ref_doc_id] = summary_id

        for node in nodes:
            node_id = node.node_id
            if summary_id not in self.summary_id_to_node_ids:
                self.summary_id_to_node_ids[summary_id] = []
            self.summary_id_to_node_ids[summary_id].append(node_id)

            self.node_id_to_summary_id[node_id] = summary_id

        return summary_id

    @property
    def summary_ids(self) -> List[str]:
        """Get summary ids."""
        return list(self.summary_id_to_node_ids.keys())

    def delete(self, doc_id: str) -> None:
        """Delete a document and its nodes."""
        summary_id = self.doc_id_to_summary_id[doc_id]
        del self.doc_id_to_summary_id[doc_id]
        node_ids = self.summary_id_to_node_ids[summary_id]
        for node_id in node_ids:
            del self.node_id_to_summary_id[node_id]
        del self.summary_id_to_node_ids[summary_id]

    def delete_nodes(self, node_ids: List[str]) -> None:
        for node_id in node_ids:
            summary_id = self.node_id_to_summary_id[node_id]
            self.summary_id_to_node_ids[summary_id].remove(node_id)
            del self.node_id_to_summary_id[node_id]

    @classmethod
    def get_type(cls) -> IndexStructType:
        """Get type."""
        return IndexStructType.DOCUMENT_SUMMARY

This class allows you to create a summary for each document and manage the relationships between summaries and document nodes [1][2][3][4][5].
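
As a minimal, hypothetical usage sketch (the node texts and IDs are made up; ref_doc_id is derived from each node's SOURCE relationship):

from llama_index.core.data_structs.document_summary import IndexDocumentSummary
from llama_index.core.schema import NodeRelationship, RelatedNodeInfo, TextNode

# All nodes point back to the same (hypothetical) source document
source = RelatedNodeInfo(node_id="doc-1")
summary_node = TextNode(
    text="Summary of doc-1",
    relationships={NodeRelationship.SOURCE: source},
)
chunk_nodes = [
    TextNode(text="chunk 1", relationships={NodeRelationship.SOURCE: source}),
    TextNode(text="chunk 2", relationships={NodeRelationship.SOURCE: source}),
]

index_struct = IndexDocumentSummary()
summary_id = index_struct.add_summary_and_nodes(summary_node, chunk_nodes)

print(index_struct.summary_ids)           # [summary_id]
print(index_struct.doc_id_to_summary_id)  # {"doc-1": summary_id}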


@ihgumilar
Author

But how can we build the DocumentSummaryIndex concurrently and then turn it into a query engine?


dosubot bot commented May 28, 2024

To build the DocumentSummaryIndex concurrently and then use it as a query engine, you can follow the example provided by the QASummaryQueryEngineBuilder class. Here is a step-by-step guide based on the provided code:

  1. Initialize the Builder: Create an instance of QASummaryQueryEngineBuilder with the necessary contexts and texts.

  2. Build from Documents: Use the build_from_documents method to parse nodes, ingest them, and build the indices.

  3. Create Query Engines: Convert the indices to query engines.

  4. Return Router Query Engine: Combine the query engines into a RouterQueryEngine.

Here is the code to achieve this:

from llama_index.core.schema import Document
from llama_index.core.service_context import ServiceContext
from llama_index.core.storage.storage_context import StorageContext
from llama_index.core.composability.joint_qa_summary import QASummaryQueryEngineBuilder

# Initialize the builder with optional custom contexts and texts
builder = QASummaryQueryEngineBuilder(
    storage_context=StorageContext.from_defaults(),
    service_context=ServiceContext.from_defaults(),
    summary_text="Custom summary text",
    qa_text="Custom QA text"
)

# Assume `documents` is a list of Document objects you want to index
documents = [...]  # Replace with your actual documents

# Build the query engine from documents
query_engine = builder.build_from_documents(documents)

# Now `query_engine` can be used to perform queries

Additionally, you can use the DocumentSummaryIndexLLMRetriever or DocumentSummaryIndexEmbeddingRetriever classes to retrieve nodes from the DocumentSummaryIndex. These classes provide methods to retrieve relevant summaries using either LLM calls or embeddings.

Here is an example of how to use DocumentSummaryIndexLLMRetriever:

from llama_index.core.indices.document_summary.retrievers import DocumentSummaryIndexLLMRetriever
from llama_index.core.indices.document_summary.base import DocumentSummaryIndex
from llama_index.core.schema import QueryBundle

# Assume `index` is an instance of DocumentSummaryIndex
index = DocumentSummaryIndex(...)

# Initialize the retriever with your LLM instance
retriever = DocumentSummaryIndexLLMRetriever(
    index=index,
    llm=llm,  # e.g. an OpenAI(...) instance
)

# Perform a query (retrieve() is the public entry point)
query_bundle = QueryBundle(query_str="Your query here")
results = retriever.retrieve(query_bundle)

This code initializes the QASummaryQueryEngineBuilder, builds the indices concurrently from the provided documents, and returns a RouterQueryEngine that can be used for querying. Additionally, it demonstrates how to use the DocumentSummaryIndexLLMRetriever to retrieve nodes from the DocumentSummaryIndex [1][2][3].
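
Once built, the returned router engine is queried like any other LlamaIndex query engine; a brief hypothetical usage:

response = query_engine.query("Summarize the indexed documents.")
print(response)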


@ihgumilar
Author

@dosu is there a way to build this in async ?
query_engine = builder.build_from_documents(documents)


dosubot bot commented May 28, 2024

Yes, you can drive the build from async code with the QASummaryQueryEngineBuilder class. Note that build_from_documents itself is synchronous, so the wrapper below runs it in a worker thread (via asyncio.to_thread) instead of awaiting it directly:

import asyncio
from llama_index.core.service_context import ServiceContext
from llama_index.core.storage.storage_context import StorageContext
from llama_index.core.composability.joint_qa_summary import QASummaryQueryEngineBuilder

async def async_build_from_documents(builder, documents):
    # build_from_documents is a blocking call, so run it in a worker thread
    # to keep the event loop responsive
    return await asyncio.to_thread(builder.build_from_documents, documents)

# Usage
async def main():
    builder = QASummaryQueryEngineBuilder(
        storage_context=StorageContext.from_defaults(),
        service_context=ServiceContext.from_defaults(),
        summary_text="Custom summary text",
        qa_text="Custom QA text",
    )
    documents = [...]  # Your list of documents
    query_engine = await async_build_from_documents(builder, documents)
    print(query_engine)

# Run the async main function
asyncio.run(main())

In this example, async_build_from_documents wraps the synchronous build_from_documents call in a worker thread so it can be awaited without blocking the event loop. The main function initializes the QASummaryQueryEngineBuilder, builds the query engine from the provided documents, and prints the result. This keeps the rest of your async application responsive while the index is being built.


@ihgumilar
Author

Coming back to the original question again: how can I make the following code async, particularly for building doc_summary_index?

# default mode of building the index
response_synthesizer = get_response_synthesizer(
    response_mode="tree_summarize", use_async=True
)
doc_summary_index = DocumentSummaryIndex.from_documents(
    city_docs,
    llm=chatgpt,
    transformations=[splitter],
    response_synthesizer=response_synthesizer,
    show_progress=True,
)


dosubot bot commented May 28, 2024

To build doc_summary_index asynchronously, you can modify the code to use asynchronous methods and utilities provided by the LlamaIndex framework. Here is an example of how you can achieve this:

  1. Run the per-document summary coroutines concurrently: gather them with asyncio.gather (LlamaIndex's run_async_tasks helper does the equivalent when called from synchronous code).

  2. Keep the index-building methods asynchronous: make sure the functions used for building the index are coroutines so they can be awaited.

Here is the modified code:

import asyncio
from llama_index.legacy.indices.document_summary.base import DocumentSummaryIndex
from llama_index.legacy.response_synthesizers import get_response_synthesizer

# Define an asynchronous function to build the document summary index
async def async_build_doc_summary_index(documents, llm, transformations, response_synthesizer, show_progress):
    doc_summary_index = DocumentSummaryIndex(
        response_synthesizer=response_synthesizer,
        show_progress=show_progress
    )

    # Assuming `build_doc_summary` is the coroutine that builds the summary
    # for a single document
    async def build_doc_summary(doc):
        # Your logic to build a summary for a single document
        pass

    # Schedule all per-document summary coroutines at once and await them together
    tasks = [build_doc_summary(doc) for doc in documents]
    await asyncio.gather(*tasks)

    return doc_summary_index

# Usage
async def main():
    response_synthesizer = get_response_synthesizer(
        response_mode="tree_summarize", use_async=True
    )
    city_docs = [...]  # Your list of documents
    splitter = ...  # Your splitter transformation
    chatgpt = ...  # Your LLM instance

    doc_summary_index = await async_build_doc_summary_index(
        city_docs,
        llm=chatgpt,
        transformations=[splitter],
        response_synthesizer=response_synthesizer,
        show_progress=True
    )
    print(doc_summary_index)

# Run the async main function
asyncio.run(main())

In this example:

  • async_build_doc_summary_index is an asynchronous function that builds the document summary index.
  • asyncio.gather is used to run the summary-building coroutines concurrently.
  • The main function initializes the necessary components and builds the document summary index asynchronously.

This approach leverages the asynchronous capabilities of the LlamaIndex framework to improve performance when building the doc_summary_index [1][2].
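
A lighter-weight sketch (not from the thread above, and assuming the city_docs, chatgpt, and splitter variables from the original question): DocumentSummaryIndex.from_documents is synchronous, but the tree_summarize synthesizer created with use_async=True already issues its LLM calls via asyncio internally, so you can keep the original call and simply move it off the event loop with asyncio.to_thread:

import asyncio
from llama_index.core import DocumentSummaryIndex, get_response_synthesizer

async def build_index_async(city_docs, chatgpt, splitter):
    # use_async=True lets the tree_summarize synthesizer run its LLM calls
    # with asyncio under the hood
    response_synthesizer = get_response_synthesizer(
        response_mode="tree_summarize", use_async=True
    )
    # from_documents itself blocks, so run it in a worker thread to keep
    # the surrounding event loop responsive
    return await asyncio.to_thread(
        DocumentSummaryIndex.from_documents,
        city_docs,
        llm=chatgpt,
        transformations=[splitter],
        response_synthesizer=response_synthesizer,
        show_progress=True,
    )

doc_summary_index = asyncio.run(build_index_async(city_docs, chatgpt, splitter))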

