forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
file_reader.go
132 lines (122 loc) · 3.25 KB
/
file_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package filereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filereceiver"
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)
// stringReader is the only function we use from *bufio.Reader. We define it
// so that it can be swapped out for testing.
type stringReader interface {
ReadString(delim byte) (string, error)
}
// fileReader
type fileReader struct {
stringReader stringReader
unm pmetric.Unmarshaler
consumer consumer.Metrics
timer *replayTimer
}
func newFileReader(consumer consumer.Metrics, file *os.File, timer *replayTimer) fileReader {
return fileReader{
consumer: consumer,
stringReader: bufio.NewReader(file),
unm: &pmetric.JSONUnmarshaler{},
timer: timer,
}
}
// readAll calls readline for each line in the file until all lines have been
// read or the context is cancelled.
func (fr fileReader) readAll(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
default:
err := fr.readLine(ctx)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
}
}
}
// readLine reads the next line in the file, converting it into metrics and
// passing it to the the consumer member.
func (fr fileReader) readLine(ctx context.Context) error {
line, err := fr.stringReader.ReadString('\n')
if err != nil {
return fmt.Errorf("failed to read line from input file: %w", err)
}
metrics, err := fr.unm.UnmarshalMetrics([]byte(line))
if err != nil {
return fmt.Errorf("failed to unmarshal metrics: %w", err)
}
err = fr.timer.wait(ctx, getFirstTimestamp(metrics))
if err != nil {
return fmt.Errorf("readLine interrupted while waiting for timer: %w", err)
}
return fr.consumer.ConsumeMetrics(ctx, metrics)
}
func getFirstTimestamp(metrics pmetric.Metrics) pcommon.Timestamp {
resourceMetrics := metrics.ResourceMetrics()
if resourceMetrics.Len() == 0 {
return 0
}
scopeMetrics := resourceMetrics.At(0).ScopeMetrics()
if scopeMetrics.Len() == 0 {
return 0
}
metricSlice := scopeMetrics.At(0).Metrics()
if metricSlice.Len() == 0 {
return 0
}
return getFirstTimestampFromMetric(metricSlice.At(0))
}
func getFirstTimestampFromMetric(metric pmetric.Metric) pcommon.Timestamp {
//exhaustive:enforce
switch metric.Type() {
case pmetric.MetricTypeGauge:
dps := metric.Gauge().DataPoints()
if dps.Len() == 0 {
return 0
}
return dps.At(0).Timestamp()
case pmetric.MetricTypeSum:
dps := metric.Sum().DataPoints()
if dps.Len() == 0 {
return 0
}
return dps.At(0).Timestamp()
case pmetric.MetricTypeSummary:
dps := metric.Summary().DataPoints()
if dps.Len() == 0 {
return 0
}
return dps.At(0).Timestamp()
case pmetric.MetricTypeHistogram:
dps := metric.Histogram().DataPoints()
if dps.Len() == 0 {
return 0
}
return dps.At(0).Timestamp()
case pmetric.MetricTypeExponentialHistogram:
dps := metric.ExponentialHistogram().DataPoints()
if dps.Len() == 0 {
return 0
}
return dps.At(0).Timestamp()
case pmetric.MetricTypeEmpty:
return 0
}
return 0
}