Skip to content

A library that aims to abstract away and standardize the initial setup and implementation of connecting with message brokers

Notifications You must be signed in to change notification settings

oslabs-beta/Codename-Hermes

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Codename Hermes Logo

Codename Hermes

A library to abstract away and standardize the initial setup and implementation of connecting with message brokers.





Standards

Currently, these standards are in alpha and are subject to change as a result.

Our primary goal is to uniformize the implementation details of message brokers in Node.JS with the ultimate goal of expanding into other runtimes/languages in the future.




Initilization

Initilization of message brokers will follow the basic format shown below.

Broker(clientOptions: GenericClientOptions, topics: GenericTopic<any>);

Currently, each broker will have it's own specific clientOptions, but will always contain host and port keys.

{
  host: string;
  port?: number;
}

Additionally, each broker will have specific options for their topics. Though, they will always be in the format found below.

{
  [topicName: string]: brokerSpecificOptions;
}



Producing Events

Producing will be fairly straight forward for each message broker and will always follow the basic format shown below.

send(topicName: string, message: string)

We haven't implemented support for messaging multiple topics at once as of yet.




Consuming Events

Consuming will be more in-depth than producing, though just as simple.

The first things first, we need to setup a listener for each topic we want to consume. This can be achieved through the basic syntax below.

listen(topics: string[], options: GenericListenerOptions)

GenericListenerOptions will always look like:

{
  autoCommit?: boolean;
}

Creating a listener will then allow us to consume messages on that topic. This can be done via the basic syntax below.

consume(topicName: string, callback: MessageCallback)

A MessageCallback will always be based off the example below, although it may change from broker to broker.

(data: GenericMessage<any>, error?: any) => void

Data will always look like:

{
  topic: string;
  message: string;
}

A more complete example is as follows:

const topics = {
  topic1: {},
  topic2: {},
}

const broker = await Broker({ host: 'localhost' }, topics);

broker.listen(['topic2'], { autoCommit: true });

broker.consume('topic2', (data, error) => console.log(data.message));

broker.send('topic1', 'Hello World!');



Kafka

Following the same standards as we've gone over previously; Our Kafka implementation will hopefully seem fairly familiar.




Initilization

Much like you've seen, initializing Kafka will be the exact same as defined in the standards with the only differance being, we're assigning the returned value of the Kafka factory function to a variable.

const kafka = Kafka(clientOptions, topics);
const kafka = Kafka(clientOptions, topics, callback);
const kafka = Kafka(clientOptions, topics, producerOptions);
const kafka = Kafka(clientOptions, topics, producerOptions, callback);

For now, another discrepancy would be Kafka specific clientOptions and topics.

Speaking of, we have some new options!


clientOptions

{
  host: string;
  port?: number;
  connectTimeout?: number;
  requestTimeout?: number;
  autoConnect?: boolean;
  connectRetryOptions?: RetryOptions;
  idleConnection?: number;
  reconnectOnIdle?: boolean;
  maxAsyncRequests?: number;
}

But what do they do? Let's get into that.


host - Default: localhost

The Kafka message broker hostname or IP address.


port - Default: 9092

The port your Kafka message broker is on.


connectTimeout - Default: 10,000ms

How long, in ms, it takes to wait for a successful connection.


requestTimeout - Default: 30,000ms

How long, in ms, for a kafka request to timeout.


autoConnect - Default: true

Should it automatically connect when Kafka is instantiated?


connectRetryOptions -

An object hash that applies to the initial connection to customize connection retries.

{
  retries?: number; /* (The maximum amount of times to retry the operation.) */
  factor?: number; /* (The exponential factor to use.) */
  minTimeout?: number; /* (The number of milliseconds before starting the first retry. Default is 1000.) */
  maxTimeout?: number; /* (The maximum number of milliseconds between two retries. Default is Infinity.) */
  randomize?: boolean; /* (Randomizes the timeouts by multiplying with a factor between 1 to 2. Default is false.) */
}

For more information about connectRetryOptions, please visit here.


ildeConnection - Default: 5 minutes

Allows the broker to disconnect an idle connection from a client.

