riemann.kafka
Receives events from and forwards events to Kafka.
json-deserializer
(json-deserializer)
Deserialize JSON. Let bad payload not break the consumption.
kafka
(kafka)
(kafka opts)
Returns a function that is invoked with a topic name and an optional message key and returns a stream. That stream is a function which takes an event or a sequence of events and sends them to Kafka.
(def kafka-output (kafka))
(changed :state
(kafka-output "mytopic"))
Options:
For a complete list of producer configuration options see https://kafka.apache.org/documentation/#producerconfigs
- :bootstrap.servers Bootstrap configuration, default is “localhost:9092”.
- :value.serializer Value serializer, default is json-serializer.
Example with SSL enabled:
(def kafka-output (kafka {:bootstrap.servers "kafka.example.com:9092"
:security.protocol "SSL"
:ssl.truststore.location "/path/to/my/truststore.jks"
:ssl.truststore.password "mypassword"}))
start-kafka-thread
(start-kafka-thread running? core opts)
Start a kafka thread which will pop messages off the queue as long as running? is true