Skip to content

A Library that implements event bus to facilitate CQRS and Pub-Sub models.

License

Notifications You must be signed in to change notification settings

architeacher/eventually

Repository files navigation

Eventually CircleCI

license release Travis CI Coverage Status codecov GolangCI Go Report Card Codacy Badge GoDoc DepShield Badge FOSSA Status Join the chat at https://gitter.im/ahmedkamals/eventually

 _____                 _               _ _
| ____|_   _____ _ __ | |_ _   _  __ _| | |_   _
|  _| \ \ / / _ \ '_ \| __| | | |/ _` | | | | | |
| |___ \ V /  __/ | | | |_| |_| | (_| | | | |_| |
|_____| \_/ \___|_| |_|\__|\__,_|\__,_|_|_|\__, |
                                           |___/

Eventually is a library that helps implement CQRS by providing an event bus that facilitates Pub-Sub operations.

Table of Contents

✨ Features

  • Pub/Sub model.
  • Delayed publish.
  • Scheduled publish.

🏎️ Getting Started

Prerequisites

Installation

go get -u github.com/ahmedkamals/eventually

Examples

Main

// main.go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"time"

	"github.com/ahmedkamals/colorize"
	"github.com/ahmedkamals/eventually"
)

type (
	// eventLogger is the demo's logger: its Log method offers each message to
	// logChan.
	eventLogger struct {
		logChan chan string
	}

	// errorQueue is the demo's error sink: its Report method offers each error
	// to errChan.
	errorQueue struct {
		errChan chan error
	}

	// universalPayload is the empty payload type attached to the "universal"
	// descriptor built in main.
	universalPayload struct{}
)

const (
	// MailboxDefaultSize defines size for the consumer mail box.
	MailboxDefaultSize = 10
	// universal is the name of the topic whose consumers are registered with
	// SubscribeToAll rather than Subscribe.
	universal          = "universal"
)

var (
	// colorized is built from os.Stdout by colorize.NewColorable; the demo calls
	// its colour helpers (White, Green, Red, ...) on the text it prints.
	colorized = colorize.NewColorable(os.Stdout)
)

// newEventLogger returns an eventually.Logger backed by an eventLogger that
// writes to logChan.
func newEventLogger(logChan chan string) eventually.Logger {
	logger := new(eventLogger)
	logger.logChan = logChan

	return logger
}

// Log offers message to the log channel without blocking. When the send
// cannot proceed immediately the message is discarded.
func (e *eventLogger) Log(message string) {
	select {
	default:
		// Drop any log message that exceeds the log queue size.
	case e.logChan <- message:
	}
}

// newErrorQueue returns an eventually.ErrorQueue backed by an errorQueue that
// writes to errChan.
func newErrorQueue(errChan chan error) eventually.ErrorQueue {
	queue := new(errorQueue)
	queue.errChan = errChan

	return queue
}

// Report offers err to the error channel without blocking. When the send
// cannot proceed immediately the error is discarded.
func (e *errorQueue) Report(err error) {
	select {
	default:
		// Drop any error message that exceeds the error queue size.
	case e.errChan <- err:
	}
}