The value is elapsed time in ms without any data written to the TCP socket.


reconnectOnIdle - Default: true

When the connection is closed due to client idling, will the client attempt to auto-reconnect?


maxAsyncRequests - Default: 10

The maximum async operations at a time toward the kafka cluster.




topics

Now that we've covered our Kafka specific clientOptions, let's get into our topic structure.

As previously mentioned, we follow this format:

{
  [topicName: string]: KafkaTopic;
}

What does that entail?

Simply put, we're defining our topics as an object. This way we can have access to all of the information in our topics later on.

An example of how a topic will look in your code is as follows:

const topics = {
  topic1: null,

  topic2: {
    partition: 2,
  },

  topic3: {
    offset: 2,
  },

  topic4: {
    partition: 2,
    offset: 2,
  },
}

Now, why do we have null? It's because we're using null to say "use the default options for this topic".

The default options for a topic is:

{
  partition: 0,
  offset: 0,
}

Alright, so you might be asking yourself: "What does the partition and offset do?"

partition is just telling the Kafka broker "hey, this topic has X amount of sub-sections" and offset is saying "hey, start each partition at the offset X."




producerOptions

Producer options are a way for you to customize how you want to produce, or "send," messages to topics.

{
  requireAcks?: number;
  ackTimeoutMs?: number;
  partitionerType?: number;
}

As usual, let's break down what everything does.

requireAcks - Default: 1

The amount of acknowledgments the leader has to have received before considering a request complete.


ackTimeoutMs - Default: 100ms

How long, in ms, to wait before considering acknowledgments.


partitionerType - Default: 2

The algorithm that determines which partition to send messages to.

default = 0

random = 1

cyclic = 2

keyed = 3

Custom partitioners are currently in progess.


For more information regarding these producerOptions, please refer to this.




callback

This callback is invoked when we successfully connect to the broker as a producer. It will also be invoked if there are any errors.

If there is no error, the argument passed in will be null.

(error: any | null) => void;



Example

Now that we have the background knowledge of what each argument is, let's see an example of how it would look in your code.

const clientOptions = {
  host: 'localhost',
  port: 9092,
}

const topics = {
  topic1: null,
  topic2: null,
}

const kafka = Kafka(clientOptions, topics);



Producing

Sending messages to topics has been made simple and straight forward.

The only information you need are the name of the topic and the message you want to send.

Though, we do have options to specify where exactly you want the message to go as well as a callback to perform actions after the message has sent or if it errors out.

For now, we only support strings as the message.

kafka.send(topicName, message);
kafka.send(topicName, message, options);
kafka.send(topicName, message, callback);
kafka.send(topicName, message, options, callback);

options

Send options are a way to specify where you want your message to be delivered.

{
  key?: string;
  partition?: number;
  attributes?: number;
}

key - Default: ''

The key for the message.

It's kind of like specifying the person you're sending a letter to.


partition - Default: 0

What partition, or sub-section, you want to send the message to.


attributes - Default: 0

Specifies whether or not you want to use compression for your message.

No compression = 0

GZip = 1

snappy = 2


callback

This callback will be invoked when the message successfully send and if there is an error.

If there was an error, that error will be passed as an argument. Otherwise it will be null.

(error: any | null) => void;



Consuming

Compared to sending messages to topics, consuming them is a little more involve but still straight forward.

As you can see below, you can provide listener options. Although, if you want to just use the defaults that's valid too.

kafka.consume(topics, callback)
kafka.consume(topics, listenerOptions, callback)

First, let's overview what you can expect in the callback.

callback - Required

(message: KafkaMessage | null, error?: any) => void

A KafkaMessage contains the following:

{
  topic: string;
  message: string;
  offset?: number;
  partition?: number;
  highWaterOffset?: number;
  key?: string;
}

Let's go a bit more in-depth about what each key does.

topic

The name of the topic a message was consumed from.


message

The message sent to the topic.


offset

The offset of the message.

Think of appending to an array.


partition

The partition, or sub-section, on the topic a message was sent to.


highWaterOffset

The last offset that was successfully replicated to all topics.


key

The identifier for a message.

