Skip to content

Consume Server-Sent Event (SSE) messages with HTTPX

License

Notifications You must be signed in to change notification settings

dbuades/httpx-sse

 
 

Repository files navigation

httpx-sse

Build Status Coverage Package version

Consume Server-Sent Event (SSE) messages with HTTPX.

Table of contents

Installation

NOTE: This is beta software. Please be sure to pin your dependencies.

pip install httpx-sse=="0.3.*"

Quickstart

httpx-sse provides the connect_sse and aconnect_sse helpers for connecting to an SSE endpoint. The resulting EventSource object exposes the .iter_sse() and .aiter_sse() methods to iterate over the server-sent events.

Example usage:

import httpx
from httpx_sse import connect_sse

with httpx.Client() as client:
    with connect_sse(client, "GET", "http:https://localhost:8000/sse") as event_source:
        for sse in event_source.iter_sse():
            print(sse.event, sse.data, sse.id, sse.retry)

You can try this against this example Starlette server (credit):

# Requirements: pip install uvicorn starlette sse-starlette
import asyncio
import uvicorn
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette.sse import EventSourceResponse

async def numbers(minimum, maximum):
    for i in range(minimum, maximum + 1):
        await asyncio.sleep(0.9)
        yield {"data": i}

async def sse(request):
    generator = numbers(1, 5)
    return EventSourceResponse(generator)

routes = [
    Route("/sse", endpoint=sse)
]

app = Starlette(routes=routes)

if __name__ == "__main__":
    uvicorn.run(app)

How-To

Calling into Python web apps

You can call into Python web apps with HTTPX and httpx-sse to test SSE endpoints directly.

Here's an example of calling into a Starlette ASGI app...

import asyncio

import httpx
from httpx_sse import aconnect_sse
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route

async def auth_events(request):
    async def events():
        yield {
            "event": "login",
            "data": '{"user_id": "4135"}',
        }

    return EventSourceResponse(events())

app = Starlette(routes=[Route("/sse/auth/", endpoint=auth_events)])

async def main():
    async with httpx.AsyncClient(app=app) as client:
        async with aconnect_sse(
            client, "GET", "http:https://localhost:8000/sse/auth/"
        ) as event_source:
            events = [sse async for sse in event_source.aiter_sse()]
            (sse,) = events
            assert sse.event == "login"
            assert sse.json() == {"user_id": "4135"}

asyncio.run(main())

Handling reconnections

(Advanced)

SSETransport and AsyncSSETransport don't have reconnection built-in. This is because how to perform retries is generally dependent on your use case. As a result, if the connection breaks while attempting to read from the server, you will get an httpx.ReadError from iter_sse() (or aiter_sse()).

However, httpx-sse does allow implementing reconnection by using the Last-Event-ID and reconnection time (in milliseconds), exposed as sse.id and sse.retry respectively.

Here's how you might achieve this using stamina...

import time
from typing import Iterator

import httpx
from httpx_sse import connect_sse, ServerSentEvent
from stamina import retry

def iter_sse_retrying(client, method, url):
    last_event_id = ""
    reconnection_delay = 0.0

    # `stamina` will apply jitter and exponential backoff on top of
    # the `retry` reconnection delay sent by the server.
    @retry(on=httpx.ReadError)
    def _iter_sse():
        nonlocal last_event_id, reconnection_delay

        time.sleep(reconnection_delay)

        headers = {"Accept": "text/event-stream"}

        if last_event_id:
            headers["Last-Event-ID"] = last_event_id

        with connect_sse(client, method, url, headers=headers) as event_source:
            for sse in event_source.iter_sse():
                last_event_id = sse.id

                if sse.retry is not None:
                    reconnection_delay = sse.retry / 1000

                yield sse

    return _iter_sse()

Usage:

with httpx.Client() as client:
    for sse in iter_sse_retrying(client, "GET", "http:https://localhost:8000/sse"):
        print(sse.event, sse.data)

API Reference

connect_sse

def connect_sse(
    client: httpx.Client,
    method: str,
    url: Union[str, httpx.URL],
    **kwargs,
) -> ContextManager[EventSource]

Connect to an SSE endpoint and return an EventSource context manager.

This sets Cache-Control: no-store on the request, as per the SSE spec, as well as Accept: text/event-stream.

If the response Content-Type is not text/event-stream, this will raise an SSEError.

aconnect_sse

async def aconnect_sse(
    client: httpx.AsyncClient,
    method: str,
    url: Union[str, httpx.URL],
    **kwargs,
) -> AsyncContextManager[EventSource]

An async equivalent to connect_sse.

EventSource

def __init__(response: httpx.Response)

Helper for working with an SSE response.

response

The underlying httpx.Response.

iter_sse

def iter_sse() -> Iterator[ServerSentEvent]

Decode the response content and yield corresponding ServerSentEvent.

Example usage:

for sse in event_source.iter_sse():
    ...

aiter_sse

async def iter_sse() -> AsyncIterator[ServerSentEvent]

An async equivalent to iter_sse.

ServerSentEvent

Represents a server-sent event.

  • event: str - Defaults to "message".
  • data: str - Defaults to "".
  • id: str - Defaults to "".
  • retry: str | None - Defaults to None.

Methods:

  • json() -> Any - Returns sse.data decoded as JSON.

SSEError

An error that occurred while making a request to an SSE endpoint.

Parents:

  • httpx.TransportError

License

MIT

About

Consume Server-Sent Event (SSE) messages with HTTPX

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Languages

  • Python 97.2%
  • Makefile 2.8%