// main runs the demo end to end: it starts the log and error monitors, runs a
// bus bound to a three-second context, subscribes the example consumers,
// publishes immediately, after a delay and on a ticker, and prints the bus
// length and topics after publishing, unsubscribing and unregistering.
func main() {
	logs := make(chan string, 10)
	errs := make(chan error, 10)

	go monitorLogMessages(logs)
	go monitorErrors(errs)

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	bus := eventually.NewBus(
		eventually.NewEventStore(),
		newEventLogger(logs),
		newErrorQueue(errs),
		100,
	)
	bus.Run(ctx)

	descriptors := map[string]eventually.Descriptor{
		universal:         eventually.NewDescriptor(universal, new(universalPayload), 1),
		"example":         eventually.NewDescriptor("example", "examplePayload", 1),
		"anotherExample":  eventually.NewDescriptor("anotherExample", "anotherExamplePayload", 2),
		"publishAfter":    eventually.NewDescriptor("publishAfter", "publishAfterPayload", 1),
		"schedulePublish": eventually.NewDescriptor("schedulePublish", "schedulePublishPayload", 1),
	}

	// Every consumer in the demo gets a fresh UUID and an inbox of the default size.
	newConsumer := func() eventually.Consumer {
		return newExampleConsumer(eventually.NewUUID(), eventually.NewInbox(MailboxDefaultSize))
	}

	subscriptions := map[eventually.Descriptor][]eventually.Consumer{
		descriptors[universal]:         {newConsumer()},
		descriptors["example"]:         {newConsumer(), newConsumer(), newConsumer()},
		descriptors["anotherExample"]:  {newConsumer()},
		descriptors["publishAfter"]:    {newConsumer()},
		descriptors["schedulePublish"]: {newConsumer()},
	}

	subscribe(bus, subscriptions, errs)

	example := descriptors["example"]
	anotherExample := descriptors["anotherExample"]

	bus.Publish(example, anotherExample)
	bus.PublishAfter(ctx, time.Second, descriptors["publishAfter"])
	bus.SchedulePublish(ctx, time.NewTicker(800*time.Millisecond), descriptors["schedulePublish"])

	time.Sleep(88 * time.Millisecond)
	fmt.Printf("%s, Subscribed events[%d]: %#v\n", colorized.White("After publishing"), bus.Length(), bus.Topics())

	bus.Unsubscribe(subscriptions[example][0], example)
	time.Sleep(88 * time.Millisecond)
	fmt.Printf("%s, Subscribed events[%d]: %#v\n", colorized.Magenta("After unsubscribing"), bus.Length(), bus.Topics())

	bus.Unregister(anotherExample)
	time.Sleep(888 * time.Millisecond)
	fmt.Printf("%s, Subscribed events[%d]: %#v\n", colorized.Yellow("After unregistering"), bus.Length(), bus.Topics())

	// Keep the process alive until the delayed and scheduled topics are published.
	time.Sleep(3 * time.Second)
}

// subscribe starts one reader goroutine per consumer in topicsConsumersMap and
// registers each consumer on eventBus: consumers of the "universal" topic via
// SubscribeToAll, all others via Subscribe on their own topic. The result of
// every registration call is sent on errChan. Before returning it waits 8ms to
// give the subscriptions time to take effect.
//
// The reader goroutines loop forever; they are only reclaimed when the process
// exits.
func subscribe(eventBus eventually.Bus, topicsConsumersMap map[eventually.Descriptor][]eventually.Consumer, errChan chan error) {
	for topic, consumers := range topicsConsumersMap {
		for _, consumer := range consumers {
			go func(consumer eventually.Consumer) {
				for {
					message := consumer.ReadMessage()
					if message == nil {
						continue
					}
					// Payloads are not guaranteed to be strings (the universal
					// descriptor carries a *universalPayload), so format the value
					// rather than asserting its type, which would panic. For a
					// string payload fmt.Sprint yields the string unchanged.
					fmt.Printf(
						"%s[%s] Inbox: got message %s\n",
						colorized.Green(consumer.String()),
						consumer.AggregateID(),
						colorized.Orange(fmt.Sprint(message.Payload())),
					)
				}
			}(consumer)

			switch topic.Name() {
			case universal:
				errChan <- eventBus.SubscribeToAll(consumer)

			default:
				errChan <- eventBus.Subscribe(consumer, topic)
			}
		}
	}
	// Delay till subscription happens.
	<-time.After(8 * time.Millisecond)
}

// monitorLogMessages prints each message received on logChan, passed through
// colorized.Cyan, and returns once logChan is closed.
func monitorLogMessages(logChan chan string) {
	for {
		message, open := <-logChan
		if !open {
			return
		}

		fmt.Println(colorized.Cyan(message))
	}
}

