Skip to content

Commit

Permalink
Merge branch 'master' into add_amqp_tls_external_auth_support
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail authored Oct 18, 2021
2 parents 399127a + 6df2ce2 commit ecf17cd
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 39 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@ All notable changes to this project will be documented in this file.

## Unreleased

- amqp_0_9 components now support TLS EXTERNAL auth.
### Added

- `amqp_0_9` components now support TLS EXTERNAL auth.

### Fixed

- Removed a performance bottleneck when consuming a large quantity of small files with the `file` input.

## 3.57.0 - 2021-10-14

Expand Down
80 changes: 42 additions & 38 deletions lib/input/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,25 +147,29 @@ func newFileConsumer(conf FileConfig, log log.Modular) (*fileConsumer, error) {
}, nil
}

// ConnectWithContext attempts to establish a connection to the target S3 bucket
// and any relevant queues used to traverse the objects (SQS, etc).
// ConnectWithContext does nothing as we don't have a concept of a connection
// with this input.
func (f *fileConsumer) ConnectWithContext(ctx context.Context) error {
return nil
}

func (f *fileConsumer) getReader(ctx context.Context) (codec.Reader, string, error) {
f.scannerMut.Lock()
defer f.scannerMut.Unlock()

if f.scanner != nil {
return nil
return f.scanner, f.currentPath, nil
}

if len(f.paths) == 0 {
return types.ErrTypeClosed
return nil, "", types.ErrTypeClosed
}

nextPath := f.paths[0]

file, err := os.Open(nextPath)
if err != nil {
return err
return nil, "", err
}

if f.scanner, err = f.scannerCtor(nextPath, file, func(ctx context.Context, err error) error {
Expand All @@ -175,56 +179,56 @@ func (f *fileConsumer) ConnectWithContext(ctx context.Context) error {
return nil
}); err != nil {
file.Close()
return err
return nil, "", err
}

f.currentPath = nextPath
f.paths = f.paths[1:]

f.log.Infof("Consuming from file '%v'\n", nextPath)
return nil
return f.scanner, f.currentPath, nil
}

// ReadWithContext attempts to read a new message from the target S3 bucket.
func (f *fileConsumer) ReadWithContext(ctx context.Context) (types.Message, reader.AsyncAckFn, error) {
f.scannerMut.Lock()
defer f.scannerMut.Unlock()

if f.scanner == nil {
return nil, nil, types.ErrNotConnected
}
for {
scanner, currentPath, err := f.getReader(ctx)
if err != nil {
return nil, nil, err
}

parts, codecAckFn, err := f.scanner.Next(ctx)
if err != nil {
if errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) {
err = types.ErrTimeout
parts, codecAckFn, err := scanner.Next(ctx)
if err != nil {
if errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) {
err = types.ErrTimeout
}
if err != types.ErrTimeout {
f.scanner.Close(ctx)
f.scanner = nil
}
if errors.Is(err, io.EOF) {
continue
}
return nil, nil, err
}
if err != types.ErrTimeout {
f.scanner.Close(ctx)
f.scanner = nil

msg := message.New(nil)
for _, part := range parts {
if len(part.Get()) > 0 {
part.Metadata().Set("path", currentPath)
msg.Append(part)
}
}
if errors.Is(err, io.EOF) {
if msg.Len() == 0 {
_ = codecAckFn(ctx, nil)
return nil, nil, types.ErrTimeout
}
return nil, nil, err
}

msg := message.New(nil)
for _, part := range parts {
if len(part.Get()) > 0 {
part.Metadata().Set("path", f.currentPath)
msg.Append(part)
}
}
if msg.Len() == 0 {
_ = codecAckFn(ctx, nil)
return nil, nil, types.ErrTimeout
return msg, func(rctx context.Context, res types.Response) error {
return codecAckFn(rctx, res.Error())
}, nil
}

return msg, func(rctx context.Context, res types.Response) error {
return codecAckFn(rctx, res.Error())
}, nil
}

// CloseAsync begins cleaning up resources used by this reader asynchronously.
Expand Down
79 changes: 79 additions & 0 deletions lib/input/file_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package input

import (
"context"
"fmt"
"io/ioutil"
"os"
"reflect"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -144,3 +147,79 @@ func TestFileMultiPartDeprecated(t *testing.T) {
t.Error("Timed out waiting for channel close")
}
}

func TestFileDirectory(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "benthos_file_input_test")
require.NoError(t, err)

tmpInnerDir, err := ioutil.TempDir(tmpDir, "benthos_inner")
require.NoError(t, err)

t.Cleanup(func() {
os.RemoveAll(tmpDir)
})

tmpFile, err := ioutil.TempFile(tmpDir, "f1*.txt")
require.NoError(t, err)

_, err = tmpFile.Write([]byte("foo"))
require.NoError(t, err)

err = tmpFile.Close()
require.NoError(t, err)

tmpFileTwo, err := ioutil.TempFile(tmpInnerDir, "f2*.txt")
require.NoError(t, err)

_, err = tmpFileTwo.Write([]byte("bar"))
require.NoError(t, err)

err = tmpFileTwo.Close()
require.NoError(t, err)

exp := map[string]struct{}{
"foo": {},
"bar": {},
}
act := map[string]struct{}{}

conf := NewFileConfig()
conf.Paths = []string{
fmt.Sprintf("%v/*.txt", tmpDir),
fmt.Sprintf("%v/**/*.txt", tmpDir),
}
conf.Codec = "all-bytes"

f, err := newFileConsumer(conf, log.Noop())
require.NoError(t, err)

err = f.ConnectWithContext(context.Background())
require.NoError(t, err)

msg, aFn, err := f.ReadWithContext(context.Background())
require.NoError(t, err)

resStr := string(msg.Get(0).Get())
if _, exists := act[resStr]; exists {
t.Errorf("Received duplicate message: %v", resStr)
}
act[resStr] = struct{}{}
require.NoError(t, aFn(context.Background(), response.NewAck()))

msg, aFn, err = f.ReadWithContext(context.Background())
require.NoError(t, err)

resStr = string(msg.Get(0).Get())
if _, exists := act[resStr]; exists {
t.Errorf("Received duplicate message: %v", resStr)
}
act[resStr] = struct{}{}
require.NoError(t, aFn(context.Background(), response.NewAck()))

_, _, err = f.ReadWithContext(context.Background())
assert.Equal(t, types.ErrTypeClosed, err)

if !reflect.DeepEqual(exp, act) {
t.Errorf("Wrong result: %v != %v", act, exp)
}
}

0 comments on commit ecf17cd

Please sign in to comment.