Memphis{dev} is an open-source real-time data processing platform
that provides end-to-end support for in-app streaming use cases, built around the Memphis distributed message broker.
The Memphis platform requires zero ops, enables rapid development, significantly reduces costs,
eliminates coding barriers, and saves a great amount of dev time for data-oriented developers and data engineers.
After installing and running the Memphis broker,
add the SDK to the dependencies in your Gradle file:
implementation("TODO")
import dev.memphis.sdk.Memphis
val memphis = Memphis.connect("<memphis-host>", "<application type username>", "<broker-token>")
Connection configuration parameters can optionally be passed:
val memphis = Memphis.connect("<memphis-host>", "<application type username>", "<broker-token>") {
port = 6666
autoReconnect = true
maxReconnects = 3
reconnectWait = 5.seconds
connectionTimeout = 15.seconds
}
Once connected, all features offered by Memphis are available.
To disconnect from Memphis, call close() on the Memphis connection object.
memphis.close()
Stations can be created from the connection object, optionally passing configuration parameters.
If the station already exists, nothing happens and the new configuration will not be applied.
val station = memphis.createStation("<station-name>")
val station = memphis.createStation("<station-name>") {
retentionType = RetentionType.MAX_AGE_SECONDS
retentionValue = 604800
storageType = StorageType.DISK
replicas = 1
idempotencyWindow = 2.minutes
schemaName = "<Schema Name>"
sendPoisonMsgToDls = true
sendSchemaFailedMsgToDls = true
}
Memphis currently supports the following types of retention:
RetentionType.MAX_AGE_SECONDS
The above means that every message persists for the value set in the retention value field (in seconds).
RetentionType.MESSAGES
The above means that after the maximum number of saved messages (set in retention value)
has been reached, the oldest messages will be deleted.
RetentionType.BYTES
The above means that after the maximum number of saved bytes (set in retention value)
has been reached, the oldest messages will be deleted.
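For example, a count-based station could be created like this (a minimal sketch using the createStation options shown above; the station name and limit are illustrative, and a live broker connection is required):

```kotlin
// Sketch: once 1,000,000 messages are stored, the oldest are deleted.
val countLimitedStation = memphis.createStation("orders") {
    retentionType = RetentionType.MESSAGES
    retentionValue = 1_000_000
}
```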
Memphis currently supports the following types of message storage:
StorageType.DISK
The above means that messages persist on disk.
StorageType.MEMORY
The above means that messages persist in main memory.
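A memory-backed station (lower latency, but messages do not survive a broker restart) could be declared as follows; this is a sketch using the builder shown above, with an illustrative station name:

```kotlin
// Sketch: in-memory storage for low-latency, loss-tolerant data.
val memoryStation = memphis.createStation("metrics") {
    storageType = StorageType.MEMORY
}
```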
Destroying a station will remove all its resources (including producers and consumers).
station.destroy()
memphis.attachSchema("<schema-name>", "<station-name>")
// Or from a station
station.attachSchema("<schema-name>")
memphis.detachSchema("<station-name>")
// Or from a station
station.detachSchema()
The most common client operations are producing messages and consuming messages.
Messages are published to a station and consumed from it
by creating a consumer and consuming the resulting flow.
Consumers are pull-based and consume all the messages in a station
unless you are using a consumer group,
in which case messages are spread across all members of the group.
Memphis messages are payload agnostic. Payloads are ByteArray.
To stop receiving messages, call consumer.stopConsuming().
The consumer will terminate regardless of whether there are messages in flight for the client.
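Flow-based consumption can be stopped from another coroutine. A minimal sketch, assuming the consumer builder shown below and a live broker connection (station and consumer names are placeholders):

```kotlin
import kotlinx.coroutines.*

// Sketch: consume in a background coroutine, then stop after 30 seconds.
// stopConsuming() terminates the consumer even if messages are in flight.
runBlocking {
    val consumer = memphis.consumer("<station-name>", "<consumer-name>") {
        batchSize = 10
    }
    val job = launch {
        consumer.consume().collect { msg ->
            println(msg.data.toString(Charsets.UTF_8))
            msg.ack()
        }
    }
    delay(30_000)            // consume for 30 seconds
    consumer.stopConsuming() // the flow completes and the job finishes
    job.join()
}
```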
val producer = memphis.producer("<station-name>", "<producer-name>") {
genUniqueSuffix = false
}
producer.produce("<message in ByteArray (or, for schema-validated stations, a protobuf / JSON schema / GraphQL schema payload)>") {
ackWait = 15.seconds
messageId = "<message Id>"
}
producer.produce("<message in ByteArray (or, for schema-validated stations, a protobuf / JSON schema / GraphQL schema payload)>") {
headers.put("key", "value")
}
Async produce means your application will not wait for the broker's acknowledgment. Use it only if you can tolerate data loss.
producer.produceAsync("<message in ByteArray (or, for schema-validated stations, a protobuf / JSON schema / GraphQL schema payload)>")
Stations are idempotent by default for 2 minutes (configurable). Idempotency is achieved by setting a message ID.
producer.produce("<message in ByteArray (or, for schema-validated stations, a protobuf / JSON schema / GraphQL schema payload)>") {
messageId = "123"
}
producer.destroy()
val consumer = memphis.consumer("<station-name>", "<consumer-name>") {
consumerGroup = "<consumer-group>"
pullInterval = 1.seconds
batchSize = 10
batchMaxTimeToWait = 5.seconds
maxAckTime = 30.seconds
maxMsgDeliveries = 10
genUniqueSuffix = false
}
To consume messages, simply collect them from the flow.
consumer.consume().collect {
println("Received message:")
println(it.data.toString(Charset.defaultCharset()))
println(it.headers)
println()
it.ack()
}
If you need tighter control over when a new message is fetched, use subscribeMessages,
which only fetches a message when an item in the flow is collected.
It does not listen for DLS messages; listen for those separately with subscribeDls.
consumer.subscribeMessages().collect {
println("Received message:")
println(it.data.toString(Charset.defaultCharset()))
println(it.headers)
println()
it.ack()
}
consumer.subscribeDls().collect {
println("Received DLS message:")
println(it.data.toString(Charset.defaultCharset()))
println(it.headers)
println()
it.ack()
}
Acknowledging a message indicates to the Memphis server not to
re-send the same message to the same consumer or consumer group.
message.ack()
Get headers per message
val headers = msg.headers
consumer.destroy()
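Putting the pieces together, a minimal produce/consume round trip might look like the sketch below. It assumes a running Memphis broker; the host, credentials, station, producer, and consumer names are all placeholders, and the suspend entry point is one way to call the SDK's suspending functions.

```kotlin
import dev.memphis.sdk.Memphis

// Sketch: connect, produce one message, consume it, then clean up.
suspend fun main() {
    val memphis = Memphis.connect("<memphis-host>", "<username>", "<broker-token>")

    val producer = memphis.producer("demo-station", "demo-producer")
    producer.produce("hello memphis".toByteArray())

    val consumer = memphis.consumer("demo-station", "demo-consumer")
    consumer.consume().collect {
        println(it.data.toString(Charsets.UTF_8))
        it.ack()
        consumer.stopConsuming()
    }

    producer.destroy()
    consumer.destroy()
    memphis.close()
}
```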