Skip to content

Commit

Permalink
changing the message structure
Browse files Browse the repository at this point in the history
  • Loading branch information
sotchenkov committed Jun 20, 2024
1 parent 2674c6a commit 4143fe0
Show file tree
Hide file tree
Showing 24 changed files with 58 additions and 54 deletions.
Empty file modified Dockerfile
100644 → 100755
Empty file.
Empty file modified LICENSE
100644 → 100755
Empty file.
Empty file modified README.md
100644 → 100755
Empty file.
Empty file modified assets/limero_logo.png
100644 → 100755
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Empty file modified assets/swag.png
100644 → 100755
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Empty file modified cmd/limero/main.go
100644 → 100755
Empty file.
Empty file modified docs/docs.go
100644 → 100755
Empty file.
Empty file modified docs/swagger.json
100644 → 100755
Empty file.
Empty file modified docs/swagger.yaml
100644 → 100755
Empty file.
Empty file modified go.mod
100644 → 100755
Empty file.
Empty file modified go.sum
100644 → 100755
Empty file.
Empty file modified imgs/limero_logo.png
100644 → 100755
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Empty file modified imgs/swag.png
100644 → 100755
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Empty file modified internal/lib/logger/middleware.go
100644 → 100755
Empty file.
Empty file modified internal/lib/response/response.go
100644 → 100755
Empty file.
2 changes: 1 addition & 1 deletion internal/queue/queue.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type Message struct {
Value string `json:"value"`
Value map[string]interface{} `json:"value"`
}

// NewQueue returns a new queue with the given initial presize.
Expand Down
16 changes: 10 additions & 6 deletions internal/queue/queue_test.go
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package queue

import (
"reflect"
"testing"
)

