Skip to content

Commit

Permalink
Allow partial retries in queued retry processor (open-telemetry#1297)
Browse files Browse the repository at this point in the history
* Allow partial retries in queued retry processor

Signed-off-by: Pavol Loffay <[email protected]>

* Rename consumererror file

Signed-off-by: Pavol Loffay <[email protected]>

* Fix data race

Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay authored Jul 10, 2020
1 parent bfc0dfe commit d8c017a
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 2 deletions.
38 changes: 38 additions & 0 deletions consumer/consumererror/partialerror.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumererror

import "go.opentelemetry.io/collector/consumer/pdata"

// PartialError can be used to signalize that a subset of received data failed to be processed or send.
// The preceding components in the pipeline can use this information for partial retries.
type PartialError struct {
error
failed pdata.Traces
}

// PartialTracesError creates PartialError for failed traces.
// Use this error type only when a subset of received data set failed to be processed or sent.
func PartialTracesError(err error, failed pdata.Traces) error {
return PartialError{
error: err,
failed: failed,
}
}

// GetTraces returns failed traces.
func (err PartialError) GetTraces() pdata.Traces {
return err.failed
}
32 changes: 32 additions & 0 deletions consumer/consumererror/partialerror_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumererror

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/internal/data/testdata"
)

func TestPartialError(t *testing.T) {
td := testdata.GenerateTraceDataOneSpan()
err := fmt.Errorf("some error")
partialErr := PartialTracesError(err, td)
assert.Equal(t, err.Error(), partialErr.Error())
assert.Equal(t, td, partialErr.(PartialError).failed)
}
File renamed without changes.
5 changes: 4 additions & 1 deletion processor/queuedprocessor/queued_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (sp *queuedSpanProcessor) ConsumeTraces(ctx context.Context, td pdata.Trace
// record this as "refused" instead of "dropped".
sp.onItemDropped(item, fmt.Errorf("failed to add to the queue: %w", error(nil)))
} else {
obsreport.ProcessorTraceDataAccepted(ctx, item.spanCountStats.GetAllSpansCount())
obsreport.ProcessorTraceDataAccepted(ctx, td.SpanCount())
}
return nil
}
Expand Down Expand Up @@ -174,6 +174,9 @@ func (sp *queuedSpanProcessor) processItemFromQueue(item *queueItem) {
// throw away the batch
sp.onItemDropped(item, err)
return
} else if partialErr, isPartial := err.(consumererror.PartialError); isPartial {
item.td = partialErr.GetTraces()
item.spanCountStats = processor.NewSpanCountStats(item.td, sp.name)
}

// Immediately drop data on no retires configured.
Expand Down
35 changes: 34 additions & 1 deletion processor/queuedprocessor/queued_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,51 @@ func TestQueuedProcessor_noEnqueueOnPermanentError(t *testing.T) {
require.Equal(t, 1, qp.queue.Size())
}

func TestQueueProcessorPartialError(t *testing.T) {
partialErr := consumererror.PartialTracesError(fmt.Errorf("some error"), testdata.GenerateTraceDataTwoSpansSameResource())
td := testdata.GenerateTraceDataTwoSpansSameResource()
c := &waitGroupTraceConsumer{
consumeTraceDataError: partialErr,
}

cfg := generateDefaultConfig()
cfg.NumWorkers = 1
cfg.QueueSize = 2
cfg.RetryOnFailure = true
cfg.BackoffDelay = time.Millisecond * 50
qp := newQueuedSpanProcessor(component.ProcessorCreateParams{Logger: zap.NewNop()}, c, cfg)
require.NoError(t, qp.Start(context.Background(), componenttest.NewNopHost()))

c.Add(2)
require.Nil(t, qp.ConsumeTraces(context.Background(), td))
c.Wait()
exporterData := c.getData()
assert.Equal(t, 2, len(exporterData))
assert.Equal(t, td, exporterData[0])
assert.Equal(t, partialErr.(consumererror.PartialError).GetTraces(), exporterData[1])
}

type waitGroupTraceConsumer struct {
sync.WaitGroup
consumeTraceDataError error
mux sync.Mutex
data []pdata.Traces
}

var _ consumer.TraceConsumer = (*waitGroupTraceConsumer)(nil)

func (c *waitGroupTraceConsumer) ConsumeTraces(_ context.Context, _ pdata.Traces) error {
func (c *waitGroupTraceConsumer) ConsumeTraces(_ context.Context, data pdata.Traces) error {
c.mux.Lock()
defer c.mux.Unlock()
defer c.Done()
c.data = append(c.data, data)
return c.consumeTraceDataError
}

func (c *waitGroupTraceConsumer) getData() []pdata.Traces {
return c.data
}

func (c *waitGroupTraceConsumer) GetCapabilities() component.ProcessorCapabilities {
return component.ProcessorCapabilities{MutatesConsumedData: false}
}
Expand Down

0 comments on commit d8c017a

Please sign in to comment.