[FEATURE REQUEST]: Stream Results options #50

Open
jonnyjohnson1 opened this issue Nov 17, 2023 · 8 comments
Labels
enhancement New feature or request

Comments

@jonnyjohnson1

The problem you are trying to solve:

Streaming flow results eliminates the dead time users spend waiting. I know it's not possible for intermediate flow steps, but the final one could be streamed.

Suggested new feature or change:

A streaming option would be nice.
In a flow, each next step would still have to wait until the previous step has been fully generated, but the final step in a flow could be streamed.

It should just be a matter of adding this parameter in the OpenAI and OpenAIChat class files:

openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    stream=True,  # <--------- add this line
)
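
For reference, with stream=True the call returns an iterator of chunks rather than a single completion; consuming it looks roughly like this (pre-1.0 openai SDK, messages just for illustration):

# With stream=True the call returns an iterator of chunks instead of a
# single completion (pre-1.0 openai SDK).
response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
)

for chunk in response:
    # Each chunk carries an incremental delta; "content" can be missing on
    # the first and last chunks, so default to an empty string.
    print(chunk["choices"][0]["delta"].get("content", ""), end="", flush=True)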

@jonnyjohnson1 jonnyjohnson1 added the enhancement New feature or request label Nov 17, 2023
@stoyan-stoyanov
Owner

Thank you for submitting a feature request @jonnyjohnson1! What do you think the expected behaviour should be in cases where there are multiple final steps?

@jonnyjohnson1
Author

My use case right now is to create a stream I can pass on to FastAPI's StreamingResponse. I don't imagine a multiple-final-steps option. Though I could see how it would be useful to pass along information (possibly through the callback functions) to notify when a new flow in the chain has started and stopped, and pass that information back through to the API. After some exploration, I think the latter is possible given the callback functions; I just need to learn more about building an API and passing those results from the callback into it.
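
For illustration, this is roughly the shape I'm after on the API side (the generator here is a hypothetical stand-in for the final step's token stream):

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

def final_step_tokens(question: str):
    # Hypothetical generator: in the real app this would yield the tokens
    # produced by the final flow step as they arrive.
    yield from ("This ", "is ", "a ", "streamed ", "answer.")

@app.get("/ask")
async def ask(question: str):
    # StreamingResponse accepts any iterator/generator and forwards each
    # chunk to the client as it is produced.
    return StreamingResponse(final_step_tokens(question), media_type="text/plain")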

The code isn't pretty atm, but I have added a stream parameter option to the OpenAIChat class, which ultimately allows a stream_callback within the functional callback class.

I have tried two options with the callback:

  1. Return each token as it is generated
  2. Return the generator itself that produces the results

Result:

  1. I can get each token returning in the callback function, but I can't pass that token back to the API. (Further reading suggests I may be able to achieve this by using a webhook; this type of thing is new to me, so it needs further exploration.)
  2. When I return the generator, I get the generator object, but I am unable to iterate through the rest of it. (My guess is that this has to do with concurrency; I haven't made this an async function.)

I figured out how to do this by changing a few files:

def _format_stream_results(
    self, model_outputs, retries, message_history
) -> tuple[str, dict, dict]:
    """
    Prepares results during streaming generation.

    Args:
        model_outputs: Raw streaming output (chunk iterator) from model generation.
        retries (int): Number of retries taken for successful generation.
        message_history: Message history used for the generation.

    Yields:
        tuple(str, dict, dict): Formatted chunk text, raw outputs, and model
            configuration.
    """
    call_data = {
        "raw_outputs": model_outputs,
        "retries": retries,
    }

    model_config = {
        "model_name": self.model,
        "temperature": self.temperature,
        "max_tokens": self.max_tokens,
        "max_messages": message_history.max_messages,
        "messages": message_history.messages,
    }

    for chunk in model_outputs:
        # Each streamed chunk carries an incremental delta; "content" can be
        # missing on the first and last chunks, so default to an empty string.
        text_result = chunk["choices"][0]["delta"].get("content", "")
        yield text_result, call_data, model_config

It's used in the OpenAIChat class:

def generate(self, message_history: MessageHistory) -> tuple[str, dict, dict]:
    """
    Sends the messages to the OpenAI chat API and returns a chat message response.

    Returns:
        A tuple containing the generated chat message, raw output data, and model
            configuration. When streaming is enabled, formatted chunks are yielded
            one by one instead.
    """
    completion, retries = call_with_retry(
        func=openai.ChatCompletion.create,
        exceptions_to_retry=(
            APIError,
            Timeout,
            RateLimitError,
            APIConnectionError,
            ServiceUnavailableError,
        ),
        max_retries=self.max_retries,
        model=self.model,
        stream=self.stream,
        messages=message_history.messages,
        max_tokens=self.max_tokens,
        temperature=self.temperature,
    )

    if self.stream:
        # Streaming path: hand back each formatted chunk as it arrives.
        results_generator = self._format_stream_results(
            model_outputs=completion, retries=retries, message_history=message_history
        )
        for text_result, call_data, model_config in results_generator:
            yield text_result, call_data, model_config
    else:
        str_message, call_data, model_config = self._format_results(
            model_outputs=completion, retries=retries, message_history=message_history
        )
        return str_message, call_data, model_config
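
One caveat with the snippet above: since the body now contains a yield, calling generate() always returns a generator object, even when self.stream is False, and the tuple after return only surfaces as the StopIteration value. A rough sketch of how a caller could handle both paths (chat_llm and handle_chunk are hypothetical names, not part of the library):

# Because generate() contains a yield, calling it always returns a generator.
gen = chat_llm.generate(message_history)

if chat_llm.stream:
    # Streaming path: iterate the chunks as they arrive.
    for text_result, call_data, model_config in gen:
        handle_chunk(text_result)
else:
    # Non-streaming path: the tuple from the `return` statement only shows
    # up as the StopIteration value once the generator is exhausted.
    try:
        next(gen)
    except StopIteration as stop:
        str_message, call_data, model_config = stop.value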

The functional_callback.py addition:

"""
This module provides FunctionalCallback class.

The FunctionalCallback class allows users to provide specific functions to be
executed at each stage of a FlowStep execution, without the need for subclassing.
"""

from typing import Callable, Optional, Any
from llmflows.callbacks.base_callback import BaseCallback

class FunctionalCallback(BaseCallback):
    """
    Represents a callback to be invoked at different stages of a FlowStep execution,
    with a specific function provided for each stage.

    Args:
        on_start_fn (Optional[Callable[[dict[str, Any]], None]]): The function to be
            invoked at the start stage.
        on_stream_fn (Optional[Callable[[Any], None]]): The function to be
            invoked for each streamed chunk.
        on_results_fn (Optional[Callable[[dict[str, Any]], None]]): The function to be
            invoked at the results stage.
        on_end_fn (Optional[Callable[[dict[str, Any]], None]]): The function to be
            invoked at the end stage.
        on_error_fn (Optional[Callable[[Exception], None]]): The function to be
            invoked in case of error.
    """
    def __init__(
        self,
        on_start_fn: Optional[Callable[[dict[str, Any]], None]] = None,
        on_stream_fn: Optional[Callable[[Any], None]] = None,
        on_results_fn: Optional[Callable[[dict[str, Any]], None]] = None,
        on_end_fn: Optional[Callable[[dict[str, Any]], None]] = None,
        on_error_fn: Optional[Callable[[Exception], None]] = None,
    ):
        self.on_start_fn = on_start_fn
        self.on_results_fn = on_results_fn
        self.on_end_fn = on_end_fn
        self.on_error_fn = on_error_fn
        self.on_stream_fn = on_stream_fn

    def on_start(self, inputs: dict[str, Any]):
        if self.on_start_fn is not None:
            self.on_start_fn(inputs)
    
    def on_stream(self, inputs: Any):
        if self.on_stream_fn is not None:
            self.on_stream_fn(inputs)

    def on_results(self, results: dict[str, Any]):
        if self.on_results_fn is not None:
            self.on_results_fn(results)

    def on_end(self, execution_info: dict[str, Any]):
        if self.on_end_fn is not None:
            self.on_end_fn(execution_info)

    def on_error(self, error: Exception):
        if self.on_error_fn is not None:
            self.on_error_fn(error)

Then I use the flow and the callback in the main app like this:

logging_callback = FunctionalCallback(
    # on_start_fn=logging_on_start,
    on_results_fn=generator_response,
    on_stream_fn=generator_response,
)

tools_message_history = MessageHistory()
tools_message_history.system_prompt = flows_manager_prompt

flow_selector = ChatFlowStep(
    name="Flow Selector Step",
    llm=OpenAIChat(api_key=openai_api_key, stream=True, model="gpt-4"),
    message_history=tools_message_history,
    message_prompt_template=flows_react_prompt_template,
    message_key="question",
    output_key="flow_selector",
    stream=True,
    callbacks=[logging_callback],
)
observation = FunctionalFlowStep(
    name="Observation",
    flowstep_fn=flows_selector,
    output_key="observation",
)

flow_selector.connect(observation)
flow = Flow(flow_selector)

for i in range(max_steps):
    flow_result = flow.start(
        question=message, react_history=react_history, verbose=False
    )

@stoyan-stoyanov
Owner

Hey @jonnyjohnson1, thank you so much for sharing these! I will have some time to look into this over the weekend. I am currently working on a large update and I would like to use the opportunity to solve streaming as well.

@stoyan-stoyanov
Owner

stoyan-stoyanov commented Nov 26, 2023

Hey @jonnyjohnson1, I spent some time looking into this. I think a good solution would be to create a new .run_stream() method for flows in addition to the current .run() method. It could return a generator that yields any new chunk from any flowstep. With this solution one would be able to do something like

return StreamingResponse(stream_generator(flow.run_stream(question=message, react_history=react_history)))

and get chunks from any flowstep as they arrive.
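
For completeness, a rough sketch of what that stream_generator wrapper could look like, assuming the proposed run_stream() yields plain text chunks (app, flow, and react_history as in the snippets above):

from fastapi.responses import StreamingResponse

def stream_generator(chunks):
    # Forward each chunk from flow.run_stream() to the HTTP response as it
    # arrives; any framing (e.g. SSE "data: ..." lines) would be added here.
    for chunk in chunks:
        yield chunk

@app.post("/ask")
async def ask(message: str):
    return StreamingResponse(
        stream_generator(flow.run_stream(question=message, react_history=react_history)),
        media_type="text/plain",
    )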

Meanwhile, as you mentioned, an easier workaround could be updating the callback class and adding an on_chunk / on_stream event that triggers a websocket message. Then the app would look something along the lines of:

from fastapi import FastAPI, Request, WebSocket

...

async def generator_response():
    return await websocket.send_json({"agent_answer": "hi"})

logging_callback = AsyncFunctionalCallback(
    on_results_fn=generator_response,
    on_stream_fn=generator_response,
)

...

flow_selector = AsyncChatFlowStep(
    name="Flow Selector Step",
    llm=OpenAIChat(api_key=openai_api_key, stream=True, model="gpt-4"),
    message_history=tools_message_history,
    message_prompt_template=flows_react_prompt_template,
    message_key="question",
    output_key="flow_selector",
    stream=True,
    callbacks=[logging_callback],
)

...

async def run_agent(question: str):
    final_result = flow.start()
    return final_result

@app.websocket("/ws/")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        question = data
        final_result = await run_agent(question)
        await websocket.send_json(final_result)

This of course makes sense only if you are fine with using websockets in your front-end.
I just made an example repo with a basic front-end that utilizes web sockets (this will probably evolve into a template repo):
https://github.com/stoyan-stoyanov/llmflows-ws-template

Would you like to contribute and take a stab at updating the callback function? If not I will probably have some extra time again next weekend.

@stoyan-stoyanov
Owner

You can try the demo here:
https://llmflows-ws-template-96085e33adde.herokuapp.com/

@jonnyjohnson1
Author

A .run_stream() option seems much cleaner.

In my flow, I used a Thought, Observation, Action chain, and I needed two things:

  1. To know when the generation was completed before submitting the text to the next step. Otherwise the prompt going into the next step was a single token instead of the whole string.
  2. To return the stream only once the right tokens came through in the generation, so I set up a 5-token cache to watch the tokens as they arrived and detect the phrase. If phrase_detected, return the stream (a rough sketch of this follows below).
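
Roughly what that token cache looked like (the trigger phrase and helper name are made up for illustration):

from collections import deque

def stream_after_phrase(token_stream, phrase="Final Answer:", cache_size=5):
    # Hypothetical helper: keep a rolling cache of the last few tokens and
    # only start forwarding them once the trigger phrase shows up.
    buffer = deque(maxlen=cache_size)
    streaming = False
    for token in token_stream:
        buffer.append(token)
        if not streaming and phrase in "".join(buffer):
            streaming = True
        if streaming:
            yield token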

Having the callback functions report what stage the flow is on at each step of the way can really help the UI inform the user why they're waiting.

For each step in the async demo flow, the callback choices might look like the following (a small sketch comes after the list):

  1. creating movie title... (STATUS UPDATE)
  2. creating song title... (STATUS UPDATE)
  3. creating two main characters for {movie_title} (STATUS UPDATE)
  4. generating song lyrics for the new movie {movie_title} (STREAM VALUE)
  5. critiquing the movie's song lyrics (STREAM VALUE)
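
As a loose sketch, reusing the websocket connection and the AsyncFunctionalCallback idea from the example above (make_status_callback is a hypothetical helper):

def make_status_callback(websocket, status_text):
    async def on_start(inputs):
        # Push a status update to the front-end when this flowstep starts.
        await websocket.send_json({"status": status_text})
    return on_start

title_callback = AsyncFunctionalCallback(
    on_start_fn=make_status_callback(websocket, "creating movie title..."),
)
lyrics_callback = AsyncFunctionalCallback(
    on_start_fn=make_status_callback(websocket, "generating song lyrics..."),
    on_stream_fn=generator_response,  # stream the lyric chunks as in the example above
)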

And, yeah, lemme see, I might be able to get to it Thursday or Friday.

@teomurgi

teomurgi commented Apr 7, 2024

Any update on this?

@stoyan-stoyanov
Owner

It's on my todo list. I am working on a few updates that I will release in the coming weekends, and I will look into this afterwards.