Expand All @@ -13,16 +14,19 @@ func TestNewQueue(t *testing.T) {

func TestPushAndPop(t *testing.T) {
q := NewQueue(2, "testQueue")
msg1 := &Message{Value: "first"}
msg2 := &Message{Value: "second"}
msg1 := &Message{Value: map[string]interface{}{"message": "first"}}
msg2 := &Message{Value: map[string]interface{}{"message": "second"}}

q.Push(msg1)
q.Push(msg2)

if q.Pop().Value != "first" {
poppedMsg1 := q.Pop()
if !reflect.DeepEqual(poppedMsg1.Value, map[string]interface{}{"message": "first"}) {
t.Errorf("Pop did not return the first pushed message")
}
if q.Pop().Value != "second" {

poppedMsg2 := q.Pop()
if !reflect.DeepEqual(poppedMsg2.Value, map[string]interface{}{"message": "second"}) {
t.Errorf("Pop did not return the second pushed message")
}
}
Expand All @@ -33,15 +37,15 @@ func TestIsEmpty(t *testing.T) {
t.Errorf("IsEmpty should return true for a new queue")
}

q.Push(&Message{Value: "test"})
q.Push(&Message{Value: map[string]interface{}{"message": "test"}})
if q.IsEmpty() {
t.Errorf("IsEmpty should return false for a queue with messages")
}
}

func TestInfo(t *testing.T) {
q := NewQueue(10, "testQueue")
q.Push(&Message{Value: "test"})
q.Push(&Message{Value: map[string]interface{}{"message": "test"}})

info := q.Info()
if info.Name != "testQueue" || info.Presize != 10 || info.Size != 10 || info.Head != 0 || info.Tail != 1 || info.Count != 1 {
Expand Down
Empty file modified internal/server/handlers/msg.go
100644 → 100755
Empty file.
41 changes: 17 additions & 24 deletions internal/server/handlers/msg_test.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestPushMsgWithoutQueueName(t *testing.T) {
body := strings.NewReader(`{"value":"test message"}`)
body := strings.NewReader(`{"value":{"message":"test message"}}`)
req, err := http.NewRequest("POST", "/msg", body)
if err != nil {
t.Fatal(err)
Expand All @@ -24,7 +24,7 @@ func TestPushMsgWithoutQueueName(t *testing.T) {
}

func TestPushMsgToNonExistentQueue(t *testing.T) {
body := strings.NewReader(`{"value":"test message"}`)
body := strings.NewReader(`{"value":{"message":"test message"}}`)
req, err := http.NewRequest("POST", "/msg?qname=nonexistent", body)
if err != nil {
t.Fatal(err)
Expand All @@ -39,11 +39,9 @@ func TestPushMsgToNonExistentQueue(t *testing.T) {

func TestPushMsgWithUnsupportedType(t *testing.T) {
req, err := http.NewRequest("PUT", "/queue?name=testQueue&size=10", nil)

if err != nil {
t.Fatal(err)
}

rr := httptest.NewRecorder()
createQueue(rr, req)

Expand All @@ -52,7 +50,6 @@ func TestPushMsgWithUnsupportedType(t *testing.T) {
if err != nil {
t.Fatal(err)
}

req.Header.Set("Content-Type", "text/plain")
rr = httptest.NewRecorder()
PushMsg(rr, req)
Expand All @@ -64,11 +61,9 @@ func TestPushMsgWithUnsupportedType(t *testing.T) {

func TestPushMsgWithEmptyBody(t *testing.T) {
req, err := http.NewRequest("PUT", "/queue?name=testQueue&size=10", nil)

if err != nil {
t.Fatal(err)
}

rr := httptest.NewRecorder()
createQueue(rr, req)

Expand All @@ -87,15 +82,13 @@ func TestPushMsgWithEmptyBody(t *testing.T) {

func TestSuccessfulMsgPush(t *testing.T) {
req, err := http.NewRequest("PUT", "/queue?name=testQueue&size=10", nil)

if err != nil {
t.Fatal(err)
}

rr := httptest.NewRecorder()
createQueue(rr, req)

body := strings.NewReader(`{"value": "test"}`)
body := strings.NewReader(`{"value": {"message": "test"}}`)
req, err = http.NewRequest("POST", "/msg?qname=testQueue", body)
if err != nil {
t.Fatal(err)
Expand All @@ -116,7 +109,7 @@ func BenchmarkPushMsgQueueSize1(b *testing.B) {

// Run the benchmark
for i := 0; i < b.N; i++ {
body := strings.NewReader(`{"value":"message"}`)
body := strings.NewReader(`{"value":{"message":"message"}}`)
req, _ := http.NewRequest("POST", "/msg?qname="+queueName, body)
rr := httptest.NewRecorder()
PushMsg(rr, req)
Expand All @@ -131,22 +124,22 @@ func BenchmarkPushMsgQueueSize100(b *testing.B) {

// Run the benchmark
for i := 0; i < b.N; i++ {
body := strings.NewReader(`{"value":"message"}`)
body := strings.NewReader(`{"value":{"message":"message"}}`)
req, _ := http.NewRequest("POST", "/msg?qname="+queueName, body)
rr := httptest.NewRecorder()
PushMsg(rr, req)
}
}

func BenchmarkPushMsgQueueSize1000(b *testing.B) {
func BenchmarkPushMsgQueueSize100000(b *testing.B) {
// Setup
queueName := "testQueueSize1000"
queues[queueName] = queue.NewQueue(1000, queueName)
queueName := "testQueueSize100000"
queues[queueName] = queue.NewQueue(100000, queueName)
defer delete(queues, queueName)

// Run the benchmark
for i := 0; i < b.N; i++ {
body := strings.NewReader(`{"value":"message"}`)
body := strings.NewReader(`{"value":{"message":"message"}}`)
req, _ := http.NewRequest("POST", "/msg?qname="+queueName, body)
rr := httptest.NewRecorder()
PushMsg(rr, req)
Expand Down Expand Up @@ -208,7 +201,7 @@ func TestSuccessfulMsgPop(t *testing.T) {
rr := httptest.NewRecorder()
createQueue(rr, req)

body := strings.NewReader(`{"value": "test"}`)
body := strings.NewReader(`{"value": {"message": "test"}}`)
req, err = http.NewRequest("POST", "/msg?qname=testQueue", body)
if err != nil {
t.Fatal(err)
Expand All @@ -233,7 +226,7 @@ func BenchmarkPopMsgQueueSize1(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
queueName := "testQueueSize1"
localQueue := queue.NewQueue(1, queueName)
localQueue.Push(&queue.Message{Value: "message"})
localQueue.Push(&queue.Message{Value: map[string]interface{}{"message": "message"}})

for pb.Next() {
req, err := http.NewRequest("GET", "/msg?qname="+queueName, nil)
Expand All @@ -249,9 +242,9 @@ func BenchmarkPopMsgQueueSize1(b *testing.B) {

func BenchmarkPopMsgQueueSize100(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
queueName := "testQueueSize1"
queueName := "testQueueSize100"
localQueue := queue.NewQueue(100, queueName)
localQueue.Push(&queue.Message{Value: "message"})
localQueue.Push(&queue.Message{Value: map[string]interface{}{"message": "message"}})

for pb.Next() {
req, err := http.NewRequest("GET", "/msg?qname="+queueName, nil)
Expand All @@ -265,11 +258,11 @@ func BenchmarkPopMsgQueueSize100(b *testing.B) {
})
}

func BenchmarkPopMsgQueueSize10000(b *testing.B) {
func BenchmarkPopMsgQueueSize100000(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
queueName := "testQueueSize10000"
localQueue := queue.NewQueue(1000, queueName)
localQueue.Push(&queue.Message{Value: "message"})
queueName := "testQueueSize100000"
localQueue := queue.NewQueue(100000, queueName)
localQueue.Push(&queue.Message{Value: map[string]interface{}{"message": "message"}})

for pb.Next() {
req, err := http.NewRequest("GET", "/msg?qname="+queueName, nil)
Expand Down
Empty file modified internal/server/handlers/queue.go
100644 → 100755
Empty file.
Empty file modified internal/server/handlers/queue_test.go
100644 → 100755
Empty file.
Empty file modified internal/server/handlers/root.go
100644 → 100755
Empty file.
9 changes: 9 additions & 0 deletions internal/server/server.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"log"
"net/http"

"net/http/pprof"

_ "github.com/sotchenkov/limero/docs"
"github.com/sotchenkov/limero/internal/lib/logger"
"github.com/sotchenkov/limero/internal/server/handlers"
Expand All @@ -18,6 +20,13 @@ func Serv(zlog *zap.Logger) {
httpSwagger.URL("https://localhost:7920/swagger/doc.json"),
))

// Регистрация pprof-обработчиков
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

mux.HandleFunc("/", handlers.Root)
mux.HandleFunc("/ping", handlers.Ping)
mux.HandleFunc("/queue", handlers.ActionOnQueueHandlers)
Expand Down
44 changes: 21 additions & 23 deletions tests/hightload.go → tests/stress.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,28 @@ import (
)

type Msg struct {
Value string `json:"value"`
Value map[string]interface{} `json:"value"`
}

func createQueue(brokerAddr string, qname string, presize string) error {
req, err := http.NewRequest(http.MethodPut, "https://"+brokerAddr+"/queue?name="+qname+"&size="+presize, nil)
if err != nil {
log.Fatal("queue creation error")
log.Print("queue creation error")
return err
}

req.Header.Set("Content-Type", "application/json")

resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal("request error")
log.Print("request error")
return err
}

defer resp.Body.Close()

if resp.StatusCode != http.StatusCreated {
log.Fatal("status code is not 201")
log.Printf("status code is not 201: %v", resp.StatusCode)
return errors.New("wrong status code")
}

Expand All @@ -47,11 +47,12 @@ func createQueue(brokerAddr string, qname string, presize string) error {
}

func sendMsg(value string, brokerAddr string, qname string) error {
body := []byte(fmt.Sprintf(`{"value": "%s"}`, value))
body := []byte(fmt.Sprintf(`{"value": {"message": "%s"}}`, value))
log.Printf("Sending message payload: %s", body)

req, err := http.NewRequest(http.MethodPost, "https://"+brokerAddr+"/msg?qname="+qname, bytes.NewBuffer(body))
if err != nil {
log.Fatal("couldn't send a message")
log.Print("couldn't send a message")
return err
}

Expand All @@ -61,39 +62,36 @@ func sendMsg(value string, brokerAddr string, qname string) error {

resp, err := httpClient.Do(req)
if err != nil {
log.Fatal("couldn't send a message")
log.Print("couldn't send a message")
return err
}

defer resp.Body.Close()

if resp.StatusCode != http.StatusCreated {
log.Fatal("status code is not 201")
log.Printf("status code is not 201: %v", resp.StatusCode)
return errors.New("wrong status code")
}

log.Print("the message has been sent successfully")

return nil
}

func getMsg(brokerAddr string, qname string) (error, *Msg) {
httpClient := http.Client{Timeout: 5 * time.Second}
r, err := httpClient.Get("https://" + brokerAddr + "/msg?qname=" + qname)
if err != nil {
log.Fatal("request to receive a message from the broker could not be executed")

return err, &Msg{Value: ""}
log.Print("request to receive a message from the broker could not be executed")
return err, &Msg{Value: map[string]interface{}{}}
}

defer r.Body.Close()

var msg *Msg
err = json.NewDecoder(r.Body).Decode(&msg)
if err != nil {
log.Fatal("message could not be decoded")

return err, &Msg{Value: ""}
log.Print("message could not be decoded")
return err, &Msg{Value: map[string]interface{}{}}
}

return nil, msg
Expand All @@ -102,20 +100,20 @@ func getMsg(brokerAddr string, qname string) (error, *Msg) {
func deleteQueue(brokerAddr string, qname string) {
req, err := http.NewRequest(http.MethodDelete, "https://"+brokerAddr+"/queue?name="+qname, nil)
if err != nil {
log.Fatal("queue deletion error")
log.Print("queue deletion error")
}

req.Header.Set("Content-Type", "application/json")

resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal("request error")
log.Print("request error")
}

defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
log.Fatal("status code is not 200")
log.Printf("status code is not 200: %v", resp.StatusCode)
}

log.Print("queue has been deleted successfully")
Expand Down Expand Up @@ -145,8 +143,8 @@ func BrokerStress() {
presize = "1000"
sendGorutines = 10000
reciveGorutines = 10000
sendTimeout = 1 // milleseconds
getTimeout = 1 // milleseconds
sendTimeout = 1 // milliseconds
getTimeout = 1 // milliseconds
msgLen = 5 // characters
)

Expand Down Expand Up @@ -189,7 +187,7 @@ func BrokerStress() {
if err != nil {
log.Fatal(err)
}
log.Printf("recived msg: %s", msg)
log.Printf("received msg: %s", msg.Value["message"])
}
}
}()
Expand All @@ -207,9 +205,9 @@ func BrokerStress() {

log.Print("deleting a queue")
deleteQueue(brokerAddr, queueName)
log.Print("queues has been deleted")
log.Print("queue has been deleted")

log.Print("stress stoped")
log.Print("stress stopped")
}

func main() {
Expand Down

0 comments on commit 4143fe0

Please sign in to comment.