Skip to content

mohammadsamir/kafka-rust

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

95 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Kafka Rust Client

Build Status

Documentation

Kafka Rust Client Documentation

The documentation includes some examples too.

Installation

This crate works with Cargo and is on crates.io. I will be updating the package frequently till we move out of pre-release. So add this to your Cargo.toml (instead of a specific version):

[dependencies]
kafka = "*"

Usage:

Load topic metadata:

[Load Metadata] (https://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.load_metadata_all)

extern crate kafka;
use kafka::client::KafkaClient;
fn main() {
    let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
    client.load_metadata_all();
    // OR
    // client.load_metadata(vec!("my-topic".to_owned())); // Loads metadata for vector of topics
 }
Fetch Offsets:

[For one topic] (https://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.fetch_topic_offset)

extern crate kafka;
use kafka::client::KafkaClient;
fn main() {
    let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
    client.load_metadata_all();
    let offsets = client.fetch_topic_offset("my-topic".to_owned(), -1);
}

[For multiple topics] (https://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.fetch_offsets)

extern crate kafka;
use kafka::client::KafkaClient;
fn main() {
    let mut client = KafkaClient::new(&vec!("localhost:9092".to_owned()));
    client.load_metadata_all();
    let topics = client.topic_partitions.keys().cloned().collect();
    let offsets = client.fetch_offsets(topics, -1);
}
Produce:

[Single Message] (https://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.send_message)

extern crate kafka;
use kafka::client::KafkaClient;
use kafka::utils;
fn main() {
    let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
    client.load_metadata_all();
    client.send_message(1, 100, "my-topic".to_owned(), "msg".to_owned().into_bytes())
}

[Multiple Messages] (https://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.send_messages)

extern crate kafka;
use kafka::client::KafkaClient;
use kafka::utils;
fn main() {
    let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
    client.load_metadata_all();
    let m1 = "a".to_owned().into_bytes();
    let m2 = "b".to_owned().into_bytes();
    let req = vec!(utils::ProduceMessage{topic: "my-topic".to_owned(), message: m1},
                    utils::ProduceMessage{topic: "my-topic-2".to_owned(), message: m2});
    client.send_messages(1, 100, req);  // required acks, timeout, messages
}
Fetch Messages:

[Single (topic, partition, offset)] (https://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.fetch_messages)

extern crate kafka;
use kafka::client::KafkaClient;
fn main() {
    let mut client = KafkaClient::new(&vec!("localhost:9092".to_owned()));
    client.load_metadata_all();
    // Topic, Partition, Offset
    let msgs = client.fetch_messages("my-topic".to_owned(), 0, 0);
}

[Multiple (topic, partition, offset)] (https://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.fetch_messages_multi)

extern crate kafka;
use kafka::client::KafkaClient;
use kafka::utils;
fn main() {
    let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
    client.load_metadata_all();
    let msgs = client.fetch_messages_multi(vec!(utils::TopicPartitionOffset{
                                                    topic: "my-topic".to_owned(),
                                                    partition: 0,
                                                    offset: 0
                                                    },
                                                utils::TopicPartitionOffset{
                                                    topic: "my-topic-2".to_owned(),
                                                    partition: 0,
                                                    offset: 0
                                                })));
}
Commit Offsets to a Consumer Group:

[Single (group, topic, partition, offset)] (https://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.commit_offset)

extern crate kafka;
use kafka::client::KafkaClient;
fn main() {
    let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
    client.load_metadata_all();
    // Group, Topic, Partition, Offset
    let resp = client.commit_offset("my-group".to_owned(), "my-topic".to_owned(), 0, 100);
}

[Single group, Multiple (topic, partition, offset)] (https://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.commit_offsets)

extern crate kafka;
use kafka::client::KafkaClient;
use kafka::utils;
fn main() {
    let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
    client.load_metadata_all();
    let msgs = client.commit_offsets("my-group".to_owned(), vec!(utils::TopicPartitionOffset{
                                                    topic: "my-topic".to_owned(),
                                                    partition: 0,
                                                    offset: 0
                                                    },
                                                utils::TopicPartitionOffset{
                                                    topic: "my-topic-2".to_owned(),
                                                    partition: 0,
                                                    offset: 0
                                                })));
}
Fetch Offsets of a Consumer Group:

[Offsets for all topics/partitions in a group] (https://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.fetch_group_offset)

extern crate kafka;
use kafka::client::KafkaClient;
fn main() {
    let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
    client.load_metadata_all();
    // Group, Topic, Partition, Offset
    let resp = client.fetch_group_offset("my-group".to_owned());
}

[Offsets for a topic and all its partitions in a group] (https://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.fetch_group_topic_offset)

extern crate kafka;
use kafka::client::KafkaClient;
use kafka::utils;
fn main() {
    let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
    client.load_metadata_all();
    let msgs = client.fetch_group_topic_offset("my-group".to_owned(), "my-topic".to_owned());
}

[Offsets for Multiple (topic, partition) in a group] (https://fauzism.co/rustdoc/kafka/client/struct.KafkaClient.html#method.fetch_group_topics_offset)

extern crate kafka;
use kafka::client::KafkaClient;
use kafka::utils;
fn main() {
    let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
    client.load_metadata_all();
    let msgs = client.fetch_group_topics_offset("my-group".to_owned(), vec!(utils::TopicPartition{
                                                    topic: "my-topic".to_owned(),
                                                    partition: 0
                                                    },
                                                utils::TopicPartition{
                                                    topic: "my-topic-2".to_owned(),
                                                    partition: 0
                                                })));
}

This is a simple Consumer for kafka. It handles offset management internally (Fetching offset for the group at the start and committing offsets at a pre-defined interval - 100 consumed messages currently) and provides an Iterator interface.

extern crate kafka;
use kafka::client::KafkaClient;
use kafka::utils;
fn main() {
    let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
    client.load_metadata_all();
    let con = kafka::consumer::Consumer::new(client, "test-group".to_owned(), "my-topic".to_owned())
             .partition(0);
    for msg in con {
        println!("{:?}", msg);
    }
}

TODO:

  • Tests - (Added tests for gzip.rs, snappy.rs, and codecs.rs)

About

Rust client for Apache Kafka

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Rust 100.0%