diff --git a/README.md b/README.md index 47866ef..a0ef99c 100644 --- a/README.md +++ b/README.md @@ -82,9 +82,14 @@ go build \ ### Free books +* [https://www.openmymind.net/The-Little-Go-Book/](https://www.openmymind.net/The-Little-Go-Book/) * https://golang.org/doc/effective_go.html#generality * [https://golang.org/doc/code.html](https://golang.org/doc/code.html) +### Blogs + +* [https://www.goin5minutes.com/screencasts/](https://www.goin5minutes.com/screencasts/) + ### Databases * [https://golang.org/pkg/database/sql/](https://golang.org/pkg/database/sql/) @@ -144,4 +149,13 @@ go build \ * [https://ewencp.org/blog/golang-iterators/](https://ewencp.org/blog/golang-iterators/) * [https://pkg.go.dev/google.golang.org/api/iterator?tab=doc#example-package-ServerPages](https://pkg.go.dev/google.golang.org/api/iterator?tab=doc#example-package-ServerPages) -* [https://github.com/googleapis/google-cloud-go/wiki/Iterator-Guidelines](https://github.com/googleapis/google-cloud-go/wiki/Iterator-Guidelines) \ No newline at end of file +* [https://github.com/googleapis/google-cloud-go/wiki/Iterator-Guidelines](https://github.com/googleapis/google-cloud-go/wiki/Iterator-Guidelines) + +### Concurrency + +* [https://www.goin5minutes.com/blog/channel_over_channel/](https://www.goin5minutes.com/blog/channel_over_channel/) + +### Workers Pool + +* [https://brandur.org/go-worker-pool](https://brandur.org/go-worker-pool) +* [https://maptiks.com/blog/using-go-routines-and-channels-with-aws-2/](https://maptiks.com/blog/using-go-routines-and-channels-with-aws-2/) \ No newline at end of file diff --git a/cmd/mq-to-db/main.go b/cmd/mq-to-db/main.go index 158c8b9..2872e68 100644 --- a/cmd/mq-to-db/main.go +++ b/cmd/mq-to-db/main.go @@ -15,9 +15,11 @@ import ( "github.com/christiangda/mq-to-db/internal/consumer" "github.com/christiangda/mq-to-db/internal/consumer/kafka" "github.com/christiangda/mq-to-db/internal/consumer/rmq" + "github.com/christiangda/mq-to-db/internal/messages" "github.com/christiangda/mq-to-db/internal/storage" "github.com/christiangda/mq-to-db/internal/storage/memory" "github.com/christiangda/mq-to-db/internal/storage/pgsql" + "github.com/christiangda/mq-to-db/internal/worker" "os" "strings" @@ -245,23 +247,19 @@ func main() { log.Infof("Connecting to consumer") qc.Connect() - // workers to proccess every consumed message - for id := 1; id <= conf.Consumer.Workers; id++ { + log.Printf("Creating workers pool: %s, with: %d workers", conf.Application.Name, conf.Consumer.Workers) + cPool := worker.NewPool(appCtx, &wg, conf.Consumer.Workers, conf.Application.Name) - // creates a worker id - wid := fmt.Sprintf("%s-w-%d", conf.Application.Name, id) + // log.Print("Get consuming channel") + // job, err := qc.Consume("") + // if err != nil { + // log.Error(err) + // } - // Add control for new worker routine - wg.Add(1) + log.Print("Connecting consuming function to workers poll") + cPool.ConsumeFrom(qc.Consume) - // Create a worker - w := consumer.NewWorker(appCtx, &wg, wid, db) - - // Start a go routine - go w.Start(qc.Consume(wid)) - } - - // Here the function main is blocked + // Here the main is blocked until doesn't receive a OS Signals // This is blocking the func main() routine until chan osSignal receive a value inside <-osSignal log.Warn("Stoping workers...") @@ -307,3 +305,43 @@ func ListenOSSignals(osSignal *chan bool) { *osSignal <- true }(osSignal) } + +func proccessMessages(ctx context.Context, m consumer.Messages, st storage.Store) { + + log.Infof("Processing message: %s", m.Payload) + + // try to convert the message payload to a SQL message type + sqlm, err := messages.NewSQL(m.Payload) + if err != nil { + log.Errorf("Error creating SQL Message: %s", err) + + if err := m.Reject(false); err != nil { + log.Errorf("Error rejecting rabbitmq message: %v", err) + } + } else { + + res, err := st.ExecContext(ctx, sqlm.Content.Sentence) + if err != nil { + log.Errorf("Error storing SQL payload: %v", err) + + if err := m.Reject(false); err != nil { + log.Errorf("Error rejecting rabbitmq message: %v", err) + } + } else { + + if err := m.Ack(); err != nil { + log.Errorf("Error executing ack on rabbitmq message: %v", err) + } + + log.Debugf("SQL message: %s", sqlm.ToJSON()) + + r, err := res.RowsAffected() + if err != nil { + log.Errorf("Error getting SQL result id: %v", err) + } + log.Debugf("DB Execution Result: %v", r) + } + + } + m.Ack() +} diff --git a/config-sample.yaml b/config-sample.yaml index 3e66dd2..29d6a02 100644 --- a/config-sample.yaml +++ b/config-sample.yaml @@ -1,6 +1,6 @@ --- consumer: - workers: 4 + workers: 1 kind: rabbitmq address: 127.0.0.1 port: 5672 diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go index 24f2f71..abb2f3d 100644 --- a/internal/consumer/consumer.go +++ b/internal/consumer/consumer.go @@ -22,12 +22,6 @@ type Consumer interface { Close() error } -// Iterator define functionality to iterate over messages -type Iterator interface { - Next() (*Messages, error) - Close() error -} - // Priority represents a priority level for message queue type Priority uint8 diff --git a/internal/consumer/kafka/kafka.go b/internal/consumer/kafka/kafka.go index 0043f25..760f497 100644 --- a/internal/consumer/kafka/kafka.go +++ b/internal/consumer/kafka/kafka.go @@ -23,25 +23,6 @@ func (c *Consumer) Consume(id string) (<-chan consumer.Messages, error) { func (c *Consumer) Close() error { return nil } -// Iterator iterates over consumer messages -// Implements Consumer.Iterator -type Iterator struct { - id string - ch *string - messages <-chan string -} - -// Next returns the next message in the iterator. -func (i *Iterator) Next() (*consumer.Messages, error) { - m := &consumer.Messages{} - return m, nil -} - -// Close closes the channel of the Iterator. -func (i *Iterator) Close() error { - return nil -} - // Acknowledger implements the Acknowledger for AMQP. type Acknowledger struct { } diff --git a/internal/consumer/rmq/rmq.go b/internal/consumer/rmq/rmq.go index b3c8f9b..22173cc 100644 --- a/internal/consumer/rmq/rmq.go +++ b/internal/consumer/rmq/rmq.go @@ -1,7 +1,6 @@ package rmq import ( - "errors" "fmt" "time" @@ -19,8 +18,9 @@ type Consumer struct { conn *amqp.Connection channel *amqp.Channel - appName string - uri string + name string + uri string + requestedHeartbeat time.Duration virtualHost string queue struct { @@ -47,7 +47,7 @@ func New(c *config.Config) (consumer.Consumer, error) { uri := fmt.Sprintf("amqp://%s:%s@%s:%d/", c.Consumer.Username, c.Consumer.Password, c.Consumer.Address, c.Consumer.Port) return &Consumer{ - appName: c.Application.Name, + name: c.Application.Name, uri: uri, requestedHeartbeat: c.Consumer.RequestedHeartbeat, virtualHost: c.Consumer.VirtualHost, @@ -94,6 +94,7 @@ func (c *Consumer) Connect() { amqpConfig.Vhost = c.virtualHost } + log.Debugf("Connecting to: %s", c.uri) conn, err := amqp.DialConfig( c.uri, amqpConfig, @@ -104,6 +105,7 @@ func (c *Consumer) Connect() { //defer conn.Close() c.conn = conn + log.Debug("Getting Channel") ch, err := c.conn.Channel() if err != nil { log.Fatal(err) @@ -111,6 +113,7 @@ func (c *Consumer) Connect() { //defer ch.Close() c.channel = ch + log.Debugf("Declaring channel exchange: %s", c.exchange.name) err = c.channel.ExchangeDeclare( c.exchange.name, c.exchange.kind, @@ -124,6 +127,7 @@ func (c *Consumer) Connect() { log.Fatal(err) } + log.Debugf("Declaring channel queue: %s", c.queue.name) q, err := c.channel.QueueDeclare( c.queue.name, c.queue.durable, @@ -136,6 +140,7 @@ func (c *Consumer) Connect() { log.Fatal(err) } + log.Debugf("Binding queue: %s to exchange: %s using routing key: %s", q.Name, c.exchange.name, c.queue.routingKey) err = c.channel.QueueBind( q.Name, c.queue.routingKey, @@ -155,7 +160,7 @@ func (c *Consumer) Consume(id string) (<-chan consumer.Messages, error) { id = c.newConsumerID() } - // this is a blocking operation because you are consuming a channel + // Register a consumer msgs, err := c.channel.Consume( c.queue.name, id, // consumer id @@ -171,6 +176,7 @@ func (c *Consumer) Consume(id string) (<-chan consumer.Messages, error) { out := make(chan consumer.Messages) + // NOTE: This is necessary to consume the original channel without blocking it go func() { for d := range msgs { out <- consumer.Messages{ @@ -186,13 +192,12 @@ func (c *Consumer) Consume(id string) (<-chan consumer.Messages, error) { }() return out, nil - //return &Iterator{messages: msgs, ch: c.channel, id: id}, nil } // newConsumerID generate a unique consumer id compose -// by '--' +// by '-w-' func (c *Consumer) newConsumerID() string { - return fmt.Sprintf("%s-w-%s", c.appName, uuid.New().String()) + return fmt.Sprintf("%s-w-%s", c.name, uuid.New().String()) } // Close the channel connection @@ -206,42 +211,6 @@ func (c *Consumer) Close() error { return nil } -// Iterator iterates over consume messages -// Implements Consumer.Iterator -type Iterator struct { - id string - ch *amqp.Channel - messages <-chan amqp.Delivery -} - -// Next returns the next message in the iterator. -func (i *Iterator) Next() (*consumer.Messages, error) { - - d, ok := <-i.messages - if !ok { - return nil, errors.New("Channel is closed") - } - - m := &consumer.Messages{} - m.MessageID = d.MessageId - m.Priority = consumer.Priority(d.Priority) - m.Timestamp = d.Timestamp - m.ContentType = d.ContentType - m.Acknowledger = &Acknowledger{d.Acknowledger, d.DeliveryTag} - m.Payload = d.Body - - return m, nil -} - -// Close closes the channel of the Iterator. -func (i *Iterator) Close() error { - if err := i.ch.Cancel(i.id, false); err != nil { - return err - } - - return i.ch.Close() -} - // Acknowledger implements the Acknowledger for AMQP library. type Acknowledger struct { ack amqp.Acknowledger diff --git a/internal/worker/worker.go b/internal/worker/worker.go new file mode 100644 index 0000000..fb326b5 --- /dev/null +++ b/internal/worker/worker.go @@ -0,0 +1,144 @@ +package worker + +import ( + "context" + "fmt" + "sync" + + "github.com/christiangda/mq-to-db/internal/consumer" + log "github.com/sirupsen/logrus" +) + +// Pool is a worker group that runs a number of tasks at a +// configured concurrency. +type Pool struct { + ctx context.Context // app context + wg *sync.WaitGroup // workers coordinator + + name string + maxWorkers int + Workers []Worker + + // This is used to comunicate with workers and send the job, once it is finished they need to send the result + // to WorkQueue. + // Good reference: https://www.goin5minutes.com/blog/channel_over_channel/ + WorkerPool chan chan consumer.Messages + WorkQueue chan consumer.Messages +} + +// NewPool initializes a new pool with the given tasks and +// at the given concurrency. +func NewPool(ctx context.Context, wg *sync.WaitGroup, maxWorkers int, name string) *Pool { + return &Pool{ + ctx: ctx, + wg: wg, + maxWorkers: maxWorkers, + + name: name, + WorkerPool: make(chan chan consumer.Messages, maxWorkers), // buffered channel + WorkQueue: make(chan consumer.Messages), + } +} + +// ConsumeFrom runs all work within the pool and blocks until it's +// finished. +func (p *Pool) ConsumeFrom(cFunc func(id string) (<-chan consumer.Messages, error)) { + + var msgs <-chan consumer.Messages + var err error + + // Start workers + for i := 0; i < p.maxWorkers; i++ { + + // creates a worker id + id := fmt.Sprintf("%s-w-%d", p.name, i) + + log.Printf("Creating worker: %s", id) + worker := NewWorker(id, p.WorkerPool) + worker.Start() + + // register in dispatcher's workers + //p.Workers = append(p.Workers, worker) + + // execute consumer function + msgs, err = cFunc(id) + if err != nil { + log.Error(err) + } + } + + // this is the dispatcher logic + go func(msgs <-chan consumer.Messages) { + for { + + select { + + case job := <-msgs: + + log.Info("Received job request") + + // a job request has been received + go func(job consumer.Messages) { + // try to obtain a worker job channel that is available. + // this will block until a worker is idle + jobChannel := <-p.WorkerPool + + // dispatch the job to the worker job channel + jobChannel <- job + }(job) + } + } + }(msgs) +} + +// Worker encapsulates a work item that should go in a work +// pool. +type Worker struct { + ID string + JobsQueue chan consumer.Messages + + workerPool chan chan consumer.Messages + quit chan bool +} + +// NewWorker return a new worker +func NewWorker(id string, pool chan chan consumer.Messages) Worker { + return Worker{ + ID: id, + JobsQueue: make(chan consumer.Messages), + + workerPool: pool, + quit: make(chan bool), + } +} + +// Start initialize a worker +func (w Worker) Start() { + + go func() { + for { + // register the current worker into the worker queue. + // this step news to be done + w.workerPool <- w.JobsQueue + + select { + case job := <-w.JobsQueue: + + go func(m consumer.Messages) { + + }(job) + + case <-w.quit: + // we have received a signal to stop + return + } + } + }() +} + +// Stop signals the worker to stop listening for work requests. +func (w Worker) Stop() { + go func() { // non-blocking call + w.quit <- true + }() +}