For example: if you have a gateway topic and you need user data from the auth topic, you can specify a key to differenciate between other messages sent to the gateway topic.


Now, the message may be null. This will only occur when there is an error.

listenerOptions

Finally, we can dive into what the listener options are and what they do.

We provide a way to customize how you want listeners to behave. If you have no need for customization, there are also default options to fallback on.

KafkaListenerOptions

{
  autoCommit?: boolean;
  groupId?: string;
  autoCommitIntervalMs?: number;
  fetchMaxWaitMs?: number;
  fetchMinBytes?: number;
  fetchMaxBytes?: number;
  fromOffset?: boolean;
}

autoCommit - Default: false

autoCommit will automagically mark the consumed messages as "read"


groupId - Default: 'kafka-node-group'

The consumer gorup id.


autoCommitIntervalMs - Default: 5000ms

How often, in ms, will the consumer mark messages as "sent"


fetchMaxWaitMs - Default: 100ms

The max time, in ms, to wait if there is insufficient data when receiving a message.


fetchMinBytes - Default: 1

The minimum number of bytes that must be available to respond.


fetchMaxBytes - Default: 2048

The maximum bytes to include in the message set for this partition.


fromOffset - Default: false

If set true, consumer will fetch message from the given offset in the KafkaMessage.


As of now, we're working on allowing support for Buffers. The options for this will be provided on the encoding and keyEncoding keys.




Example

Now, let's see an example of what we've learned so far and how it would look in your code.

const clientOptions = {
  host: 'localhost',
  port: 9092,
}

const topics = {
  topic1: null,
  topic2: null,
}

const kafka = Kafka(clientOptions, topics);

kafka.consume('topic2', { autoCommit: true }, (data, error) => console.log(data.message));

kafka.send('topic2', 'Hello, topic2!');



RabbitMQ

Initilization

Initilizing RabbitMQ is similar to the syntax you have seen previously with our standards and Kakfa. With RabbitMQ, we use the "createRabbitClass" factory function to instantiate a variable to act as our broker.

Is that an await? Yes, yes it is. Our RabbitMQ message broker can be instantiated using promises.

const rabbit = await createRabbitClass(clientOptions, topics);

clientOptions

With Codename Hermes, you can create a customaizable rabbit broker for any occasion thanks to the below client options. Continue reading to find out more!

{
  username?: string;
  password?: string;
  protocol?: 'amqp' | 'amqps';
  vhost?: string;
  locale?: string;
  frameMax?: number;
  heartbeat?: number;
}

username - Default: guest

User name to create when RabbitMQ creates a new database from scratch.


password - Default: guest

Password for the default user.


protocol - Default: amqp

Mandatory option to specify what messaging protocol is used for the broker. The Codename Hermes team, decided to utilize the AMQP 0-9-1 messaging protocol because it is a mature and widely adopted messaging protocol that provides the necessary features and capabilities for building robust, scalable, and interoperable message broker systems.

Please refer to AMQP 0-9-1 to learn more!


vhost - Default: '/'

The name for the virtual host. RabbitMQ out of the box uses a virtual host named '/'. Changing this option beyond the default is needed when trying to create logical groupings and/or isolated messaging environments within your RabbitMQ broker.


locale - Default: en_US

The desired locale for error messages. NOTE: RabbitMQ only ever uses en_US.


frameMax - Default: 0

The size in bytes of the maximum frame allowed over the connection. 0 means no limit. Please refer to frameMax for more inforamtion.


heartbeat - Default: 0

The period of the connection heartbeat, in seconds. Please refer to heartbeating for more inforamtion.


topics

Now that you have decided on your options, it is time to determine what type of topics you will be. Let's keep reading to find out about our next topic, "topics".

{
    exchange: amqp.Options.AssertExchange & {
      name: string;
      type?: 'direct' | 'topic' | 'headers' | 'fanout' | 'match';
    };
  } & amqp.Options.AssertQueue & {
      key?: string;
    }

exchange object

name - Default: ' ' (empty string)

Name of the topic exchange. Get creative and specific here.


type - Default: topic

