Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for limit parameter #22

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
add support for limit parameter
limit how many message is moved from src to dest queue
  • Loading branch information
frodeaa committed Sep 1, 2019
commit c0e17d8cf59eef9d81d86c8bf7e24d0402d2bae5
24 changes: 20 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ import (
func main() {
src := flag.String("src", "", "source queue")
dest := flag.String("dest", "", "destination queue")
limit := flag.Int("limit", -1, "limit number of messages moved")
flag.Parse()

if *src == "" || *dest == "" {
if *src == "" || *dest == "" || *limit < -1 {
flag.Usage()
os.Exit(1)
}

log.Printf("source queue : %v", *src)
log.Printf("destination queue : %v", *dest)
log.Printf("limit : %v", *limit)

// enable automatic use of AWS_PROFILE like awscli and other tools do.
opts := session.Options{
Expand All @@ -48,7 +50,8 @@ func main() {
}

lastMessageCount := int(1)
// loop as long as there are messages on the queue
movedMessageCount := 0
// loop as long as there are messages on the queue or we've reached the limit
for {
resp, err := client.ReceiveMessage(rmin)

Expand All @@ -58,18 +61,25 @@ func main() {

if lastMessageCount == 0 && len(resp.Messages) == 0 {
// no messages returned twice now, the queue is probably empty
log.Printf("done")
log.Printf("done, moved %v messages", movedMessageCount)
return
}

lastMessageCount = len(resp.Messages)
log.Printf("received %v messages...", len(resp.Messages))

var wg sync.WaitGroup
wg.Add(len(resp.Messages))

for _, m := range resp.Messages {
go func(m *sqs.Message) {

if *limit != -1 && movedMessageCount >= *limit {
return
}

wg.Add(1)
movedMessageCount += 1

defer wg.Done()

// write the message to the destination queue
Expand Down Expand Up @@ -102,5 +112,11 @@ func main() {

// wait for all jobs from this batch...
wg.Wait()


if *limit != -1 && movedMessageCount >= *limit {
log.Printf("done, moved %v messages", movedMessageCount)
break
}
}
}