-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Question]: Build Async Document Summary Index ? #13762
Comments
Yes, you can process the document summary index concurrently using async to reduce the processing time. The provided code includes an asynchronous method, `abuild_index_from_nodes`. Here is the relevant part of the code that demonstrates this:

async def abuild_index_from_nodes(
self,
index_graph: IndexGraph,
cur_node_ids: Dict[int, str],
all_node_ids: Dict[int, str],
level: int = 0,
) -> IndexGraph:
"""Consolidates chunks recursively, in a bottoms-up fashion."""
if len(cur_node_ids) <= self.num_children:
index_graph.root_nodes = cur_node_ids
return index_graph
indices, cur_nodes_chunks, text_chunks = self._prepare_node_and_text_chunks(
cur_node_ids
)
with self._service_context.callback_manager.event(
CBEventType.TREE, payload={EventPayload.CHUNKS: text_chunks}
) as event:
text_chunks_progress = get_tqdm_iterable(
text_chunks,
show_progress=self._show_progress,
desc="Generating summaries",
)
tasks = [
self._service_context.llm.apredict(
self.summary_prompt, context_str=text_chunk
)
for text_chunk in text_chunks_progress
]
outputs: List[Tuple[str, str]] = await asyncio.gather(*tasks)
summaries = [output[0] for output in outputs]
self._service_context.llama_logger.add_log(
{"summaries": summaries, "level": level}
)
event.on_end(payload={"summaries": summaries, "level": level})
new_node_dict = self._construct_parent_nodes(
index_graph, indices, cur_nodes_chunks, summaries
)
all_node_ids.update(new_node_dict)
index_graph.root_nodes = new_node_dict
if len(new_node_dict) <= self.num_children:
return index_graph
else:
return await self.abuild_index_from_nodes(
index_graph, new_node_dict, all_node_ids, level=level + 1
)

This method uses `asyncio.gather` to run the summary-generation LLM calls concurrently instead of awaiting them one by one.
|
I don't see any class that is using `abuild_index_from_nodes`.
The `DocumentSummaryIndex` class can be imported as follows:

from llama_index.legacy.indices.document_summary.base import DocumentSummaryIndex

If you are looking for an alternative approach, you can use the `IndexDocumentSummary` struct. Here is a brief overview of the `IndexDocumentSummary` class:

from dataclasses import dataclass, field
from typing import Dict, List
from llama_index.core.data_structs.data_structs import IndexStruct
from llama_index.core.data_structs.struct_type import IndexStructType
from llama_index.core.schema import BaseNode
@dataclass
class IndexDocumentSummary(IndexStruct):
"""A simple struct containing a mapping from summary node_id to doc node_ids.
Also mapping vice versa.
"""
summary_id_to_node_ids: Dict[str, List[str]] = field(default_factory=dict)
node_id_to_summary_id: Dict[str, str] = field(default_factory=dict)
doc_id_to_summary_id: Dict[str, str] = field(default_factory=dict)
def add_summary_and_nodes(
self,
summary_node: BaseNode,
nodes: List[BaseNode],
) -> str:
"""Add node and summary."""
summary_id = summary_node.node_id
ref_doc_id = summary_node.ref_doc_id
if ref_doc_id is None:
raise ValueError(
"ref_doc_id of node cannot be None when building a document "
"summary index"
)
self.doc_id_to_summary_id[ref_doc_id] = summary_id
for node in nodes:
node_id = node.node_id
if summary_id not in self.summary_id_to_node_ids:
self.summary_id_to_node_ids[summary_id] = []
self.summary_id_to_node_ids[summary_id].append(node_id)
self.node_id_to_summary_id[node_id] = summary_id
return summary_id
@property
def summary_ids(self) -> List[str]:
"""Get summary ids."""
return list(self.summary_id_to_node_ids.keys())
def delete(self, doc_id: str) -> None:
"""Delete a document and its nodes."""
summary_id = self.doc_id_to_summary_id[doc_id]
del self.doc_id_to_summary_id[doc_id]
node_ids = self.summary_id_to_node_ids[summary_id]
for node_id in node_ids:
del self.node_id_to_summary_id[node_id]
del self.summary_id_to_node_ids[summary_id]
def delete_nodes(self, node_ids: List[str]) -> None:
for node_id in node_ids:
summary_id = self.node_id_to_summary_id[node_id]
self.summary_id_to_node_ids[summary_id].remove(node_id)
del self.node_id_to_summary_id[node_id]
@classmethod
def get_type(cls) -> IndexStructType:
"""Get type."""
return IndexStructType.DOCUMENT_SUMMARY