This option determines what type of exchange our RabbitMQ broker will use. Although a developer could use any rabbit exchange type with our library, the Codename Hermes team decided to default to using topics because topic exchanges support the publish-subscribe messaging model as well as supports the scalability and extensibility of messaging systems.


amqp AssertExchange options


durable - Default: true

if true, the exchange will survive broker restarts.


internal - Default: false

if true, messages cannot be published directly to the exchange (i.e., it can only be the target of bindings, or possibly create messages ex-nihilo).


autoDelete - Default: false

if true, the exchange will be destroyed once the number of bindings for which it is the source drop to zero.


alternateExchange - Default: ' ' (empty string)

an exchange to send messages to if this exchange can’t route them to any queues. Think backup exchange.


arguments - Default: {} (empty object)

any additional arguments that may be needed by an exchange type.


Please refer to AssertExchange options for more inforamtion.


Additional Properties on topic object


key - Default: ' ' (empty string)

This value represents the binding and routing keys to ensure produced messages are consumed by the appropriate rabbit broker.


amqp AssertQueue options


exclusive - Default: false

if true, scopes the queue to the connection.


durable - Default: true

if true, the queue will survive broker restarts, modulo the effects of exclusive and autoDelete.


autoDelete - Default: false

if true, the queue will be deleted when the number of consumers drops to zero.


arguments - Default: {} (empty object)

any additional arguments that may be needed by an exchange type.


Please refer to AssertQueue options for more inforamtion and potential arguments.


An example of implementing a topic might look like:
const topics = {
topic1: {
  exchange: {
    name: "topics",
    durable: false,
    type: "topic",
  },
  durable: false,
  key: "hermes",
},
};

Produce

In the wonderful world of Rabbit, we still produce messages with our broker using our standard send method.


NOTE: the last argument is optional and is not required to send messages with your rabbit broker.

send(topic: string, message: string, options?: amqp.Options.Publish)

If you would like to enhance your messages, you are free to do so with the following publish options.

amqp Publish options


expiration - Default: ' ' (empty string)

If supplied, the message will be discarded from a queue once it’s been there longer than the given number of milliseconds. In the specification this is a string; numbers supplied here will be coerced to strings for transit.


userId - Default: ' ' (empty string)

If supplied, RabbitMQ will compare it to the username supplied when opening the connection, and reject messages for which it does not match.


CC - Default: ' ' (empty string)

A routing key as a string or an array of routing keys as strings; messages will be routed to these routing keys in addition to that given as the routingKey parameter. A string will be implicitly treated as an array containing just that string. This will override any value given for CC in the headers parameter. NB The property names CC and BCC are case-sensitive.


priority - Default: 0

A priority for the message; ignored by versions of RabbitMQ older than 3.5.0, or if the queue is not a priority queue (see maxPriority above).


persistent - Default: false

If truthy, the message will survive broker restarts provided it’s in a queue that also survives restarts. Corresponds to, and overrides, the property deliveryMode.


Please refer to Publish options for more inforamtion and potential arguments.


Quick example

rabbit.send("topic1", "hello from sender.ts in ch lib test");

Consume
Comming soon!




Example

import ch, { RabbitTopic, createRabbitClass } from "library";

const clientOptions = { host: "localhost", port: 5672 };

const topics: RabbitTopic = {
  topic1: {
    exchange: {
      name: "topics",
      durable: false,
      type: "topic",
    },
    durable: false,
    key: "hermes",
  },
};

const rabbit = await createRabbitClass(clientOptions,topics);

rabbit.send("topic1", "hello from sender.ts in ch lib test");



Credits

A major special thanks to kafka-node and amqplib for allowing this project to have an accelerated launch!

How to Contribute

Codename Hermes is currently in alpha, we would love to hear your feedback, encouragement, advice, suggestions, or problems! If you would like to contribute please contact us at [email protected]

Contributers

Aaron Allen - LinkedIn GitHub

Mathew Hultquist - LinkedIn GitHub

Jose Mencos - LinkedIn GitHub

Samantha Mills - LinkedIn GitHub

About

A library that aims to abstract away and standardize the initial setup and implementation of connecting with message brokers

Resources

Code of conduct

Security policy

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 4

  •  
  •  
  •  
  •