Coroutine question #1486

Closed
fenss opened this issue Mar 1, 2024 · 4 comments

fenss commented Mar 1, 2024

Hi langflow,

I noticed langflow provides the flow execution API: {langflow_host_and_port}/api/v1/process/{flow_id}

It takes an inputs param, which is actually the LangChain input.

I then created a simple flow containing one normal LLMChain, one simple PromptTemplate, and a custom LLM. The custom LLM is just a wrapper around an HTTP request, using the requests package in Python.
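
For illustration only (not the reporter's actual component): a minimal sketch of the kind of custom LLM described above, with a made-up class name, endpoint, and response shape. The point is that _call uses the blocking requests.post:

```python
# Hypothetical sketch, not the reporter's code: a custom LLM that wraps an
# HTTP endpoint with the blocking `requests` package.
from typing import Any, List, Optional

import requests
from langchain.llms.base import LLM


class HttpWrappedLLM(LLM):
    endpoint: str = "http://localhost:8080/generate"  # placeholder backend

    @property
    def _llm_type(self) -> str:
        return "http_wrapped_llm"

    def _call(self, prompt: str, stop: Optional[List[str]] = None, **kwargs: Any) -> str:
        # requests.post is synchronous: while it waits, the calling thread
        # (and the event loop, if this runs inside a coroutine) is stuck.
        response = requests.post(self.endpoint, json={"prompt": prompt}, timeout=60)
        response.raise_for_status()
        return response.json()["text"]
```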

I called this API, {langflow_host_and_port}/api/v1/process/{flow_id}, in parallel with inputs as a dict, and found that the requests seem to execute sequentially, i.e. they block each other.
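
A rough sketch of how such parallel calls might be issued (flow id, port, header name, and payload shape are placeholders, not taken from the issue). If the flow executes sequentially, total wall-clock time grows roughly linearly with the number of requests:

```python
# Fire several requests at the process endpoint concurrently and time them.
import asyncio
import time

import httpx

URL = "http://localhost:7860/api/v1/process/<flow_id>"  # placeholder
HEADERS = {"x-api-key": "<api_key>"}  # header name assumed


async def call_flow(client: httpx.AsyncClient, text: str) -> dict:
    # The inner input key depends on the flow; "input" is only an example.
    resp = await client.post(URL, json={"inputs": {"input": text}}, headers=HEADERS, timeout=120)
    resp.raise_for_status()
    return resp.json()


async def main() -> None:
    start = time.perf_counter()
    async with httpx.AsyncClient() as client:
        results = await asyncio.gather(*(call_flow(client, f"question {i}") for i in range(5)))
    print(f"{len(results)} requests finished in {time.perf_counter() - start:.1f}s")


if __name__ == "__main__":
    asyncio.run(main())
```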

Here is the code path I traced:

1. The API endpoint (https://github.com/logspace-ai/langflow/blob/v0.6.7a5/src/backend/langflow/api/v1/endpoints.py):

```python
@router.post(
    "/predict/{flow_id}",
    response_model=ProcessResponse,
    dependencies=[Depends(api_key_security)],
)
@router.post(
    "/process/{flow_id}",
    response_model=ProcessResponse,
)
async def process(
    session: Annotated[Session, Depends(get_session)],
    flow_id: str,
    inputs: Optional[Union[List[dict], dict]] = None,
    tweaks: Optional[dict] = None,
    clear_cache: Annotated[bool, Body(embed=True)] = False,  # noqa: F821
    session_id: Annotated[Union[None, str], Body(embed=True)] = None,  # noqa: F821
    task_service: "TaskService" = Depends(get_task_service),
    api_key_user: User = Depends(api_key_security),
    sync: Annotated[bool, Body(embed=True)] = True,  # noqa: F821
    session_service: SessionService = Depends(get_session_service),
):
    """
    Endpoint to process an input with a given flow_id.
    """
```

2. Get the flow graph from storage and prepare the session (https://github.com/logspace-ai/langflow/blob/v0.6.7a5/src/backend/langflow/api/v1/endpoints.py):

```python
async def process_graph_data(
    graph_data: dict,
    inputs: Optional[Union[List[dict], dict]] = None,
    tweaks: Optional[dict] = None,
    clear_cache: bool = False,
    session_id: Optional[str] = None,
    task_service: "TaskService" = Depends(get_task_service),
    sync: bool = True,
):
```

3. Parse the flow graph (https://github.com/logspace-ai/langflow/blob/v0.6.7a5/src/backend/langflow/processing/process.py):

```python
async def process_graph_cached(
    data_graph: Dict[str, Any],
    inputs: Optional[Union[dict, List[dict]]] = None,
    clear_cache=False,
    session_id=None,
) -> Result:
```

4. Start generating the result (https://github.com/logspace-ai/langflow/blob/v0.6.7a5/src/backend/langflow/processing/process.py):

```python
async def build_graph_and_generate_result(
    graph: "Graph",
    session_id: str,
    inputs: Optional[Union[dict, List[dict]]] = None,
    artifacts: Optional[Dict[str, Any]] = None,
    session_service: Optional[SessionService] = None,
):
```

5. It wraps the inputs and executes the built Chain/Runnable from the flow graph (https://github.com/logspace-ai/langflow/blob/v0.6.7a5/src/backend/langflow/processing/process.py):

```python
async def generate_result(built_object: Union[Chain, VectorStore, Runnable], inputs: Union[dict, List[dict]]):
    if isinstance(inputs, dict):
        result = await process_inputs_dict(built_object, inputs)
    elif isinstance(inputs, List) and isinstance(built_object, Runnable):
        result = await process_inputs_list(built_object, inputs)
    else:
        raise ValueError(f"Invalid inputs type: {type(inputs)}")

    if result is None:
        logger.warning(f"Unknown built_object type: {type(built_object)}")
        if isinstance(built_object, Coroutine):
            result = asyncio.run(built_object)
        result = built_object

    return result
```

6. My inputs is a dict, so we end up here; the key line is output = langchain_object(inputs, return_only_outputs=True):

```python
async def process_inputs_dict(built_object: Union[Chain, VectorStore, Runnable], inputs: dict):
    if isinstance(built_object, Chain):
        if inputs is None:
            raise ValueError("Inputs must be provided for a Chain")
        logger.debug("Generating result and thought")
        result = get_result_and_thought(built_object, inputs)

        logger.debug("Generated result and thought")

……
def get_result_and_thought(langchain_object: Any, inputs: dict):
    """Get result and thought from extracted json"""
……
        try:
            output = langchain_object(inputs, return_only_outputs=True)
……
```

7. That is a plain, synchronous __call__ of the Chain. If inputs were a dict wrapped in a list, it would call abatch or ainvoke instead, and the requests would not block each other (a non-blocking sketch follows the question below):

```python
async def process_inputs_list(built_object: Runnable, inputs: List[dict]):
    return await process_runnable(built_object, inputs)
……
async def process_runnable(runnable: Runnable, inputs: Union[dict, List[dict]]):
    if isinstance(inputs, List) and hasattr(runnable, "abatch"):
        result = await runnable.abatch(inputs)
    elif isinstance(inputs, dict) and hasattr(runnable, "ainvoke"):
        result = await runnable.ainvoke(inputs)

```

Is the synchronous call output = langchain_object(inputs, return_only_outputs=True) really necessary here?
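
For comparison, a minimal sketch of what a non-blocking variant of that call could look like, assuming the built object is a LangChain Chain (this is not langflow code; asyncio.to_thread needs Python 3.9+):

```python
# Sketch only, not langflow's implementation: two ways to keep the event loop free.
import asyncio
from typing import Any

from langchain.chains.base import Chain


async def run_chain_non_blocking(chain: Chain, inputs: dict) -> Any:
    # Option 1: use the chain's async entry point, so the event loop keeps
    # serving other requests while this one waits on I/O.
    return await chain.acall(inputs, return_only_outputs=True)


async def run_chain_in_thread(chain: Chain, inputs: dict) -> Any:
    # Option 2: keep the synchronous __call__ but push it onto a worker
    # thread, so it no longer blocks the event loop.
    return await asyncio.to_thread(chain, inputs, return_only_outputs=True)
```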


dosubot bot commented Mar 1, 2024

🤖

Hey @fenss, great to see you diving into the details again! How's everything going?

Thank you for your detailed analysis and question. You're correct in your understanding of how the inputs parameter is used in the process function and how it's passed down to the get_result_and_thought function.

The line output = langchain_object(inputs, return_only_outputs=True) is indeed executing the langchain_object with the provided inputs and returning only the outputs of the execution. This is done to ensure that only the final results of the processing are returned, excluding any intermediate steps or additional metadata that might be generated during the process.

As for your question about whether it's necessary to use output = langchain_object(inputs, return_only_outputs=True), it depends on the specific requirements of your application. If you want requests to be processed concurrently rather than sequentially, so that they do not block each other, you might need to modify the way the inputs are processed.

However, please note that modifying the way inputs are processed could have implications on the overall behavior of the application. It's recommended to thoroughly test any changes to ensure they meet your requirements and do not introduce any unexpected behavior.

As for the issue of requests blocking each other, it's important to note that the process function is an asynchronous function, which means it's designed to handle multiple requests concurrently. However, the actual concurrency behavior might depend on the specific implementation of the langchain_object and how it handles the inputs.

If the langchain_object is implemented in a way that processes the inputs sequentially (i.e., it waits for one input to be processed before moving on to the next), then the requests could indeed block each other. If this is the case, you might need to modify the implementation of the langchain_object to process the inputs concurrently.
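
For illustration only (not from langflow or the reporter's flow): one way to make such a custom LLM concurrency-friendly is to also implement LangChain's async hook _acall with an async HTTP client. Note this only helps once the server actually awaits the async path (acall / ainvoke):

```python
# Hypothetical example; class name and endpoint are placeholders.
from typing import Any, List, Optional

import httpx
from langchain.llms.base import LLM


class AsyncHttpWrappedLLM(LLM):
    endpoint: str = "http://localhost:8080/generate"  # placeholder backend

    @property
    def _llm_type(self) -> str:
        return "async_http_wrapped_llm"

    def _call(self, prompt: str, stop: Optional[List[str]] = None, **kwargs: Any) -> str:
        # Synchronous fallback; still blocks the calling thread.
        resp = httpx.post(self.endpoint, json={"prompt": prompt}, timeout=60)
        resp.raise_for_status()
        return resp.json()["text"]

    async def _acall(self, prompt: str, stop: Optional[List[str]] = None, **kwargs: Any) -> str:
        # Async variant; the event loop can serve other requests while waiting.
        async with httpx.AsyncClient() as client:
            resp = await client.post(self.endpoint, json={"prompt": prompt}, timeout=60)
        resp.raise_for_status()
        return resp.json()["text"]
```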

I hope this answers your question. If you have any further questions or need more clarification, please let me know.


@dosubot dosubot bot added the question Further information is requested label Mar 1, 2024
@songjinu

I also need the process to be non-blocking.
What is the reason for the blocking in get_result_and_thought, as analyzed by @fenss?
How about this? I modified it like this:

```python
        # result = get_result_and_thought(built_object, inputs)
        from fastapi.concurrency import run_in_threadpool

        result = await run_in_threadpool(get_result_and_thought, built_object, inputs)
```
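
For reference, a self-contained sketch (not langflow code) of why run_in_threadpool helps here: blocking work moved to a worker thread overlaps across requests instead of serializing on the event loop:

```python
# Standalone illustration of the suggestion above.
import asyncio
import time

from fastapi.concurrency import run_in_threadpool


def blocking_work(tag: str) -> str:
    time.sleep(2)  # stand-in for a blocking Chain.__call__ / requests.post
    return f"done: {tag}"


async def main() -> None:
    start = time.perf_counter()
    # Both calls run in the threadpool, so they overlap instead of taking ~4s.
    results = await asyncio.gather(
        run_in_threadpool(blocking_work, "a"),
        run_in_threadpool(blocking_work, "b"),
    )
    print(results, f"{time.perf_counter() - start:.1f}s")


if __name__ == "__main__":
    asyncio.run(main())
```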

@carlosrcoelho
Contributor

Hi @fenss,

We hope you're doing well. Just a friendly reminder that if we do not hear back from you within the next 3 days, we will close this issue. If you need more time or further assistance, please let us know.

Thank you for your understanding!

@carlosrcoelho carlosrcoelho self-assigned this Jul 22, 2024
@carlosrcoelho
Contributor

Thank you for your contribution! This issue will be closed. If you have any questions or encounter another problem, please open a new issue and we will be ready to assist you.