This class allows you to create a summary for each document and manage the relationships between summaries and document nodes [1][2][3][4][5].
|
But how can we build the `DocumentSummaryIndex` concurrently and make it a query engine after that? |
To make the `DocumentSummaryIndex` usable as a query engine, you can use the `QASummaryQueryEngineBuilder`.
Here is the code to achieve this:

from llama_index.core.schema import Document
from llama_index.core.service_context import ServiceContext
from llama_index.core.storage.storage_context import StorageContext
from llama_index.core.composability.joint_qa_summary import QASummaryQueryEngineBuilder
# Initialize the builder with optional custom contexts and texts
builder = QASummaryQueryEngineBuilder(
storage_context=StorageContext.from_defaults(),
service_context=ServiceContext.from_defaults(),
summary_text="Custom summary text",
qa_text="Custom QA text"
)
# Assume `documents` is a list of Document objects you want to index
documents = [...] # Replace with your actual documents
# Build the query engine from documents
query_engine = builder.build_from_documents(documents)
# Now `query_engine` can be used to perform queries

Additionally, you can use the `DocumentSummaryIndexLLMRetriever` to retrieve documents based on their summaries. Here is an example of how to use it:

from llama_index.core.indices.document_summary.retrievers import DocumentSummaryIndexLLMRetriever
from llama_index.core.indices.document_summary.base import DocumentSummaryIndex
from llama_index.core.llms.llm import LLM
# Assume `index` is an instance of DocumentSummaryIndex
index = DocumentSummaryIndex(...)
# Initialize the retriever
retriever = DocumentSummaryIndexLLMRetriever(
index=index,
llm=LLM(...)
)
# Perform a query
query_bundle = QueryBundle(query_str="Your query here")
results = retriever._retrieve(query_bundle)

This code initializes the `DocumentSummaryIndexLLMRetriever` and performs a query against the index.
|
@dosu is there a way to build this in async ? |
Yes, you can build the query engine asynchronously. Here is an example:

import asyncio
from llama_index.core.service_context import ServiceContext
from llama_index.core.storage.storage_context import StorageContext
from llama_index.core.composability.joint_qa_summary import QASummaryQueryEngineBuilder
async def async_build_from_documents(builder, documents):
return await builder.build_from_documents(documents)
# Usage
async def main():
builder = QASummaryQueryEngineBuilder(
storage_context=StorageContext.from_defaults(),
service_context=ServiceContext.from_defaults(),
summary_text="Custom summary text",
qa_text="Custom QA text"
)
documents = [...] # Your list of documents
query_engine = await async_build_from_documents(builder, documents)
print(query_engine)
# Run the async main function
asyncio.run(main())

In this example, the build step is wrapped in an async function and awaited from `main()`.
|
Again, coming back to the original question: how can I make the following code async, particularly for building `doc_summary_index`?
|
To build `doc_summary_index` asynchronously, you can run the per-document summary tasks concurrently.
Here is the modified code:

import asyncio
from llama_index.legacy.async_utils import run_async_tasks
from llama_index.legacy.indices.document_summary.base import DocumentSummaryIndex
from llama_index.legacy.response_synthesizers import get_response_synthesizer
# Define an asynchronous function to build the document summary index
async def async_build_doc_summary_index(documents, llm, transformations, response_synthesizer, show_progress):
doc_summary_index = DocumentSummaryIndex(
response_synthesizer=response_synthesizer,
show_progress=show_progress
)
# Assuming `build_doc_summary` is the coroutine function to build a single doc summary
async def build_doc_summary(doc):
# Your logic to build a summary for a single document
pass
tasks = [build_doc_summary(doc) for doc in documents]
await run_async_tasks(tasks, show_progress=show_progress, workers=4, desc="Building doc summary index")
return doc_summary_index
# Usage
async def main():
response_synthesizer = get_response_synthesizer(
response_mode="tree_summarize", use_async=True
)
city_docs = [...] # Your list of documents
splitter = ... # Your splitter transformation
chatgpt = ... # Your LLM instance
doc_summary_index = await async_build_doc_summary_index(
city_docs,
llm=chatgpt,
transformations=[splitter],
response_synthesizer=response_synthesizer,
show_progress=True
)
print(doc_summary_index)
# Run the async main function
asyncio.run(main())

In this example:
This approach leverages the asynchronous capabilities of the LlamaIndex framework to improve performance when building the
|
Question Validation
Question
Hi,
I am trying to build document summary index following this code
https://docs.llamaindex.ai/en/v0.10.17/examples/index_structs/doc_summary/DocSummary.html
However, it takes some time. Is there a way to process it concurrently using async?
Thanks
IG
The text was updated successfully, but these errors were encountered: