Skip to content

Commit

Permalink
receiver/azureblobreceiver Second PR (open-telemetry#12418)
Browse files Browse the repository at this point in the history
* blob receiver

* blobclient

* working receiver

* traces receiver

* two receivers

* blob

* sync

* blobclient

* gomod

* linting

* unit tests

* logger

* handler

* unit tests

* sync

* unit test

* changelog

* lintiing

* Renew the PR

* add status to readme

* add change to unreleased

* changelog

* fix build

* linting

* linting

* linting

* linting

* unit test

* comments

* comments

* comments

* comments

* endpoint

* lowercase factory method

* docs in go struct

* codeowners

* versions

* codeowners

* cleaning

* go mod tidy

* tests

* versions

* public to private

* Update receiver/azureblobreceiver/blobclient.go

Co-authored-by: Pablo Baeyens <[email protected]>

* Update receiver/azureblobreceiver/receiver.go

Co-authored-by: Pablo Baeyens <[email protected]>

* Update receiver/azureblobreceiver/receiver.go

Co-authored-by: Pablo Baeyens <[email protected]>

* comment

* merge conflicts

* changelog

* fix tests

* Update receiver/azureblobreceiver/blobeventhandler.go

Co-authored-by: Pablo Baeyens <[email protected]>

* Update receiver/azureblobreceiver/factory.go

Co-authored-by: Pablo Baeyens <[email protected]>

* Update receiver/azureblobreceiver/factory.go

Co-authored-by: Pablo Baeyens <[email protected]>

* address comments

* address comments

* address comment

* go sum

* dependencies

Co-authored-by: Pablo Baeyens <[email protected]>
Co-authored-by: Pablo Baeyens <[email protected]>
  • Loading branch information
3 people committed Oct 13, 2022
1 parent 8f8a634 commit 26a6516
Show file tree
Hide file tree
Showing 20 changed files with 996 additions and 17 deletions.
16 changes: 16 additions & 0 deletions .chloggen/azure-blob-receiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: azureblobreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implementation of the component (second PR)

# One or more tracking issues related to the change
issues: [8834]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
4 changes: 0 additions & 4 deletions receiver/azureblobreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,3 @@ receivers:
```

The receiver subscribes [to the events](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blob-event-overview) published by Azure Blob Storage and handled by Azure Event Hub. When it receives a `Blob Create` event, it reads the logs or traces from the corresponding blob and deletes the blob after processing.




75 changes: 75 additions & 0 deletions receiver/azureblobreceiver/blobclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package azureblobreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver"

import (
"bytes"
"context"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"go.uber.org/zap"
)

// blobClient is the storage abstraction used to fetch the content of a blob.
// It is satisfied by azureBlobClient.
type blobClient interface {
	// readBlob returns the content of the blob blobName stored in the
	// container containerName, or an error if the content cannot be obtained.
	readBlob(ctx context.Context, containerName, blobName string) (*bytes.Buffer, error)
}

// azureBlobClient is the blobClient implementation backed by an azblob
// service client.
//
// NOTE: newBlobClient initializes this struct with an unkeyed composite
// literal, so the order of the fields below must not change.
type azureBlobClient struct {
	// serviceClient is the entry point used to obtain container and blob clients.
	serviceClient *azblob.ServiceClient
	// logger receives diagnostics such as blob deletion failures.
	logger *zap.Logger
}

// Compile-time check that *azureBlobClient satisfies blobClient.
var _ blobClient = (*azureBlobClient)(nil)

// getBlockBlob builds a block blob client for blobName inside containerName.
// It only derives clients from the service client; the blob itself is not
// accessed here.
func (bc *azureBlobClient) getBlockBlob(containerName, blobName string) azblob.BlockBlobClient {
	container := bc.serviceClient.NewContainerClient(containerName)
	blockBlob := container.NewBlockBlobClient(blobName)
	return blockBlob
}

// readBlob downloads the blob blobName from containerName, returns its
// content and then deletes the blob.
//
// The blob is deleted only after its content has been read completely. If the
// download or the read fails, the blob is left in place and a nil buffer is
// returned together with the error, so a transient failure does not destroy
// data that has not been received yet.
//
// A failure to delete the blob is logged and not returned, because the content
// has already been obtained at that point.
func (bc *azureBlobClient) readBlob(ctx context.Context, containerName string, blobName string) (*bytes.Buffer, error) {
	blockBlob := bc.getBlockBlob(containerName, blobName)

	get, err := blockBlob.Download(ctx, nil)
	if err != nil {
		return nil, err
	}

	downloadedData := &bytes.Buffer{}
	reader := get.Body(nil)

	_, err = downloadedData.ReadFrom(reader)

	// Release the response body before deciding what to do with the blob, so
	// it is closed on both the success and the failure path.
	if closeErr := reader.Close(); closeErr != nil {
		bc.logger.Debug("failed to close blob reader", zap.Error(closeErr))
	}

	if err != nil {
		// Incomplete read: keep the blob and do not hand out partial data.
		return nil, err
	}

	if _, blobDeleteErr := blockBlob.Delete(ctx, nil); blobDeleteErr != nil {
		bc.logger.Error("failed to delete blob", zap.Error(blobDeleteErr))
	}

	return downloadedData, nil
}

// newBlobClient creates an azureBlobClient from a storage account connection
// string. It returns a nil client and the underlying error when the service
// client cannot be created from connectionString.
func newBlobClient(connectionString string, logger *zap.Logger) (*azureBlobClient, error) {
	svc, err := azblob.NewServiceClientFromConnectionString(connectionString, nil)
	if err != nil {
		return nil, err
	}

	client := &azureBlobClient{
		serviceClient: &svc,
		logger:        logger,
	}
	return client, nil
}
43 changes: 43 additions & 0 deletions receiver/azureblobreceiver/blobclient_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package azureblobreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver"

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)

// goodConnectionString is a connection string that newBlobClient is expected
// to accept (see TestNewBlobClient).
const goodConnectionString = "DefaultEndpointsProtocol=https;AccountName=accountName;AccountKey=+idLkHYcL0MUWIKYHm2j4Q==;EndpointSuffix=core.windows.net"

// badConnectionString is a connection string that newBlobClient is expected
// to reject (see TestNewBlobClientError).
const badConnectionString = "DefaultEndpointsProtocol=https;AccountName=accountName;AccountKey=accountkey;EndpointSuffix=core.windows.net"

func TestNewBlobClient(t *testing.T) {
blobClient, err := newBlobClient(goodConnectionString, zaptest.NewLogger(t))

require.NoError(t, err)
require.NotNil(t, blobClient)
assert.NotNil(t, blobClient.serviceClient)
}

func TestNewBlobClientError(t *testing.T) {
blobClient, err := newBlobClient(badConnectionString, zaptest.NewLogger(t))

assert.Error(t, err)
assert.Nil(t, blobClient)
}
153 changes: 153 additions & 0 deletions receiver/azureblobreceiver/blobeventhandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package azureblobreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver"

import (
"context"
"encoding/json"
"strings"

eventhub "github.com/Azure/azure-event-hubs-go/v3"
"go.uber.org/zap"
)

// blobEventHandler is the contract of the component that listens for blob
// events and forwards blob content to the registered consumers. It is
// satisfied by azureBlobEventHandler.
type blobEventHandler interface {
	// setLogsDataConsumer registers the consumer that receives logs payloads.
	setLogsDataConsumer(logsDataConsumer logsDataConsumer)
	// setTracesDataConsumer registers the consumer that receives traces payloads.
	setTracesDataConsumer(tracesDataConsumer tracesDataConsumer)

	// run starts event handling.
	run(ctx context.Context) error
	// close stops event handling and releases the underlying resources.
	close(ctx context.Context) error
}

// azureBlobEventHandler is the blobEventHandler implementation that receives
// blob events from an Event Hub and passes the content of the referenced blobs
// to the logs or traces consumer, depending on the blob's container.
type azureBlobEventHandler struct {
	// blobClient reads the content of the blob referenced by an event.
	blobClient blobClient
	// logsDataConsumer receives the content of blobs from logsContainerName.
	logsDataConsumer logsDataConsumer
	// tracesDataConsumer receives the content of blobs from tracesContainerName.
	tracesDataConsumer tracesDataConsumer
	// logsContainerName is the container whose blobs are treated as logs.
	logsContainerName string
	// tracesContainerName is the container whose blobs are treated as traces.
	tracesContainerName string
	// eventHubSonnectionString is the Event Hub connection string used by run.
	// NOTE(review): the identifier is misspelled ("Sonnection"); renaming it
	// requires updating run and newBlobEventHandler together.
	eventHubSonnectionString string
	// hub is the Event Hub client; it is nil while the handler is not running.
	hub *eventhub.Hub
	// logger receives diagnostics emitted while handling events.
	logger *zap.Logger
}

// Compile-time check that *azureBlobEventHandler satisfies blobEventHandler.
var _ blobEventHandler = (*azureBlobEventHandler)(nil)

// blobCreatedEventType is the event type value that identifies a blob
// creation event; events of any other type are not processed.
const blobCreatedEventType = "Microsoft.Storage.BlobCreated"

// run connects to the Event Hub described by the handler's connection string
// and registers newMessageHandler as the receiver for every partition of the
// hub. Calling run while the handler is already running is a no-op.
//
// The hub is stored on the handler only after all receivers were registered
// successfully. If any startup step fails, the partially started hub is closed
// and the handler stays in the "not running" state, so a later call to run can
// retry instead of silently returning nil.
func (p *azureBlobEventHandler) run(ctx context.Context) error {
	if p.hub != nil {
		return nil
	}

	hub, err := eventhub.NewHubFromConnectionString(p.eventHubSonnectionString)
	if err != nil {
		return err
	}

	// abort releases the hub created above and passes the startup error through.
	abort := func(cause error) error {
		if closeErr := hub.Close(ctx); closeErr != nil {
			p.logger.Warn("failed to close event hub after startup failure", zap.Error(closeErr))
		}
		return cause
	}

	runtimeInfo, err := hub.GetRuntimeInformation(ctx)
	if err != nil {
		return abort(err)
	}

	for _, partitionID := range runtimeInfo.PartitionIDs {
		if _, err = hub.Receive(ctx, partitionID, p.newMessageHandler, eventhub.ReceiveWithLatestOffset()); err != nil {
			return abort(err)
		}
	}

	p.hub = hub

	return nil
}

// newMessageHandler handles a single Event Hub message. The message payload
// is decoded as a JSON array of events; every event of type
// blobCreatedEventType whose blob lives in the logs or traces container is
// read through blobClient and passed to the matching consumer.
//
// Events are skipped (without reading the blob) when they are of another
// type, when their subject cannot be parsed, or when the blob belongs to a
// container that is neither the logs nor the traces container. An empty event
// array is not an error.
//
// It returns the JSON decoding error, or the first error reported while
// reading a blob or consuming its content.
func (p *azureBlobEventHandler) newMessageHandler(ctx context.Context, event *eventhub.Event) error {
	// eventData lists the envelope fields decoded from each event. Only
	// Subject and EventType are used below.
	type eventData struct {
		Topic           string
		Subject         string
		EventType       string
		ID              string
		Data            map[string]interface{}
		DataVersion     string
		MetadataVersion string
		EventTime       string
	}

	var events []eventData
	if err := json.Unmarshal(event.Data, &events); err != nil {
		return err
	}

	for i := range events {
		if events[i].EventType != blobCreatedEventType {
			continue
		}

		subject := events[i].Subject
		containerName, blobName, ok := splitBlobEventSubject(subject)
		if !ok {
			p.logger.Warn("Unable to parse blob event subject", zap.String("subject", subject))
			continue
		}

		// Decide on the consumer before touching the blob, so blobs of
		// unrelated containers are never downloaded. When both container
		// names are equal, logs take precedence.
		isLogs := containerName == p.logsContainerName
		if !isLogs && containerName != p.tracesContainerName {
			p.logger.Debug("Unknown container name", zap.String("containerName", containerName))
			continue
		}

		blobData, err := p.blobClient.readBlob(ctx, containerName, blobName)
		if err != nil {
			return err
		}

		if isLogs {
			err = p.logsDataConsumer.consumeLogsJSON(ctx, blobData.Bytes())
		} else {
			err = p.tracesDataConsumer.consumeTracesJSON(ctx, blobData.Bytes())
		}
		if err != nil {
			return err
		}
	}

	return nil
}

// splitBlobEventSubject extracts the container and blob names from an event
// subject of the form ".../containers/<container>/blobs/<blob>". The blob
// name is everything after the "blobs/" segment that follows the container
// name, so blob or container names that themselves contain "blobs/" are
// handled. ok is false when either name is missing.
func splitBlobEventSubject(subject string) (containerName string, blobName string, ok bool) {
	const (
		containersMarker = "containers/"
		blobsMarker      = "blobs/"
	)

	start := strings.Index(subject, containersMarker)
	if start < 0 {
		return "", "", false
	}
	rest := subject[start+len(containersMarker):]

	slash := strings.Index(rest, "/")
	if slash <= 0 {
		return "", "", false
	}
	containerName, rest = rest[:slash], rest[slash:]

	blobs := strings.Index(rest, blobsMarker)
	if blobs < 0 {
		return "", "", false
	}
	blobName = rest[blobs+len(blobsMarker):]
	if blobName == "" {
		return "", "", false
	}

	return containerName, blobName, true
}

// close shuts down the Event Hub client if the handler is running. When
// closing fails, the error is returned and the hub reference is kept;
// otherwise the reference is cleared. Closing a handler that is not running
// is a no-op.
func (p *azureBlobEventHandler) close(ctx context.Context) error {
	if p.hub == nil {
		return nil
	}

	if closeErr := p.hub.Close(ctx); closeErr != nil {
		return closeErr
	}

	p.hub = nil
	return nil
}

// setLogsDataConsumer registers the consumer that newMessageHandler passes
// logs blob content to, replacing any previously registered one.
func (p *azureBlobEventHandler) setLogsDataConsumer(logsDataConsumer logsDataConsumer) {
	p.logsDataConsumer = logsDataConsumer
}

// setTracesDataConsumer registers the consumer that newMessageHandler passes
// traces blob content to, replacing any previously registered one.
func (p *azureBlobEventHandler) setTracesDataConsumer(tracesDataConsumer tracesDataConsumer) {
	p.tracesDataConsumer = tracesDataConsumer
}

// newBlobEventHandler creates a handler that is not yet running: the hub and
// both data consumers are left unset. Consumers are attached with
// setLogsDataConsumer / setTracesDataConsumer and the hub is created by run.
func newBlobEventHandler(eventHubSonnectionString string, logsContainerName string, tracesContainerName string, blobClient blobClient, logger *zap.Logger) *azureBlobEventHandler {
	handler := new(azureBlobEventHandler)

	handler.eventHubSonnectionString = eventHubSonnectionString
	handler.logsContainerName = logsContainerName
	handler.tracesContainerName = tracesContainerName
	handler.blobClient = blobClient
	handler.logger = logger

	return handler
}
74 changes: 74 additions & 0 deletions receiver/azureblobreceiver/blobeventhandler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package azureblobreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azureblobreceiver"

import (
"context"
"testing"

eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)

// eventHubString is a syntactically valid Event Hub connection string used to
// construct handlers in tests. The endpoint uses the "sb://" scheme; the
// "sb:https://" form was a text-extraction artifact and not a valid endpoint.
const (
	eventHubString = "Endpoint=sb://oteldata.servicebus.windows.net/;SharedAccessKeyName=oteldatahubpolicy;SharedAccessKey=sharedAccessKey;EntityPath=otelddatahub"
)

// Event payload fixtures: each is a JSON array holding a single
// Microsoft.Storage.BlobCreated event.
var (
	// logEventData references blob "logs-1" in container "logs".
	logEventData = []byte(`[{"topic":"someTopic","subject":"/blobServices/default/containers/logs/blobs/logs-1","eventType":"Microsoft.Storage.BlobCreated","id":"1","data":{"api":"PutBlob","clientRequestId":"1","requestId":"1","eTag":"1","contentType":"text","contentLength":10,"blobType":"BlockBlob","url":"https://oteldata.blob.core.windows.net/logs/logs-1","sequencer":"1","storageDiagnostics":{"batchId":"1"}},"dataVersion":"","metadataVersion":"1","eventTime":"2022-03-25T15:59:50.9251748Z"}]`)
	// traceEventData references blob "traces-1" in container "traces".
	traceEventData = []byte(`[{"topic":"someTopic","subject":"/blobServices/default/containers/traces/blobs/traces-1","eventType":"Microsoft.Storage.BlobCreated","id":"1","data":{"api":"PutBlob","clientRequestId":"1","requestId":"1","eTag":"1","contentType":"text","contentLength":10,"blobType":"BlockBlob","url":"https://oteldata.blob.core.windows.net/traces/traces-1","sequencer":"1","storageDiagnostics":{"batchId":"1"}},"dataVersion":"","metadataVersion":"1","eventTime":"2022-03-25T15:59:50.9251748Z"}]`)
)

// TestNewBlobEventHandler checks that the constructor returns a handler that
// holds the blob client it was given.
func TestNewBlobEventHandler(t *testing.T) {
	mockClient := newMockBlobClient()

	handler := getBlobEventHandler(t, mockClient)

	require.NotNil(t, handler)
	assert.Equal(t, mockClient, handler.blobClient)
}

// TestNewMessageHangdler feeds one logs event and one traces event to the
// handler and checks that each consumer is invoked once and that the blob
// client is asked to read a blob for each event.
func TestNewMessageHangdler(t *testing.T) {
	mockClient := newMockBlobClient()
	handler := getBlobEventHandler(t, mockClient)

	mockLogs := newMockLogsDataConsumer()
	mockTraces := newMockTracesDataConsumer()
	handler.setLogsDataConsumer(mockLogs)
	handler.setTracesDataConsumer(mockTraces)

	// Logs first, then traces; each event must be handled without error.
	ctx := context.Background()
	for _, payload := range [][]byte{logEventData, traceEventData} {
		handleErr := handler.newMessageHandler(ctx, getEvent(payload))
		require.NoError(t, handleErr)
	}

	mockLogs.AssertNumberOfCalls(t, "consumeLogsJSON", 1)
	mockTraces.AssertNumberOfCalls(t, "consumeTracesJSON", 1)
	mockClient.AssertNumberOfCalls(t, "readBlob", 2)
}

// getEvent wraps a raw payload in an Event Hub event whose only populated
// field is Data.
func getEvent(eventData []byte) *eventhub.Event {
	event := new(eventhub.Event)
	event.Data = eventData
	return event
}

// getBlobEventHandler builds a handler for tests from the shared test
// connection string and container names, the supplied blob client and a
// logger bound to tb.
func getBlobEventHandler(tb testing.TB, blobClient blobClient) *azureBlobEventHandler {
	testLogger := zaptest.NewLogger(tb)
	return newBlobEventHandler(eventHubString, logsContainerName, tracesContainerName, blobClient, testLogger)
}
Loading

0 comments on commit 26a6516

Please sign in to comment.