This repository has been archived by the owner on Apr 25, 2023. It is now read-only.

Kotlin client for Memphis. Memphis is a Real-Time Data Processing Platform

nemoengineering/memphis.kt

Simple as RabbitMQ, Robust as Apache Kafka, and Perfect for microservices.

Memphis{dev} is an open-source real-time data processing platform
that provides end-to-end support for in-app streaming use cases using the Memphis distributed message broker.
The Memphis platform requires zero ops, enables rapid development and extreme cost reduction,
eliminates coding barriers, and saves data-oriented developers and data engineers a great amount of dev time.

Installation

After installing and running the Memphis broker,
add the dependency to your Gradle file:

implementation("TODO")

Importing

import dev.memphis.sdk.Memphis

Connecting to Memphis

val memphis = Memphis.connect("<memphis-host>", "<application type username>", "<broker-token>")

It is possible to pass connection configuration parameters:

val memphis = Memphis.connect("<memphis-host>", "<application type username>", "<broker-token>") {
    port = 6666
    autoReconnect = true
    maxReconnects = 3
    reconnectWait = 5.seconds
    connectionTimeout = 15.seconds
}
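
The configuration block above reads like a Kotlin lambda with receiver. As a minimal sketch of how such a block can be modeled (not the SDK's actual code), the snippet below uses a hypothetical `ConnectOptions` class and a hypothetical `connectOptions` helper. The property names mirror the example above; the default values are illustrative assumptions.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

// Hypothetical options holder. Property names mirror the README example;
// the default values are assumptions made for illustration only.
class ConnectOptions {
    var port: Int = 6666
    var autoReconnect: Boolean = true
    var maxReconnects: Int = 3
    var reconnectWait: Duration = 5.seconds
    var connectionTimeout: Duration = 15.seconds
}

// Hypothetical helper: applies the caller's block to a fresh options object
// and returns the configured result.
fun connectOptions(configure: ConnectOptions.() -> Unit = {}): ConnectOptions =
    ConnectOptions().apply(configure)
```

Calling `connectOptions { port = 7000 }` would override only the port and leave every other property at its default, which is the behavior a trailing configuration block usually implies.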

Once connected, all features offered by Memphis are available.

Disconnecting from Memphis

To disconnect from Memphis, call close() on the Memphis connection object.

memphis.close()

Creating a Station

Stations are created from the Memphis connection object.
Optional parameters can be passed in a configuration block.
If a station already exists, nothing happens and the new configuration is not applied.

val station = memphis.createStation("<station-name>")

val station = memphis.createStation("<station-name>") {
    retentionType = RetentionType.MAX_AGE_SECONDS
    retentionValue = 604800
    storageType = StorageType.DISK
    replicas = 1
    idempotencyWindow = 2.minutes
    schemaName = "<Schema Name>"
    sendPoisonMsgToDls = true
    sendSchemaFailedMsgToDls = true
}

Retention Types

Memphis currently supports the following types of retention:

RetentionType.MAX_AGE_SECONDS

The above means that every message persists for the value set in the retention value field (in seconds).

RetentionType.MESSAGES

The above means that after the maximum number of saved messages (set in retention value)
has been reached, the oldest messages will be deleted.

RetentionType.BYTES

The above means that after the maximum number of saved bytes (set in retention value)
has been reached, the oldest messages will be deleted.
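
To make the three policies concrete, here is a minimal in-memory sketch of the retention rules as described above. It is not the broker's implementation: `Retention`, `Stored`, and `applyRetention` are hypothetical names, and the boundary handling (for example, treating a message exactly at the age limit as retained) is an assumption.

```kotlin
// Hypothetical model of the three retention policies described above.
enum class Retention { MAX_AGE_SECONDS, MESSAGES, BYTES }

// A stored message with the time at which it was saved (epoch seconds).
class Stored(val payload: ByteArray, val storedAtEpochSeconds: Long)

// Returns the messages that survive the policy, ordered oldest to newest.
// `messages` must already be ordered oldest to newest.
fun applyRetention(
    messages: List<Stored>,
    type: Retention,
    value: Long,
    nowEpochSeconds: Long
): List<Stored> = when (type) {
    // Keep messages that are at most `value` seconds old.
    Retention.MAX_AGE_SECONDS ->
        messages.filter { nowEpochSeconds - it.storedAtEpochSeconds <= value }
    // Keep only the newest `value` messages.
    Retention.MESSAGES ->
        messages.takeLast(value.coerceIn(0L, Int.MAX_VALUE.toLong()).toInt())
    // Walk from newest to oldest, keeping messages while the byte budget allows.
    Retention.BYTES -> {
        var used = 0L
        messages.asReversed()
            .takeWhile { used += it.payload.size; used <= value }
            .asReversed()
    }
}
```

With three 4-byte messages, a `BYTES` policy with a value of 8 would keep the two newest, and a `MESSAGES` policy with a value of 1 would keep only the newest.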

Storage Types

Memphis currently supports the following types of message storage:

StorageType.DISK

The above means that messages persist on disk.

StorageType.MEMORY

The above means that messages are kept in main memory.

Destroying a Station

Destroying a station will remove all its resources (including producers and consumers).

station.destroy()

Attaching a Schema to an Existing Station

memphis.attachSchema("<schema-name>", "<station-name>")

// Or from a station

station.attachSchema("<schema-name>")

Detaching a Schema from Station

memphis.detachSchema("<station-name>")

// Or from a station
station.detachSchema()

Produce and Consume Messages

The most common client operations are producing messages and consuming messages.

Messages are published to a station and consumed from it
by creating a consumer and collecting the resulting flow.
Consumers are pull-based and consume all the messages in a station
unless you are using a consumer group,
in which case messages are spread across all members of the group.
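
The difference between independent consumers and consumer group members can be illustrated with a small model. This is a sketch under stated assumptions, not the broker's logic: `distribute` is a hypothetical function, and round-robin assignment inside a group is chosen purely for illustration, since the text above only says that messages are spread across members.

```kotlin
// Hypothetical model: every consumer group sees every message, and within a
// group each message goes to exactly one member. Round-robin is an assumption
// made for illustration; the broker's actual distribution may differ.
fun distribute(
    messages: List<String>,
    groups: Map<String, List<String>>   // group name -> member names
): Map<String, List<String>> {          // member name -> messages received
    val received = mutableMapOf<String, MutableList<String>>()
    for ((_, members) in groups) {
        if (members.isEmpty()) continue
        messages.forEachIndexed { index, message ->
            val member = members[index % members.size]
            received.getOrPut(member) { mutableListOf() }.add(message)
        }
    }
    return received
}
```

In this model, a group with two members splits four messages two and two, while a group with a single member receives all four.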

Memphis messages are payload agnostic. Payloads are ByteArray.
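
Because payloads are plain bytes, producer and consumer have to agree on an encoding. The helpers below are a minimal sketch for text payloads using only the Kotlin standard library; `encodePayload` and `decodePayload` are hypothetical names, and UTF-8 is an assumed convention rather than anything Memphis requires.

```kotlin
// Encode a text payload as UTF-8 bytes before producing it.
fun encodePayload(text: String): ByteArray = text.toByteArray(Charsets.UTF_8)

// Decode received bytes back into text, assuming the producer used UTF-8.
fun decodePayload(bytes: ByteArray): String = bytes.toString(Charsets.UTF_8)
```

Fixing the charset on both sides avoids surprises when producer and consumer run on machines with different platform defaults.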

In order to stop receiving messages, you have to call consumer.stopConsuming().
The consumer will terminate regardless of whether there are messages in flight for the client.

Creating a Producer

val producer = memphis.producer("<station-name>", "<producer-name>") {
    genUniqueSuffix = false
}

Producing a message

producer.produce("<message in ByteArray or (schema validated station - protobuf) or ByteArray(schema validated station - json schema) or ByteArray (schema validated station - graphql schema)>") {
    ackWait = 15.seconds
    messageId = "<message Id>"
}

Add headers

producer.produce("<message in ByteArray or (schema validated station - protobuf) or ByteArray(schema validated station - json schema) or ByteArray (schema validated station - graphql schema)>") {
    headers.put("key", "value")
}

Async produce

With an async produce, your application does not wait for the broker's acknowledgement. Use it only if you can tolerate data loss.

producer.produceAsync("<message in ByteArray or (schema validated station - protobuf) or ByteArray(schema validated station - json schema) or ByteArray (schema validated station - graphql schema)>")

Message ID

Stations are idempotent by default for 2 minutes (configurable). Idempotency is achieved by adding a message ID.

producer.produce("<message in ByteArray or (schema validated station - protobuf) or ByteArray(schema validated station - json schema) or ByteArray (schema validated station - graphql schema)>") {
    messageId = "123"
}
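
The idea behind an idempotency window can be sketched as deduplication by message ID over a sliding time span. The class below is a hypothetical model for illustration only, not the broker's implementation; `IdempotencyWindow` and `accept` are assumed names, and the 2-minute default is taken from the description above.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes

// Hypothetical model of message-ID deduplication within a time window.
class IdempotencyWindow(private val window: Duration = 2.minutes) {
    // Message ID -> time (epoch millis) at which it was first accepted.
    private val seen = mutableMapOf<String, Long>()

    // Returns true if the message is accepted, false if it is a duplicate
    // of a message accepted within the window.
    fun accept(messageId: String, nowMillis: Long): Boolean {
        // Forget IDs whose window has expired.
        seen.entries.removeAll { nowMillis - it.value >= window.inWholeMilliseconds }
        if (messageId in seen) return false
        seen[messageId] = nowMillis
        return true
    }
}
```

In this model, a second message with ID "123" arriving one minute after the first is dropped, while the same ID is accepted again once two minutes have passed.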

Destroying a Producer

producer.destroy()

Creating a Consumer

val consumer = memphis.consumer("<station-name>", "<consumer-name>") {
    consumerGroup = "<consumer-group>"
    pullInterval = 1.seconds
    batchSize = 10
    batchMaxTimeToWait = 5.seconds
    maxAckTime = 30.seconds
    maxMsgDeliveries = 10
    genUniqueSuffix = false
}

Processing Messages

To consume messages you just need to collect the messages from the flow.

consumer.consume().collect {
    println("Received message:")
    println(it.data.toString(Charset.defaultCharset()))
    println(it.headers)
    println()
    it.ack()
}

If you need tighter control over when a new message is fetched, use subscribeMessages, which only fetches a message when an item in the flow is collected. It does not listen for DLS messages; listen for those separately with subscribeDls.

consumer.subscribeMessages().collect {
    println("Received message:")
    println(it.data.toString(Charset.defaultCharset()))
    println(it.headers)
    println()
    it.ack()
}

consumer.subscribeDls().collect {
    println("Received DLS message:")
    println(it.data.toString(Charset.defaultCharset()))
    println(it.headers)
    println()
    it.ack()
}

Acknowledging a Message

Acknowledging a message tells the Memphis server not to
re-send the same message to the same consumer or consumer group.

message.ack()

Get headers

Get headers per message

val headers = msg.headers

Destroying a Consumer

consumer.destroy()
