Skip to content

Commit

Permalink
[receiver/skywalking]: Refactor code structure/directory (open-teleme…
Browse files Browse the repository at this point in the history
…try#20344)

1st PR: In order to add metrics receiver to skywalking/receiver, I found that I need to adjust the code directory/code structure a bit. So this PR is just a code refactoring, referring to the structure of otlp/receiver. After that, it will be easier for skywalking/receiver to add metrics or logs to the receiver.
  • Loading branch information
aheling11 committed May 19, 2023
1 parent 055d90b commit ad466f0
Show file tree
Hide file tree
Showing 10 changed files with 389 additions and 166 deletions.
16 changes: 16 additions & 0 deletions .chloggen/skywalking_add_metric.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: skywalkingreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Refactoring the code structure/directory for the following metrics receiver implementation

# One or more tracking issues related to the change
issues: [20315]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
25 changes: 22 additions & 3 deletions receiver/skywalkingreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/metadata"
)

Expand Down Expand Up @@ -68,6 +69,24 @@ func createTracesReceiver(
// that Skywalking receiver understands.
rCfg := cfg.(*Config)

c, err := createConfiguration(rCfg)
if err != nil {
return nil, err
}

r := receivers.GetOrAdd(cfg, func() component.Component {
return newSkywalkingReceiver(c, set)
})

if err = r.Unwrap().(*swReceiver).registerTraceConsumer(nextConsumer); err != nil {
return nil, err
}

return r, nil
}

// create the config that Skywalking receiver will use.
func createConfiguration(rCfg *Config) (*configuration, error) {
var err error
var c configuration
// Set ports
Expand All @@ -84,9 +103,7 @@ func createTracesReceiver(
return nil, fmt.Errorf("unable to extract port for the HTTP endpoint: %w", err)
}
}

// Create the receiver.
return newSkywalkingReceiver(&c, nextConsumer, set)
return &c, nil
}

// extract the port number from string in "address:port" format. If the
Expand All @@ -105,3 +122,5 @@ func extractPortFromEndpoint(endpoint string) (int, error) {
}
return int(port), nil
}

var receivers = sharedcomponent.NewSharedComponents()
30 changes: 18 additions & 12 deletions receiver/skywalkingreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/metadata"
)

Expand All @@ -45,8 +47,9 @@ func TestCreateReceiver(t *testing.T) {
Transport: "tcp",
},
}
traceSink := new(consumertest.TracesSink)
set := receivertest.NewNopCreateSettings()
tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)
tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink)
assert.NoError(t, err, "receiver creation failed")
assert.NotNil(t, tReceiver, "receiver creation failed")

Expand All @@ -66,7 +69,8 @@ func TestCreateReceiverGeneralConfig(t *testing.T) {
require.NoError(t, component.UnmarshalConfig(sub, cfg))

set := receivertest.NewNopCreateSettings()
tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)
traceSink := new(consumertest.TracesSink)
tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink)
assert.NoError(t, err, "receiver creation failed")
assert.NotNil(t, tReceiver, "receiver creation failed")

Expand All @@ -85,11 +89,12 @@ func TestCreateDefaultGRPCEndpoint(t *testing.T) {
Transport: "tcp",
},
}
traceSink := new(consumertest.TracesSink)
set := receivertest.NewNopCreateSettings()
r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)

r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink)
assert.NoError(t, err, "unexpected error creating receiver")
assert.Equal(t, 11800, r.(*swReceiver).config.CollectorGRPCPort, "grpc port should be default")
assert.Equal(t, 11800, r.(*sharedcomponent.SharedComponent).
Unwrap().(*swReceiver).config.CollectorGRPCPort, "grpc port should be default")
}

func TestCreateTLSGPRCEndpoint(t *testing.T) {
Expand All @@ -109,8 +114,8 @@ func TestCreateTLSGPRCEndpoint(t *testing.T) {
},
}
set := receivertest.NewNopCreateSettings()

_, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)
traceSink := new(consumertest.TracesSink)
_, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink)
assert.NoError(t, err, "tls-enabled receiver creation failed")
}

