Skip to content

Commit

Permalink
[chore] Fix some wording in CONTRIBUTING.md (open-telemetry#27771)
Browse files Browse the repository at this point in the history
Fixes some incorrect wording in the contributing documentation
  • Loading branch information
crobert-1 authored and djaglowski committed Oct 16, 2023
1 parent 4a1d7c8 commit 9b1fee6
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 141 deletions.
10 changes: 5 additions & 5 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ providing the following information:
components, having a sponsor means that your use case has been validated.
* Some information about your component, such as the reasoning behind it, use-cases, telemetry data types supported, and
anything else you think is relevant for us to make a decision about accepting the component.
* The configuration options your component will accept. This will help us understand what it does and have an idea of
how the implementation might look like.
* The configuration options your component will accept. This will give us a better understanding of what it does, and
how it may be implemented.

Components refer to connectors, exporters, extensions, processors, and receivers. The key criteria for implementing a component are to:

Expand All @@ -121,7 +121,7 @@ Components refer to connectors, exporters, extensions, processors, and receivers
* Provide the implementation which performs the component operation
* Have a `metadata.yaml` file and its generated code (using [mdatagen](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/cmd/mdatagen/README.md)).

Familiarize yourself with the interface of the component that you want to write, and use existing implementations as reference.
Familiarize yourself with the interface of the component that you want to write, and use existing implementations as a reference.
[Building a Trace Receiver](https://opentelemetry.io/docs/collector/trace-receiver/) tutorial provides a detailed example of building a component.

*NOTICE:* The Collector is in Beta stage and as such the interfaces may undergo breaking changes. Component creators
Expand All @@ -130,8 +130,8 @@ excluded from the default builds.

Generally, maintenance of components is the responsibility of contributors who authored them. If the original author or
some other contributor does not maintain the component it may be excluded from the default build. The component **will**
be excluded if it causes build problems, has failing tests or otherwise causes problems to the rest of the repository
and the rest of contributors.
be excluded if it causes build problems, has failing tests, or otherwise causes problems to the rest of the repository
and its contributors.

- Create your component under the proper folder and use Go standard package naming recommendations.
- Use a boiler-plate Makefile that just references the one at top level, i.e.: `include ../../Makefile.Common` - this
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func BenchmarkFileInput(b *testing.B) {
}

b.ResetTimer()
err = op.Start(testutil.NewMockPersister("test"))
err = op.Start(testutil.NewMockPersister())
defer func() {
require.NoError(b, op.Stop())
}()
Expand Down
102 changes: 19 additions & 83 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"

import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"sync"
"time"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
Expand All @@ -28,9 +27,9 @@ type Manager struct {
readerFactory reader.Factory
fileMatcher *matcher.Matcher
roller roller
persister operator.Persister

pollInterval time.Duration
persister operator.Persister
maxBatches int
maxBatchFiles int

Expand All @@ -43,12 +42,20 @@ type Manager struct {
func (m *Manager) Start(persister operator.Persister) error {
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel

m.persister = persister

// Load offsets from disk
if err := m.loadLastPollFiles(ctx); err != nil {
offsets, err := checkpoint.Load(ctx, m.persister)
if err != nil {
return fmt.Errorf("read known files from database: %w", err)
}
if len(offsets) > 0 {
m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
m.readerFactory.FromBeginning = true
for _, offset := range offsets {
m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: offset})
}
}

if _, err := m.fileMatcher.MatchFiles(); err != nil {
m.Warnf("finding files: %v", err)
Expand Down Expand Up @@ -150,7 +157,13 @@ func (m *Manager) consume(ctx context.Context, paths []string) {

m.roller.roll(ctx, readers)
m.saveCurrent(readers)
m.syncLastPollFiles(ctx)

rmds := make([]*reader.Metadata, 0, len(readers))
for _, r := range readers {
rmds = append(rmds, r.Metadata)
}
checkpoint.Save(ctx, m.persister, rmds)

m.clearCurrentFingerprints()
}

Expand Down Expand Up @@ -263,80 +276,3 @@ func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*reader.Rea
}
return nil, false
}

// knownFilesKey is the persister key under which the encoded set of known
// files is stored by syncLastPollFiles and read back by loadLastPollFiles.
const knownFilesKey = "knownFiles"

// syncLastPollFiles syncs the most recent set of files to the database.
//
// The payload stored under knownFilesKey is a sequence of JSON values: the
// number of known files, followed by the Metadata of each reader in
// m.knownFiles. loadLastPollFiles decodes exactly as many entries as the
// leading count announces, so a payload with missing entries cannot be loaded.
// For that reason the sync is abandoned as soon as any value fails to encode,
// leaving whatever was stored previously untouched.
//
// Failures are logged and not returned to the caller.
func (m *Manager) syncLastPollFiles(ctx context.Context) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)

	// Encode the number of known files
	if err := enc.Encode(len(m.knownFiles)); err != nil {
		m.Errorw("Failed to encode known files", zap.Error(err))
		return
	}

	// Encode each known file
	for _, fileReader := range m.knownFiles {
		if err := enc.Encode(fileReader.Metadata); err != nil {
			m.Errorw("Failed to encode known files", zap.Error(err))
			// Never persist a payload that may hold fewer entries than the
			// count written above.
			return
		}
	}

	if err := m.persister.Set(ctx, knownFilesKey, buf.Bytes()); err != nil {
		m.Errorw("Failed to sync to database", zap.Error(err))
	}
}

// loadLastPollFiles loads the most recent set of files from the database.
//
// It reads the value stored under knownFilesKey, which is expected to be a
// sequence of JSON values: the number of known files followed by one Metadata
// value per file. Each decoded Metadata is wrapped in a reader.Reader and
// appended to m.knownFiles. When the stored count is greater than zero,
// m.readerFactory.FromBeginning is set to true.
//
// A nil stored value is treated as "nothing to resume" and returns nil.
// An error is returned if the persister lookup fails or if the count or any
// entry cannot be decoded.
func (m *Manager) loadLastPollFiles(ctx context.Context) error {
	encoded, err := m.persister.Get(ctx, knownFilesKey)
	if err != nil {
		return err
	}

	if encoded == nil {
		return nil
	}

	dec := json.NewDecoder(bytes.NewReader(encoded))

	// Decode the number of entries
	var knownFileCount int
	if err = dec.Decode(&knownFileCount); err != nil {
		return fmt.Errorf("decoding file count: %w", err)
	}

	if knownFileCount > 0 {
		m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
		m.readerFactory.FromBeginning = true
	}

	// Decode each of the known files
	for i := 0; i < knownFileCount; i++ {
		rmd := new(reader.Metadata)
		if err = dec.Decode(rmd); err != nil {
			// Say which entry was unreadable, matching the wrapped count error above
			return fmt.Errorf("decoding known file %d of %d: %w", i+1, knownFileCount, err)
		}

		// Migrate readers that used FileAttributes.HeaderAttributes
		// This block can be removed in a future release, tentatively v0.90.0
		if ha, ok := rmd.FileAttributes["HeaderAttributes"]; ok {
			switch hat := ha.(type) {
			case map[string]any:
				// Lift the nested attributes to the top level, then drop the wrapper key
				for k, v := range hat {
					rmd.FileAttributes[k] = v
				}
				delete(rmd.FileAttributes, "HeaderAttributes")
			default:
				m.Errorw("migrate header attributes: unexpected format")
			}
		}

		// This reader won't be used for anything other than metadata reference, so just wrap the metadata
		m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: rmd})
	}

	return nil
}
Loading

0 comments on commit 9b1fee6

Please sign in to comment.