Skip to content

proxima-one/stream-client-go

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

65 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Golang Proxima.one StreamDB Client

This library is a Golang client for Proxima Stream Registry and Proxima StreamDB.

Stream Registry Client

Wraps all methods of the Proxima Streams API that is also available at https://streams.api.proxima.one.

streamRegistryClient := proximaclient.NewStreamRegistryClient(proximaclient.StreamRegistryClientOptions{
    Endpoint:        "https://streams.api.proxima.one",
    RetryPolicy:     connection.DefaultPolicy(),
    DebugHttpOutput: false,
})

Proxima Stream Client

client := proximaclient.NewStreamClient(proximaclient.Options{Registry: registry})

You can use either streamRegistryClient from previous example as a registry or create SingleStreamDbRegistry:

singleRegistryClient := proximaclient.NewSingleStreamDbRegistry("streams.buh.apps.proxima.one:443")

SingleRegistryClient is a simple implementation of the StreamRegistry interface that always returns the same stream db address. It can be useful for development purposes but in production you should use StreamRegistryClient that will fetch the StreamDB address from the registry.

As you have a client you can use it to consume a stream. There are some different methods to do so:

Streaming events

The second method is more suitable for long-running processes that need to consume a stream in a loop.

stream := client.StreamEvents(
    ctx,                           // stream context. When it is cancelled the stream will be closed
    "proxima.eth-main.blocks.1_0", // the name of the stream
    proximaclient.ZeroOffset(),
    1000,                          // stream buffer size. Consider increasing it if you have unstable network connection
)

Now stream is a Go channel with StreamEvent structs. You can use it in a loop:

for ctx.Err() == nil {
    select {
    case event := <-stream:
        // process event
    case <-ctx.Done():
        return
    }
}

Note that the StreamEvents will never throw any error. If there is a problem with the connection it will try to reconnect and continue streaming.

BufferedStreamReader

In some cases you may want to read events in batches in a long-running process. In this case you can use BufferedStreamReader:

reader := proximaclient.NewBufferedStreamReader(stream)
for i := 0; ; i++ {
    events := reader.TryRead(50)
    // process event
}

It's single TryRead method will read at least one event but no more than the specified number of events and return them as a slice.

If there are no events in the stream it will wait until there is at least one available. If a stream has more than one event, it will never wait for more events.

Fetching a number of events

It is useful when you want to fetch a number of events from the stream, but you shouldn't use it for long-running processes.

events, err := proximaclient.FetchEvents(
    "proxima.eth-main.blocks.1_0",       // the name of the stream
    proximaclient.ZeroOffset(),
    10,                                  // the MAX number of events to fetch
    proximaclient.DirectionNext, // direction can be either Next or Last which means forward or backward
)

You can now process events just like any other slice of StreamEvent structs.

Note that the FetchEvents method can return a non-nil error.