Expand All @@ -129,8 +134,8 @@ func TestCreateTLSHTTPEndpoint(t *testing.T) {
}

set := receivertest.NewNopCreateSettings()

_, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)
traceSink := new(consumertest.TracesSink)
_, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink)
assert.NoError(t, err, "tls-enabled receiver creation failed")
}

Expand All @@ -142,8 +147,9 @@ func TestCreateInvalidHTTPEndpoint(t *testing.T) {
Endpoint: defaultHTTPBindEndpoint,
}
set := receivertest.NewNopCreateSettings()
r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)

traceSink := new(consumertest.TracesSink)
r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink)
assert.NoError(t, err, "unexpected error creating receiver")
assert.Equal(t, 12800, r.(*swReceiver).config.CollectorHTTPPort, "http port should be default")
assert.Equal(t, 12800, r.(*sharedcomponent.SharedComponent).
Unwrap().(*swReceiver).config.CollectorHTTPPort, "http port should be default")
}
3 changes: 3 additions & 0 deletions receiver/skywalkingreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.8.2
go.opentelemetry.io/collector v0.77.0
go.opentelemetry.io/collector/component v0.77.0
Expand Down Expand Up @@ -56,6 +57,8 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent

retract (
v0.76.2
v0.76.1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver"
package trace // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/trace"

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package skywalkingreceiver
package trace

import (
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
common "skywalking.apache.org/repo/goapi/collect/common/v3"
agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
agentV3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)

Expand Down Expand Up @@ -305,3 +308,88 @@ func generateTracesOneEmptyResourceSpans() ptrace.Span {
il.Spans().AppendEmpty()
return il.Spans().At(0)
}

func mockGrpcTraceSegment(sequence int) *agent.SegmentObject {
seq := strconv.Itoa(sequence)
return &agent.SegmentObject{
TraceId: "trace" + seq,
TraceSegmentId: "trace-segment" + seq,
Service: "demo-segmentReportService" + seq,
ServiceInstance: "demo-instance" + seq,
IsSizeLimited: false,
Spans: []*agent.SpanObject{
{
SpanId: 1,
ParentSpanId: 0,
StartTime: time.Now().Unix(),
EndTime: time.Now().Unix() + 10,
OperationName: "operation" + seq,
Peer: "127.0.0.1:6666",
SpanType: agent.SpanType_Entry,
SpanLayer: agent.SpanLayer_Http,
ComponentId: 1,
IsError: false,
SkipAnalysis: false,
Tags: []*common.KeyStringValuePair{
{
Key: "mock-key" + seq,
Value: "mock-value" + seq,
},
},
Logs: []*agent.Log{
{
Time: time.Now().Unix(),
Data: []*common.KeyStringValuePair{
{
Key: "log-key" + seq,
Value: "log-value" + seq,
},
},
},
},
Refs: []*agent.SegmentReference{
{
RefType: agent.RefType_CrossThread,
TraceId: "trace" + seq,
ParentTraceSegmentId: "parent-trace-segment" + seq,
ParentSpanId: 0,
ParentService: "parent" + seq,
ParentServiceInstance: "parent" + seq,
ParentEndpoint: "parent" + seq,
NetworkAddressUsedAtPeer: "127.0.0.1:6666",
},
},
},
{
SpanId: 2,
ParentSpanId: 1,
StartTime: time.Now().Unix(),
EndTime: time.Now().Unix() + 20,
OperationName: "operation" + seq,
Peer: "127.0.0.1:6666",
SpanType: agent.SpanType_Local,
SpanLayer: agent.SpanLayer_Http,
ComponentId: 2,
IsError: false,
SkipAnalysis: false,
Tags: []*common.KeyStringValuePair{
{
Key: "mock-key" + seq,
Value: "mock-value" + seq,
},
},
Logs: []*agent.Log{
{
Time: time.Now().Unix(),
Data: []*common.KeyStringValuePair{
{
Key: "log-key" + seq,
Value: "log-value" + seq,
},
},
},
},
},
},
}
}
Loading

0 comments on commit ad466f0

Please sign in to comment.