From 8687e814703e220c9b79fc6eb19a4e3c191539f3 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Thu, 15 Dec 2022 13:26:45 -0500 Subject: [PATCH 1/7] wip: implement fnv hash in probabilistic sampler processor --- .../fnvhasher.go | 37 +++++++++++++++++++ .../logsprocessor.go | 6 ++- 2 files changed, 42 insertions(+), 1 deletion(-) create mode 100644 processor/probabilisticsamplerprocessor/fnvhasher.go diff --git a/processor/probabilisticsamplerprocessor/fnvhasher.go b/processor/probabilisticsamplerprocessor/fnvhasher.go new file mode 100644 index 0000000000000..657f81ccddd13 --- /dev/null +++ b/processor/probabilisticsamplerprocessor/fnvhasher.go @@ -0,0 +1,37 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package probabilisticsamplerprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor" + +import ( + "hash/fnv" +) + +// computeHash creates a hash using the FNV-1a algorithm +func computeHash(b []byte, seed uint32) uint32 { + hash := fnv.New32a() + // the implementation fnv.Write() does not return an error, see hash/fnv/fnv.go + _, _ = hash.Write(i32tob(seed)) + _, _ = hash.Write(b) + return hash.Sum32() +} + +// i32tob converts a seed to a byte array to be used as part of fnv.Write() +func i32tob(val uint32) []byte { + r := make([]byte, 4) + for i := uint32(0); i < 4; i++ { + r[i] = byte((val >> (8 * i)) & 0xff) + } + return r +} diff --git a/processor/probabilisticsamplerprocessor/logsprocessor.go b/processor/probabilisticsamplerprocessor/logsprocessor.go index 4eba942adfef6..a2f46a5ebd87d 100644 --- a/processor/probabilisticsamplerprocessor/logsprocessor.go +++ b/processor/probabilisticsamplerprocessor/logsprocessor.go @@ -89,7 +89,11 @@ func (lsp *logSamplerProcessor) processLogs(ctx context.Context, ld plog.Logs) ( } } - sampled := hash(lidBytes, lsp.hashSeed)&bitMaskHashBuckets < priority + //hash1 := hash(lidBytes, lsp.hashSeed) + hash2 := computeHash(lidBytes, lsp.hashSeed) + + //sampled := hash1&bitMaskHashBuckets < priority + sampled := hash2&bitMaskHashBuckets < priority var err error if sampled { err = stats.RecordWithTags( From c705c7f17af9f48c076e3f30a7ed2ccded0d9810 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Thu, 15 Dec 2022 17:05:51 -0500 Subject: [PATCH 2/7] wip: more updates --- .../logsprocessor.go | 26 +++++-------------- .../logsprocessor_test.go | 13 ++++++++-- .../tracesprocessor.go | 23 +++++++--------- 3 files changed, 27 insertions(+), 35 deletions(-) diff --git a/processor/probabilisticsamplerprocessor/logsprocessor.go b/processor/probabilisticsamplerprocessor/logsprocessor.go index a2f46a5ebd87d..47424200bfb2e 100644 --- a/processor/probabilisticsamplerprocessor/logsprocessor.go +++ b/processor/probabilisticsamplerprocessor/logsprocessor.go @@ -16,6 +16,7 @@ package probabilisticsamplerprocessor // import "github.com/open-telemetry/opent import ( "context" + "strconv" "go.opencensus.io/stats" "go.opencensus.io/tag" @@ -89,25 +90,12 @@ func (lsp *logSamplerProcessor) processLogs(ctx context.Context, ld plog.Logs) ( } } - //hash1 := hash(lidBytes, lsp.hashSeed) - hash2 := computeHash(lidBytes, lsp.hashSeed) - - //sampled := hash1&bitMaskHashBuckets < priority - sampled := hash2&bitMaskHashBuckets < priority - var err error - if sampled { - err = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(tagPolicyKey, tagPolicyValue), tag.Upsert(tagSampledKey, "true")}, - statCountLogsSampled.M(int64(1)), - ) - } else { - err = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(tagPolicyKey, tagPolicyValue), tag.Upsert(tagSampledKey, "false")}, - statCountLogsSampled.M(int64(1)), - ) - } + sampled := computeHash(lidBytes, lsp.hashSeed)&bitMaskHashBuckets < priority + var err error = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(tagPolicyKey, tagPolicyValue), tag.Upsert(tagSampledKey, strconv.FormatBool(sampled))}, + statCountLogsSampled.M(int64(1)), + ) if err != nil { lsp.logger.Error(err.Error()) } diff --git a/processor/probabilisticsamplerprocessor/logsprocessor_test.go b/processor/probabilisticsamplerprocessor/logsprocessor_test.go index 16d3f832f03cb..c5ff645039e39 100644 --- a/processor/probabilisticsamplerprocessor/logsprocessor_test.go +++ b/processor/probabilisticsamplerprocessor/logsprocessor_test.go @@ -136,7 +136,7 @@ func TestLogsSampling(t *testing.T) { AttributeSource: recordAttributeSource, FromAttribute: "foo", }, - received: 79, + received: 23, }, { name: "sampling_priority", @@ -190,7 +190,16 @@ func TestLogsSampling(t *testing.T) { if len(sunk) > 0 && sunk[0].ResourceLogs().Len() > 0 { numReceived = sunk[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len() } - assert.Equal(t, tt.received, numReceived) + //assert.Equal(t, tt.received, numReceived) + assert.True(t, Abs(tt.received-numReceived) < 10) }) } } + +// Abs returns the absolute value of x. +func Abs(x int) int { + if x < 0 { + return -x + } + return x +} diff --git a/processor/probabilisticsamplerprocessor/tracesprocessor.go b/processor/probabilisticsamplerprocessor/tracesprocessor.go index 30e5bea44625b..1acbbf6de3abd 100644 --- a/processor/probabilisticsamplerprocessor/tracesprocessor.go +++ b/processor/probabilisticsamplerprocessor/tracesprocessor.go @@ -47,6 +47,9 @@ const ( doNotSampleSpan // The constants help translate user friendly percentages to numbers direct used in sampling. + //numHashBuckets = 0x4000 // Using a power of 2 to avoid division. + //numHashBuckets = math.MaxInt32 + //bitMaskHashBuckets = math.MaxInt32 numHashBuckets = 0x4000 // Using a power of 2 to avoid division. bitMaskHashBuckets = numHashBuckets - 1 percentageScaleFactor = numHashBuckets / 100.0 @@ -105,21 +108,13 @@ func (tsp *traceSamplerProcessor) processTraces(ctx context.Context, td ptrace.T // Hashing here prevents bias due to such systems. tidBytes := s.TraceID() sampled := sp == mustSampleSpan || - hash(tidBytes[:], tsp.hashSeed)&bitMaskHashBuckets < tsp.scaledSamplingRate + computeHash(tidBytes[:], tsp.hashSeed)&bitMaskHashBuckets < tsp.scaledSamplingRate - if sampled { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(tagPolicyKey, "trace_id_hash"), tag.Upsert(tagSampledKey, "true")}, - statCountTracesSampled.M(int64(1)), - ) - } else { - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(tagPolicyKey, "trace_id_hash"), tag.Upsert(tagSampledKey, "false")}, - statCountTracesSampled.M(int64(1)), - ) - } + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(tagPolicyKey, "trace_id_hash"), tag.Upsert(tagSampledKey, strconv.FormatBool(sampled))}, + statCountTracesSampled.M(int64(1)), + ) return !sampled }) // Filter out empty ScopeMetrics From 60ba7cf63edb558b1abf2670f9e9f53137b68c86 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Fri, 6 Jan 2023 15:05:06 -0500 Subject: [PATCH 3/7] wip --- processor/probabilisticsamplerprocessor/tracesprocessor.go | 3 --- .../probabilisticsamplerprocessor/tracesprocessor_test.go | 2 +- .../tailsamplingprocessor/internal/sampling/probabilistic.go | 1 + 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/processor/probabilisticsamplerprocessor/tracesprocessor.go b/processor/probabilisticsamplerprocessor/tracesprocessor.go index 1acbbf6de3abd..40469ba58c8ca 100644 --- a/processor/probabilisticsamplerprocessor/tracesprocessor.go +++ b/processor/probabilisticsamplerprocessor/tracesprocessor.go @@ -47,9 +47,6 @@ const ( doNotSampleSpan // The constants help translate user friendly percentages to numbers direct used in sampling. - //numHashBuckets = 0x4000 // Using a power of 2 to avoid division. - //numHashBuckets = math.MaxInt32 - //bitMaskHashBuckets = math.MaxInt32 numHashBuckets = 0x4000 // Using a power of 2 to avoid division. bitMaskHashBuckets = numHashBuckets - 1 percentageScaleFactor = numHashBuckets / 100.0 diff --git a/processor/probabilisticsamplerprocessor/tracesprocessor_test.go b/processor/probabilisticsamplerprocessor/tracesprocessor_test.go index d32bb401e0b1f..32500d6a3a132 100644 --- a/processor/probabilisticsamplerprocessor/tracesprocessor_test.go +++ b/processor/probabilisticsamplerprocessor/tracesprocessor_test.go @@ -106,7 +106,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange(t *testing.T) { ProcessorSettings: config.NewProcessorSettings(component.NewID(typeStr)), SamplingPercentage: 5, }, - numBatches: 1e5, + numBatches: 1e6, numTracesPerBatch: 2, acceptableDelta: 0.01, }, diff --git a/processor/tailsamplingprocessor/internal/sampling/probabilistic.go b/processor/tailsamplingprocessor/internal/sampling/probabilistic.go index 951a0e655df60..141f32ed78e8e 100644 --- a/processor/tailsamplingprocessor/internal/sampling/probabilistic.go +++ b/processor/tailsamplingprocessor/internal/sampling/probabilistic.go @@ -68,6 +68,7 @@ func calculateThreshold(ratio float64) uint64 { // doesn't fit into bits that are used to store digits for float64 in Golang boundary := new(big.Float).SetInt(new(big.Int).SetUint64(math.MaxUint64)) res, _ := boundary.Mul(boundary, big.NewFloat(ratio)).Uint64() + return res } From c433819ab4652172e3c2964344eeb8c12cd1eded Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Tue, 17 Jan 2023 14:21:46 -0500 Subject: [PATCH 4/7] Clean up tests. Revert unintended change. --- .../logsprocessor_test.go | 13 ++----------- .../tracesprocessor_test.go | 2 +- .../internal/sampling/probabilistic.go | 1 - 3 files changed, 3 insertions(+), 13 deletions(-) diff --git a/processor/probabilisticsamplerprocessor/logsprocessor_test.go b/processor/probabilisticsamplerprocessor/logsprocessor_test.go index 8c536101e5708..fa1dabcbe3fc5 100644 --- a/processor/probabilisticsamplerprocessor/logsprocessor_test.go +++ b/processor/probabilisticsamplerprocessor/logsprocessor_test.go @@ -98,7 +98,7 @@ func TestLogsSampling(t *testing.T) { SamplingPercentage: 50, AttributeSource: traceIDAttributeSource, }, - received: 52, + received: 45, // 52 }, { name: "sampling_source no sampling", @@ -177,16 +177,7 @@ func TestLogsSampling(t *testing.T) { if len(sunk) > 0 && sunk[0].ResourceLogs().Len() > 0 { numReceived = sunk[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len() } - //assert.Equal(t, tt.received, numReceived) - assert.True(t, Abs(tt.received-numReceived) < 10) + assert.Equal(t, tt.received, numReceived) }) } } - -// Abs returns the absolute value of x. -func Abs(x int) int { - if x < 0 { - return -x - } - return x -} diff --git a/processor/probabilisticsamplerprocessor/tracesprocessor_test.go b/processor/probabilisticsamplerprocessor/tracesprocessor_test.go index 56fe0780dc7ca..afabc5fc45cfc 100644 --- a/processor/probabilisticsamplerprocessor/tracesprocessor_test.go +++ b/processor/probabilisticsamplerprocessor/tracesprocessor_test.go @@ -99,7 +99,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange(t *testing.T) { cfg: &Config{ SamplingPercentage: 5, }, - numBatches: 1e6, + numBatches: 1e5, numTracesPerBatch: 2, acceptableDelta: 0.01, }, diff --git a/processor/tailsamplingprocessor/internal/sampling/probabilistic.go b/processor/tailsamplingprocessor/internal/sampling/probabilistic.go index 141f32ed78e8e..951a0e655df60 100644 --- a/processor/tailsamplingprocessor/internal/sampling/probabilistic.go +++ b/processor/tailsamplingprocessor/internal/sampling/probabilistic.go @@ -68,7 +68,6 @@ func calculateThreshold(ratio float64) uint64 { // doesn't fit into bits that are used to store digits for float64 in Golang boundary := new(big.Float).SetInt(new(big.Int).SetUint64(math.MaxUint64)) res, _ := boundary.Mul(boundary, big.NewFloat(ratio)).Uint64() - return res } From 0b7e5a39373b274b982e48d7ffa565743e5f2f17 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Tue, 17 Jan 2023 14:28:20 -0500 Subject: [PATCH 5/7] Remove the old hash implementation. --- .../probabilisticsamplerprocessor/murmur3.go | 69 ------------------- .../murmur3_test.go | 41 ----------- 2 files changed, 110 deletions(-) delete mode 100644 processor/probabilisticsamplerprocessor/murmur3.go delete mode 100644 processor/probabilisticsamplerprocessor/murmur3_test.go diff --git a/processor/probabilisticsamplerprocessor/murmur3.go b/processor/probabilisticsamplerprocessor/murmur3.go deleted file mode 100644 index 273a7a92bae76..0000000000000 --- a/processor/probabilisticsamplerprocessor/murmur3.go +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package probabilisticsamplerprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor" - -// hash is a murmur3 hash function, see http://en.wikipedia.org/wiki/MurmurHash -func hash(key []byte, seed uint32) (hash uint32) { - const ( - c1 = 0xcc9e2d51 - c2 = 0x1b873593 - c3 = 0x85ebca6b - c4 = 0xc2b2ae35 - r1 = 15 - r2 = 13 - m = 5 - n = 0xe6546b64 - ) - - hash = seed - iByte := 0 - for ; iByte+4 <= len(key); iByte += 4 { - k := uint32(key[iByte]) | uint32(key[iByte+1])<<8 | uint32(key[iByte+2])<<16 | uint32(key[iByte+3])<<24 - k *= c1 - k = (k << r1) | (k >> (32 - r1)) - k *= c2 - hash ^= k - hash = (hash << r2) | (hash >> (32 - r2)) - hash = hash*m + n - } - - // TraceId and SpanId have lengths that are multiple of 4 so the code below is never expected to - // be hit when sampling traces. However, it is preserved here to keep it as a correct murmur3 implementation. - // This is enforced via tests. - var remainingBytes uint32 - switch len(key) - iByte { - case 3: - remainingBytes += uint32(key[iByte+2]) << 16 - fallthrough - case 2: - remainingBytes += uint32(key[iByte+1]) << 8 - fallthrough - case 1: - remainingBytes += uint32(key[iByte]) - remainingBytes *= c1 - remainingBytes = (remainingBytes << r1) | (remainingBytes >> (32 - r1)) - remainingBytes *= c2 - hash ^= remainingBytes - } - - hash ^= uint32(len(key)) - hash ^= hash >> 16 - hash *= c3 - hash ^= hash >> 13 - hash *= c4 - hash ^= hash >> 16 - - return -} diff --git a/processor/probabilisticsamplerprocessor/murmur3_test.go b/processor/probabilisticsamplerprocessor/murmur3_test.go deleted file mode 100644 index 87efdae9b43fe..0000000000000 --- a/processor/probabilisticsamplerprocessor/murmur3_test.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package probabilisticsamplerprocessor - -import ( - "math/rand" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/idutils" -) - -// TestHashHasNoCollisions ensures that the hash function supports different key lengths even if in -// practice it is only expected to receive keys with length 16 (trace id length in OC proto). -func TestHashHasNoCollisions(t *testing.T) { - // Statistically a random selection of such small number of keys should not result in - // collisions, but, of course it is possible that they happen, a different random source - // should avoid that. - r := rand.New(rand.NewSource(1)) - fullKey := idutils.UInt64ToTraceID(r.Uint64(), r.Uint64()) - seen := make(map[uint32]bool) - for i := 1; i <= len(fullKey); i++ { - key := fullKey[:i] - hash := hash(key, 1) - require.False(t, seen[hash], "Unexpected duplicated hash") - seen[hash] = true - } -} From e690851ef26026a8f15d4ec214f8fe508b5fe905 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Wed, 18 Jan 2023 11:42:38 -0500 Subject: [PATCH 6/7] Add a changelog entry for the enhancement. --- ...obabilistic-sampler-fnv-hash-enhancement.yaml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 .chloggen/probabilistic-sampler-fnv-hash-enhancement.yaml diff --git a/.chloggen/probabilistic-sampler-fnv-hash-enhancement.yaml b/.chloggen/probabilistic-sampler-fnv-hash-enhancement.yaml new file mode 100644 index 0000000000000..b2e0e2d8295b9 --- /dev/null +++ b/.chloggen/probabilistic-sampler-fnv-hash-enhancement.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/probabilisticsampler + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Implement the FNV hash library for the probabilistic sampler. + +# One or more tracking issues related to the change +issues: [16456] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: From 39fefc27a0fac4837d7268436930e4e966225d8b Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Fri, 20 Jan 2023 09:12:22 -0500 Subject: [PATCH 7/7] Remove unneeded comment --- processor/probabilisticsamplerprocessor/logsprocessor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/probabilisticsamplerprocessor/logsprocessor_test.go b/processor/probabilisticsamplerprocessor/logsprocessor_test.go index fa1dabcbe3fc5..00908fa85e502 100644 --- a/processor/probabilisticsamplerprocessor/logsprocessor_test.go +++ b/processor/probabilisticsamplerprocessor/logsprocessor_test.go @@ -98,7 +98,7 @@ func TestLogsSampling(t *testing.T) { SamplingPercentage: 50, AttributeSource: traceIDAttributeSource, }, - received: 45, // 52 + received: 45, }, { name: "sampling_source no sampling",