-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
logs.go
126 lines (107 loc) · 3.14 KB
/
logs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package logs
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.uber.org/zap"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common"
)
type exporter interface {
export(plog.Logs) error
}
type gRPCClientExporter struct {
client plogotlp.GRPCClient
}
func (e *gRPCClientExporter) export(logs plog.Logs) error {
req := plogotlp.NewExportRequestFromLogs(logs)
if _, err := e.client.Export(context.Background(), req); err != nil {
return err
}
return nil
}
// Start starts the log telemetry generator
func Start(cfg *Config) error {
logger, err := common.CreateLogger()
if err != nil {
return err
}
if cfg.UseHTTP {
return fmt.Errorf("http is not supported by 'telemetrygen logs'")
}
if !cfg.Insecure {
return fmt.Errorf("'telemetrygen logs' only supports insecure gRPC")
}
// only support grpc in insecure mode
clientConn, err := grpc.DialContext(context.TODO(), cfg.Endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}
exporter := &gRPCClientExporter{
client: plogotlp.NewGRPCClient(clientConn),
}
if err = Run(cfg, exporter, logger); err != nil {
logger.Error("failed to stop the exporter", zap.Error(err))
return err
}
return nil
}
// Run executes the test scenario.
func Run(c *Config, exp exporter, logger *zap.Logger) error {
if c.TotalDuration > 0 {
c.NumLogs = 0
} else if c.NumLogs <= 0 {
return fmt.Errorf("either `logs` or `duration` must be greater than 0")
}
limit := rate.Limit(c.Rate)
if c.Rate == 0 {
limit = rate.Inf
logger.Info("generation of logs isn't being throttled")
} else {
logger.Info("generation of logs is limited", zap.Float64("per-second", float64(limit)))
}
wg := sync.WaitGroup{}
res := resource.NewWithAttributes(semconv.SchemaURL, c.GetAttributes()...)
running := &atomic.Bool{}
running.Store(true)
for i := 0; i < c.WorkerCount; i++ {
wg.Add(1)
w := worker{
numLogs: c.NumLogs,
limitPerSecond: limit,
totalDuration: c.TotalDuration,
running: running,
wg: &wg,
logger: logger.With(zap.Int("worker", i)),
index: i,
}
go w.simulateLogs(res, exp)
}
if c.TotalDuration > 0 {
time.Sleep(c.TotalDuration)
running.Store(false)
}
wg.Wait()
return nil
}