// monitorErrors prints each error received on errChan, passed through
// colorized.Red, skipping nil errors and anything matching io.EOF. It returns
// once errChan is closed.
func monitorErrors(errChan chan error) {
	for err := range errChan {
		if err == nil || errors.Is(err, io.EOF) {
			continue
		}

		fmt.Println(colorized.Red(err.Error()))
	}
}

Consumer

// consumer.go
package main

import (
	"fmt"
	"github.com/ahmedkamals/eventually"
	"reflect"
)

type (
	// exampleConsumer is the demo consumer: an identifier plus the inbox that
	// its Drop, ReadMessage and Signout methods delegate to.
	exampleConsumer struct {
		id      eventually.UUID
		mailBox eventually.Inbox
	}
)

// newExampleConsumer returns an eventually.Consumer backed by an
// exampleConsumer that holds the given identifier and mailbox.
func newExampleConsumer(ID eventually.UUID, mailBox eventually.Inbox) eventually.Consumer {
	consumer := new(exampleConsumer)
	consumer.id = ID
	consumer.mailBox = mailBox

	return consumer
}

// AggregateID returns the identifier the consumer was created with.
func (ec *exampleConsumer) AggregateID() eventually.UUID {
	return ec.id
}

// MatchCriteria returns a matcher that reports true for every descriptor it is
// given.
func (ec *exampleConsumer) MatchCriteria() eventually.Match {
	acceptAll := func(eventually.Descriptor) bool { return true }

	return acceptAll
}

// Drop hands descriptor to the consumer's mailbox through its Receive method.
func (ec *exampleConsumer) Drop(descriptor eventually.Descriptor) {
	ec.mailBox.Receive(descriptor)
}

// ReadMessage returns whatever the mailbox's Read method yields.
func (ec *exampleConsumer) ReadMessage() eventually.Descriptor {
	return ec.mailBox.Read()
}

// OnSubscribe invokes callback with topic and the consumer itself.
func (ec *exampleConsumer) OnSubscribe(topic eventually.Descriptor, callback eventually.Notification) {
	callback(topic, ec)
}

// OnUnsubscribe invokes callback with topic and the consumer itself.
func (ec *exampleConsumer) OnUnsubscribe(topic eventually.Descriptor, callback eventually.Notification) {
	callback(topic, ec)
}

// Signout delegates to the mailbox's Signout method.
func (ec *exampleConsumer) Signout() {
	ec.mailBox.Signout()
}

// GoString renders the consumer as its String() name followed by its
// identifier in square brackets.
func (ec *exampleConsumer) GoString() string {
	name := ec.String()

	return fmt.Sprintf("%s[%s]", name, ec.id)
}

// String returns the name of the consumer's underlying struct type, obtained
// through reflection on the pointer receiver.
func (ec *exampleConsumer) String() string {
	pointerType := reflect.TypeOf(ec)

	return pointerType.Elem().Name()
}

Sample output

πŸ•ΈοΈ Tests

make test

Benchmarks

Benchmarks Flamegraph

🔥 Todo:
  • Stats collection about published messages.
  • Storing published messages on a persistence medium (memory, disk) for a defined period of time.

🤝 Contribution

Please refer to the CONTRIBUTING.md file.

Git Hooks

In order to set up tests running on each commit do the following steps:

ln -sf ../../assets/git/hooks/pre-commit.sh .git/hooks/pre-commit && \
ln -sf ../../assets/git/hooks/pre-push.sh .git/hooks/pre-push     && \
ln -sf ../../assets/git/hooks/commit-msg.sh .git/hooks/commit-msg

πŸ‘¨β€πŸ’» Credits

🆓 LICENSE

Eventually is released under MIT license, please refer to the LICENSE.md file.

FOSSA Status

Happy Coding 🙂

Analytics

About

A Library that implements event bus to facilitate CQRS and Pub-Sub models.

Topics

Resources

License

Code of conduct

Stars

Watchers

Forks

Packages