Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
christiangda committed Aug 25, 2020
1 parent b90e201 commit 0584eb0
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 85 deletions.
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,14 @@ go build \

### Free books

* [https://www.openmymind.net/The-Little-Go-Book/](https://www.openmymind.net/The-Little-Go-Book/)
* https://golang.org/doc/effective_go.html#generality
* [https://golang.org/doc/code.html](https://golang.org/doc/code.html)

### Blogs

* [https://www.goin5minutes.com/screencasts/](https://www.goin5minutes.com/screencasts/)

### Databases

* [https://golang.org/pkg/database/sql/](https://golang.org/pkg/database/sql/)
Expand Down Expand Up @@ -144,4 +149,13 @@ go build \

* [https://ewencp.org/blog/golang-iterators/](https://ewencp.org/blog/golang-iterators/)
* [https://pkg.go.dev/google.golang.org/api/iterator?tab=doc#example-package-ServerPages](https://pkg.go.dev/google.golang.org/api/iterator?tab=doc#example-package-ServerPages)
* [https://github.com/googleapis/google-cloud-go/wiki/Iterator-Guidelines](https://github.com/googleapis/google-cloud-go/wiki/Iterator-Guidelines)
* [https://github.com/googleapis/google-cloud-go/wiki/Iterator-Guidelines](https://github.com/googleapis/google-cloud-go/wiki/Iterator-Guidelines)

### Concurrency

* [https://www.goin5minutes.com/blog/channel_over_channel/](https://www.goin5minutes.com/blog/channel_over_channel/)

### Workers Pool

* [https://brandur.org/go-worker-pool](https://brandur.org/go-worker-pool)
* [https://maptiks.com/blog/using-go-routines-and-channels-with-aws-2/](https://maptiks.com/blog/using-go-routines-and-channels-with-aws-2/)
66 changes: 52 additions & 14 deletions cmd/mq-to-db/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (
"github.com/christiangda/mq-to-db/internal/consumer"
"github.com/christiangda/mq-to-db/internal/consumer/kafka"
"github.com/christiangda/mq-to-db/internal/consumer/rmq"
"github.com/christiangda/mq-to-db/internal/messages"
"github.com/christiangda/mq-to-db/internal/storage"
"github.com/christiangda/mq-to-db/internal/storage/memory"
"github.com/christiangda/mq-to-db/internal/storage/pgsql"
"github.com/christiangda/mq-to-db/internal/worker"

"os"
"strings"
Expand Down Expand Up @@ -245,23 +247,19 @@ func main() {
log.Infof("Connecting to consumer")
qc.Connect()

// workers to proccess every consumed message
for id := 1; id <= conf.Consumer.Workers; id++ {
log.Printf("Creating workers pool: %s, with: %d workers", conf.Application.Name, conf.Consumer.Workers)
cPool := worker.NewPool(appCtx, &wg, conf.Consumer.Workers, conf.Application.Name)

// creates a worker id
wid := fmt.Sprintf("%s-w-%d", conf.Application.Name, id)
// log.Print("Get consuming channel")
// job, err := qc.Consume("")
// if err != nil {
// log.Error(err)
// }

// Add control for new worker routine
wg.Add(1)
log.Print("Connecting consuming function to workers poll")
cPool.ConsumeFrom(qc.Consume)

// Create a worker
w := consumer.NewWorker(appCtx, &wg, wid, db)

// Start a go routine
go w.Start(qc.Consume(wid))
}

// Here the function main is blocked
// Here the main is blocked until doesn't receive a OS Signals
// This is blocking the func main() routine until chan osSignal receive a value inside
<-osSignal
log.Warn("Stoping workers...")
Expand Down Expand Up @@ -307,3 +305,43 @@ func ListenOSSignals(osSignal *chan bool) {
*osSignal <- true
}(osSignal)
}

func proccessMessages(ctx context.Context, m consumer.Messages, st storage.Store) {

log.Infof("Processing message: %s", m.Payload)

// try to convert the message payload to a SQL message type
sqlm, err := messages.NewSQL(m.Payload)
if err != nil {
log.Errorf("Error creating SQL Message: %s", err)

if err := m.Reject(false); err != nil {
log.Errorf("Error rejecting rabbitmq message: %v", err)
}
} else {

res, err := st.ExecContext(ctx, sqlm.Content.Sentence)
if err != nil {
log.Errorf("Error storing SQL payload: %v", err)

if err := m.Reject(false); err != nil {
log.Errorf("Error rejecting rabbitmq message: %v", err)
}
} else {

if err := m.Ack(); err != nil {
log.Errorf("Error executing ack on rabbitmq message: %v", err)
}

log.Debugf("SQL message: %s", sqlm.ToJSON())

r, err := res.RowsAffected()
if err != nil {
log.Errorf("Error getting SQL result id: %v", err)
}
log.Debugf("DB Execution Result: %v", r)
}

}
m.Ack()
}
2 changes: 1 addition & 1 deletion config-sample.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
consumer:
workers: 4
workers: 1
kind: rabbitmq
address: 127.0.0.1
port: 5672
Expand Down
6 changes: 0 additions & 6 deletions internal/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ type Consumer interface {
Close() error
}

// Iterator define functionality to iterate over messages
type Iterator interface {
Next() (*Messages, error)
Close() error
}

// Priority represents a priority level for message queue
type Priority uint8

Expand Down
19 changes: 0 additions & 19 deletions internal/consumer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,6 @@ func (c *Consumer) Consume(id string) (<-chan consumer.Messages, error) {

func (c *Consumer) Close() error { return nil }

// Iterator iterates over consumer messages
// Implements Consumer.Iterator
type Iterator struct {
id string
ch *string
messages <-chan string
}

// Next returns the next message in the iterator.
func (i *Iterator) Next() (*consumer.Messages, error) {
m := &consumer.Messages{}
return m, nil
}

// Close closes the channel of the Iterator.
func (i *Iterator) Close() error {
return nil
}

// Acknowledger implements the Acknowledger for AMQP.
type Acknowledger struct {
}
Expand Down
57 changes: 13 additions & 44 deletions internal/consumer/rmq/rmq.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rmq

import (
"errors"
"fmt"
"time"

Expand All @@ -19,8 +18,9 @@ type Consumer struct {
conn *amqp.Connection
channel *amqp.Channel

appName string
uri string
name string
uri string

requestedHeartbeat time.Duration
virtualHost string
queue struct {
Expand All @@ -47,7 +47,7 @@ func New(c *config.Config) (consumer.Consumer, error) {
uri := fmt.Sprintf("amqp:https://%s:%s@%s:%d/", c.Consumer.Username, c.Consumer.Password, c.Consumer.Address, c.Consumer.Port)

return &Consumer{
appName: c.Application.Name,
name: c.Application.Name,
uri: uri,
requestedHeartbeat: c.Consumer.RequestedHeartbeat,
virtualHost: c.Consumer.VirtualHost,
Expand Down Expand Up @@ -94,6 +94,7 @@ func (c *Consumer) Connect() {
amqpConfig.Vhost = c.virtualHost
}

log.Debugf("Connecting to: %s", c.uri)
conn, err := amqp.DialConfig(
c.uri,
amqpConfig,
Expand All @@ -104,13 +105,15 @@ func (c *Consumer) Connect() {
//defer conn.Close()
c.conn = conn

log.Debug("Getting Channel")
ch, err := c.conn.Channel()
if err != nil {
log.Fatal(err)
}
//defer ch.Close()
c.channel = ch

log.Debugf("Declaring channel exchange: %s", c.exchange.name)
err = c.channel.ExchangeDeclare(
c.exchange.name,
c.exchange.kind,
Expand All @@ -124,6 +127,7 @@ func (c *Consumer) Connect() {
log.Fatal(err)
}

log.Debugf("Declaring channel queue: %s", c.queue.name)
q, err := c.channel.QueueDeclare(
c.queue.name,
c.queue.durable,
Expand All @@ -136,6 +140,7 @@ func (c *Consumer) Connect() {
log.Fatal(err)
}

log.Debugf("Binding queue: %s to exchange: %s using routing key: %s", q.Name, c.exchange.name, c.queue.routingKey)
err = c.channel.QueueBind(
q.Name,
c.queue.routingKey,
Expand All @@ -155,7 +160,7 @@ func (c *Consumer) Consume(id string) (<-chan consumer.Messages, error) {
id = c.newConsumerID()
}

// this is a blocking operation because you are consuming a channel
// Register a consumer
msgs, err := c.channel.Consume(
c.queue.name,
id, // consumer id
Expand All @@ -171,6 +176,7 @@ func (c *Consumer) Consume(id string) (<-chan consumer.Messages, error) {

out := make(chan consumer.Messages)

// NOTE: This is necessary to consume the original channel without blocking it
go func() {
for d := range msgs {
out <- consumer.Messages{
Expand All @@ -186,13 +192,12 @@ func (c *Consumer) Consume(id string) (<-chan consumer.Messages, error) {
}()

return out, nil
//return &Iterator{messages: msgs, ch: c.channel, id: id}, nil
}

// newConsumerID generate a unique consumer id compose
// by '<application name>-<queue name>-<uuid>'
// by '<application name>-w-<uuid>'
func (c *Consumer) newConsumerID() string {
return fmt.Sprintf("%s-w-%s", c.appName, uuid.New().String())
return fmt.Sprintf("%s-w-%s", c.name, uuid.New().String())
}

// Close the channel connection
Expand All @@ -206,42 +211,6 @@ func (c *Consumer) Close() error {
return nil
}

// Iterator iterates over consume messages
// Implements Consumer.Iterator
type Iterator struct {
id string
ch *amqp.Channel
messages <-chan amqp.Delivery
}

// Next returns the next message in the iterator.
func (i *Iterator) Next() (*consumer.Messages, error) {

d, ok := <-i.messages
if !ok {
return nil, errors.New("Channel is closed")
}

m := &consumer.Messages{}
m.MessageID = d.MessageId
m.Priority = consumer.Priority(d.Priority)
m.Timestamp = d.Timestamp
m.ContentType = d.ContentType
m.Acknowledger = &Acknowledger{d.Acknowledger, d.DeliveryTag}
m.Payload = d.Body

return m, nil
}

// Close closes the channel of the Iterator.
func (i *Iterator) Close() error {
if err := i.ch.Cancel(i.id, false); err != nil {
return err
}

return i.ch.Close()
}

// Acknowledger implements the Acknowledger for AMQP library.
type Acknowledger struct {
ack amqp.Acknowledger
Expand Down
Loading

0 comments on commit 0584eb0

Please sign in to comment.