Remove usage of deprecated pdata package (open-telemetry#9971)
Signed-off-by: Bogdan Drutu <[email protected]>
bogdandrutu authored May 11, 2022
1 parent d1a8818 commit 6f29012
Showing 18 changed files with 51 additions and 82 deletions.
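At a glance, this commit swaps every use of the deprecated monolithic go.opentelemetry.io/collector/model/pdata package for the split go.opentelemetry.io/collector/pdata/pcommon and go.opentelemetry.io/collector/pdata/plog packages. The Go sketch below is not part of the commit; it is a minimal illustration of the type mapping (pdata.Logs → plog.Logs, pdata.LogRecord → plog.LogRecord, pdata.Resource/Map/Value → pcommon.*), assuming the v0.50-era pdata API used throughout the diff; the function name and attribute values are made up.

package sketch

import (
	// Removed by this commit (deprecated):
	//   "go.opentelemetry.io/collector/model/pdata"
	// Replacements:
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/plog"
)

// migrationShape builds a tiny plog.Logs payload using only the new packages.
func migrationShape() (plog.Logs, pcommon.Resource) {
	logs := plog.NewLogs() // was pdata.NewLogs()
	rl := logs.ResourceLogs().AppendEmpty()
	rl.Resource().Attributes().InsertString("host.name", "example-host") // pcommon.Map setter
	lr := rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
	lr.SetSeverityNumber(plog.SeverityNumberINFO) // was pdata.SeverityNumberINFO
	lr.Body().SetStringVal("hello")               // LogRecord.Body() is a pcommon.Value
	return logs, rl.Resource()
}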
31 changes: 15 additions & 16 deletions internal/stanza/frompdataconverter.go
@@ -22,19 +22,18 @@ import (
"time"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
)

// FromPdataConverter converts a set of entry.Entry into pdata.Logs
// FromPdataConverter converts a set of entry.Entry into plog.Logs
//
// The diagram below illustrates the internal communication inside the FromPdataConverter:
//
// ┌─────────────────────────────────┐
// │ Batch() │
// ┌─────────┤ Ingests pdata.Logs, splits up │
// ┌─────────┤ Ingests plog.Logs, splits up │
// │ │ and places them on workerChan │
// │ └─────────────────────────────────┘
// │
@@ -107,9 +106,9 @@ func (c *FromPdataConverter) OutChannel() <-chan []*entry.Entry {
}

type fromConverterWorkerItem struct {
Resource pdata.Resource
LogRecordSlice pdata.LogRecordSlice
Scope pdata.ScopeLogs
Resource pcommon.Resource
LogRecordSlice plog.LogRecordSlice
Scope plog.ScopeLogs
}

// workerLoop is responsible for obtaining pdata logs from Batch() calls,
@@ -136,8 +135,8 @@ func (c *FromPdataConverter) workerLoop() {
}
}

// Batch takes in an set of pdata.Logs and sends it to an available worker for processing.
func (c *FromPdataConverter) Batch(pLogs pdata.Logs) error {
// Batch takes in an set of plog.Logs and sends it to an available worker for processing.
func (c *FromPdataConverter) Batch(pLogs plog.Logs) error {
for i := 0; i < pLogs.ResourceLogs().Len(); i++ {
rls := pLogs.ResourceLogs().At(i)
for j := 0; j < rls.ScopeLogs().Len(); j++ {
@@ -174,10 +173,10 @@ func convertFromLogs(workerItem fromConverterWorkerItem) []*entry.Entry {
return result
}

// ConvertFrom converts pdata.Logs into a slice of entry.Entry
// ConvertFrom converts plog.Logs into a slice of entry.Entry
// To be used in a stateless setting like tests where ease of use is more
// important than performance or throughput.
func ConvertFrom(pLogs pdata.Logs) []*entry.Entry {
func ConvertFrom(pLogs plog.Logs) []*entry.Entry {
result := make([]*entry.Entry, 0, pLogs.LogRecordCount())
for i := 0; i < pLogs.ResourceLogs().Len(); i++ {
rls := pLogs.ResourceLogs().At(i)
@@ -189,8 +188,8 @@ func ConvertFrom(pLogs pdata.Logs) []*entry.Entry {
return result
}

// convertFrom converts pdata.LogRecord into provided entry.Entry.
func convertFrom(src pdata.LogRecord, ent *entry.Entry) {
// convertFrom converts plog.LogRecord into provided entry.Entry.
func convertFrom(src plog.LogRecord, ent *entry.Entry) {
// if src.Timestamp == 0, then leave ent.Timestamp as nil
if src.Timestamp() != 0 {
ent.Timestamp = src.Timestamp().AsTime()
@@ -223,16 +222,16 @@ func convertFrom(src pdata.LogRecord, ent *entry.Entry) {
}
}

func valueToMap(value pdata.Map) map[string]interface{} {
func valueToMap(value pcommon.Map) map[string]interface{} {
rawMap := map[string]interface{}{}
value.Range(func(k string, v pdata.Value) bool {
value.Range(func(k string, v pcommon.Value) bool {
rawMap[k] = valueToInterface(v)
return true
})
return rawMap
}

func valueToInterface(value pdata.Value) interface{} {
func valueToInterface(value pcommon.Value) interface{} {
switch value.Type() {
case pcommon.ValueTypeEmpty:
return nil
@@ -259,7 +258,7 @@ func valueToInterface(value pdata.Value) interface{} {
}
}

var fromPdataSevMap = map[pdata.SeverityNumber]entry.Severity{
var fromPdataSevMap = map[plog.SeverityNumber]entry.Severity{
plog.SeverityNumberUNDEFINED: entry.Default,
plog.SeverityNumberTRACE: entry.Trace,
plog.SeverityNumberTRACE2: entry.Trace2,
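The converter above walks the plog hierarchy (ResourceLogs → ScopeLogs → LogRecords) and flattens pcommon.Value trees into plain Go values. The sketch below only illustrates that traversal pattern under the new API, trimmed to a handful of value types; the function names are made up and this is not the repository's code.

package sketch

import (
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/plog"
)

// flattenBodies visits every log record, in the same order ConvertFrom does,
// and collects each body as a plain interface{} value.
func flattenBodies(ld plog.Logs) []interface{} {
	out := make([]interface{}, 0, ld.LogRecordCount())
	for i := 0; i < ld.ResourceLogs().Len(); i++ {
		rls := ld.ResourceLogs().At(i)
		for j := 0; j < rls.ScopeLogs().Len(); j++ {
			lrs := rls.ScopeLogs().At(j).LogRecords()
			for k := 0; k < lrs.Len(); k++ {
				out = append(out, toInterface(lrs.At(k).Body()))
			}
		}
	}
	return out
}

// toInterface mirrors the valueToInterface switch above, reduced to a few cases.
func toInterface(v pcommon.Value) interface{} {
	switch v.Type() {
	case pcommon.ValueTypeString:
		return v.StringVal()
	case pcommon.ValueTypeInt:
		return v.IntVal()
	case pcommon.ValueTypeBool:
		return v.BoolVal()
	case pcommon.ValueTypeMap:
		m := map[string]interface{}{}
		v.MapVal().Range(func(k string, mv pcommon.Value) bool {
			m[k] = toInterface(mv)
			return true
		})
		return m
	default: // double, slice, bytes and empty values are omitted in this sketch
		return nil
	}
}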
17 changes: 8 additions & 9 deletions internal/stanza/frompdataconverter_test.go
@@ -22,14 +22,13 @@ import (
"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
)

func BenchmarkConvertFromPdataSimple(b *testing.B) {
b.StopTimer()
pLogs := pdata.NewLogs()
pLogs := plog.NewLogs()
b.StartTimer()

for i := 0; i < b.N; i++ {
@@ -47,7 +46,7 @@ func BenchmarkConvertFromPdataComplex(b *testing.B) {
}
}

func baseMap() pdata.Map {
func baseMap() pcommon.Map {
obj := pcommon.NewMap()
arr := pcommon.NewValueSlice()
arr.SliceVal().AppendEmpty().SetStringVal("666")
@@ -61,15 +60,15 @@ func baseMap() pdata.Map {
return obj
}

func baseMapValue() pdata.Value {
func baseMapValue() pcommon.Value {
v := pcommon.NewValueMap()
baseMap := baseMap()
baseMap.CopyTo(v.MapVal())
return v
}

func complexPdataForNDifferentHosts(count int, n int) pdata.Logs {
pLogs := pdata.NewLogs()
func complexPdataForNDifferentHosts(count int, n int) plog.Logs {
pLogs := plog.NewLogs()
logs := pLogs.ResourceLogs()

for i := 0; i < count; i++ {
@@ -125,7 +124,7 @@ func TestRoundTrip(t *testing.T) {
require.Equal(t, initialLogs, pLogs)
}

func sortComplexData(pLogs pdata.Logs) {
func sortComplexData(pLogs plog.Logs) {
pLogs.ResourceLogs().At(0).Resource().Attributes().Sort()
attrObject, _ := pLogs.ResourceLogs().At(0).Resource().Attributes().Get("object")
attrObject.MapVal().Sort()
@@ -222,7 +221,7 @@ func TestConvertFrom(t *testing.T) {
func TestConvertFromSeverity(t *testing.T) {
cases := []struct {
expectedSeverity entry.Severity
severityNumber pdata.SeverityNumber
severityNumber plog.SeverityNumber
}{
{entry.Default, plog.SeverityNumberUNDEFINED},
{entry.Trace, plog.SeverityNumberTRACE},
@@ -254,7 +253,7 @@ func TestConvertFromSeverity(t *testing.T) {
for _, tc := range cases {
t.Run(fmt.Sprintf("%v", tc.severityNumber), func(t *testing.T) {
entry := entry.New()
logRecord := pdata.NewLogRecord()
logRecord := plog.NewLogRecord()
logRecord.SetSeverityNumber(tc.severityNumber)
convertFrom(logRecord, entry)
require.Equal(t, tc.expectedSeverity, entry.Severity)
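The test helpers above assemble nested attribute data through pcommon constructors. Below is a minimal sketch of that construction pattern with made-up keys and values, assuming the v0.50-era pcommon API shown in the diff (NewMap, NewValueSlice, NewValueMap, Insert*); it is an illustration, not the test's code.

package sketch

import "go.opentelemetry.io/collector/pdata/pcommon"

// nestedAttrs builds a small attribute map in the style of baseMap above:
// a string slice, a nested map and a few scalar entries.
func nestedAttrs() pcommon.Map {
	obj := pcommon.NewMap()

	arr := pcommon.NewValueSlice()
	arr.SliceVal().AppendEmpty().SetStringVal("one")
	arr.SliceVal().AppendEmpty().SetStringVal("two")
	obj.Insert("array", arr)

	nested := pcommon.NewValueMap()
	nested.MapVal().InsertString("inner", "value")
	obj.Insert("object", nested)

	obj.InsertBool("bool", true)
	obj.InsertInt("int", 123)
	obj.InsertString("string", "hello")
	return obj
}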
1 change: 0 additions & 1 deletion internal/stanza/go.mod
@@ -7,7 +7,6 @@ require (
github.com/open-telemetry/opentelemetry-log-collection v0.29.1
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7
go.opentelemetry.io/collector/model v0.50.0
go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7
go.opentelemetry.io/otel/metric v0.30.0
go.opentelemetry.io/otel/trace v1.7.0
2 changes: 0 additions & 2 deletions internal/stanza/go.sum

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion processor/logstransformprocessor/go.mod
@@ -8,7 +8,6 @@ require (
github.com/open-telemetry/opentelemetry-log-collection v0.29.1
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7
go.opentelemetry.io/collector/model v0.50.0
go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7
go.uber.org/zap v1.21.0
gonum.org/v1/gonum v0.11.0
2 changes: 0 additions & 2 deletions processor/logstransformprocessor/go.sum

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions processor/logstransformprocessor/processor.go
@@ -26,15 +26,15 @@ import (
"github.com/open-telemetry/opentelemetry-log-collection/pipeline"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
"gonum.org/v1/gonum/graph/topo"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/stanza"
)

type outputType struct {
logs pdata.Logs
logs plog.Logs
err error
}

@@ -146,7 +146,7 @@ func (ltp *logsTransformProcessor) Start(ctx context.Context, host component.Hos
return nil
}

func (ltp *logsTransformProcessor) processLogs(ctx context.Context, ld pdata.Logs) (pdata.Logs, error) {
func (ltp *logsTransformProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) {
// Add the logs to the chain
err := ltp.fromConverter.Batch(ld)
if err != nil {
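The processor pairs each converted batch with its error in the outputType struct above and hands batches between goroutines. The sketch below only illustrates that hand-off pattern; the result type, the forward function and the consumeFn callback are made-up names, not APIs from this commit.

package sketch

import (
	"context"

	"go.opentelemetry.io/collector/pdata/plog"
)

// result mirrors outputType: a converted plog.Logs batch plus the error
// produced while converting it.
type result struct {
	logs plog.Logs
	err  error
}

// forward drains converted batches and passes them to a consumer callback,
// stopping on the first error or on context cancellation.
func forward(ctx context.Context, in <-chan result, consumeFn func(context.Context, plog.Logs) error) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case r, ok := <-in:
			if !ok {
				return nil
			}
			if r.err != nil {
				return r.err
			}
			if err := consumeFn(ctx, r.logs); err != nil {
				return err
			}
		}
	}
}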
58 changes: 25 additions & 33 deletions processor/logstransformprocessor/processor_test.go
@@ -24,7 +24,6 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

@@ -63,15 +62,15 @@ func parseTime(format, input string) *time.Time {
}

type testLogMessage struct {
body *pdata.Value
body pcommon.Value
time *time.Time
observedTime *time.Time
severity pdata.SeverityNumber
severity plog.SeverityNumber
severityText *string
spanID *pdata.SpanID
traceID *pdata.TraceID
spanID pcommon.SpanID
traceID pcommon.TraceID
flags uint32
attributes *map[string]pdata.Value
attributes *map[string]pcommon.Value
}

func TestLogsTransformProcessor(t *testing.T) {
@@ -91,47 +90,47 @@ func TestLogsTransformProcessor(t *testing.T) {
config: cfg,
sourceMessages: []testLogMessage{
{
body: &baseMessage,
spanID: &spanID,
traceID: &traceID,
body: baseMessage,
spanID: spanID,
traceID: traceID,
flags: uint32(0x01),
observedTime: parseTime("2006-01-02", "2022-01-02"),
},
{
body: &baseMessage,
spanID: &spanID,
traceID: &traceID,
body: baseMessage,
spanID: spanID,
traceID: traceID,
flags: uint32(0x02),
observedTime: parseTime("2006-01-02", "2022-01-03"),
},
},
parsedMessages: []testLogMessage{
{
body: &baseMessage,
body: baseMessage,
severity: plog.SeverityNumberINFO,
severityText: &infoSeverityText,
attributes: &map[string]pdata.Value{
attributes: &map[string]pcommon.Value{
"msg": pcommon.NewValueString("this is a test message"),
"time": pcommon.NewValueString("2022-01-01 01:02:03"),
"sev": pcommon.NewValueString("INFO"),
},
spanID: &spanID,
traceID: &traceID,
spanID: spanID,
traceID: traceID,
flags: uint32(0x01),
observedTime: parseTime("2006-01-02", "2022-01-02"),
time: parseTime("2006-01-02 15:04:05", "2022-01-01 01:02:03"),
},
{
body: &baseMessage,
body: baseMessage,
severity: plog.SeverityNumberINFO,
severityText: &infoSeverityText,
attributes: &map[string]pdata.Value{
attributes: &map[string]pcommon.Value{
"msg": pcommon.NewValueString("this is a test message"),
"time": pcommon.NewValueString("2022-01-01 01:02:03"),
"sev": pcommon.NewValueString("INFO"),
},
spanID: &spanID,
traceID: &traceID,
spanID: spanID,
traceID: traceID,
flags: uint32(0x02),
observedTime: parseTime("2006-01-02", "2022-01-03"),
time: parseTime("2006-01-02 15:04:05", "2022-01-01 01:02:03"),
@@ -166,19 +165,17 @@ func TestLogsTransformProcessor(t *testing.T) {
}
}

func generateLogData(messages []testLogMessage) pdata.Logs {
func generateLogData(messages []testLogMessage) plog.Logs {
ld := testdata.GenerateLogsOneEmptyResourceLogs()
scope := ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty()
for _, content := range messages {
log := scope.LogRecords().AppendEmpty()
if content.body != nil {
content.body.CopyTo(log.Body())
}
content.body.CopyTo(log.Body())
if content.time != nil {
log.SetTimestamp(pdata.NewTimestampFromTime(*content.time))
log.SetTimestamp(pcommon.NewTimestampFromTime(*content.time))
}
if content.observedTime != nil {
log.SetObservedTimestamp(pdata.NewTimestampFromTime(*content.observedTime))
log.SetObservedTimestamp(pcommon.NewTimestampFromTime(*content.observedTime))
}
if content.severity != 0 {
log.SetSeverityNumber(content.severity)
@@ -193,13 +190,8 @@ func generateLogData(messages []testLogMessage) pdata.Logs {
log.Attributes().Sort()
}

if content.spanID != nil {
log.SetSpanID(*content.spanID)
}

if content.traceID != nil {
log.SetTraceID(*content.traceID)
}
log.SetSpanID(content.spanID)
log.SetTraceID(content.traceID)

if content.flags != uint32(0x00) {
log.SetFlags(content.flags)
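With the pointer fields gone, span/trace IDs and timestamps are now set directly as pcommon values. Below is a minimal sketch of that setter pattern, assuming the v0.50-era pcommon.NewSpanID, pcommon.NewTraceID and pcommon.NewTimestampFromTime constructors; the ID bytes, timestamp, severity and flags are made up, and this is not the test's code.

package sketch

import (
	"time"

	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/plog"
)

// newRecordWithIDs populates a log record the way generateLogData above does,
// using value-typed IDs instead of pointer checks.
func newRecordWithIDs(ts time.Time) plog.LogRecord {
	lr := plog.NewLogRecord()
	lr.SetTimestamp(pcommon.NewTimestampFromTime(ts))
	lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(ts))
	lr.SetSpanID(pcommon.NewSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
	lr.SetTraceID(pcommon.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
	lr.SetSeverityNumber(plog.SeverityNumberINFO)
	lr.SetSeverityText("INFO")
	lr.SetFlags(0x01) // uint32 flags, as in the test above
	return lr
}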
1 change: 0 additions & 1 deletion receiver/filelogreceiver/go.mod
@@ -31,7 +31,6 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/collector/model v0.50.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
2 changes: 0 additions & 2 deletions receiver/filelogreceiver/go.sum

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion receiver/journaldreceiver/go.mod
@@ -26,7 +26,6 @@ require (
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/collector/model v0.50.0 // indirect
go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
2 changes: 0 additions & 2 deletions receiver/journaldreceiver/go.sum

Some generated files are not rendered by default.

(The remaining changed files in this commit are not rendered here.)
