Skip to content

Commit

Permalink
Add refresh_period field to schema_registry_encode
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Oct 22, 2021
1 parent e223008 commit a8a4d0e
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 41 deletions.
22 changes: 13 additions & 9 deletions internal/impl/confluent/schema_registry_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,7 @@ func init() {
err := service.RegisterProcessor(
"schema_registry_decode", schemaRegistryDecoderConfig(),
func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
urlStr, err := conf.FieldString("url")
if err != nil {
return nil, err
}
tlsConf, err := conf.FieldTLS("tls")
if err != nil {
return nil, err
}
return newSchemaRegistryDecoder(urlStr, tlsConf, mgr.Logger())
return newSchemaRegistryDecoderFromConfig(conf, mgr.Logger())
})

if err != nil {
Expand All @@ -66,6 +58,18 @@ type schemaRegistryDecoder struct {
logger *service.Logger
}

func newSchemaRegistryDecoderFromConfig(conf *service.ParsedConfig, logger *service.Logger) (*schemaRegistryDecoder, error) {
urlStr, err := conf.FieldString("url")
if err != nil {
return nil, err
}
tlsConf, err := conf.FieldTLS("tls")
if err != nil {
return nil, err
}
return newSchemaRegistryDecoder(urlStr, tlsConf, logger)
}

func newSchemaRegistryDecoder(urlStr string, tlsConf *tls.Config, logger *service.Logger) (*schemaRegistryDecoder, error) {
u, err := url.Parse(urlStr)
if err != nil {
Expand Down
36 changes: 36 additions & 0 deletions internal/impl/confluent/schema_registry_decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,42 @@ import (
"github.com/stretchr/testify/require"
)

func TestSchemaRegistryDecoderConfigParse(t *testing.T) {
configTests := []struct {
name string
config string
errContains string
}{
{
name: "bad url",
config: `
url: huh#%#@$u*not////::example.com
`,
errContains: `failed to parse url`,
},
}

spec := schemaRegistryDecoderConfig()
env := service.NewEnvironment()
for _, test := range configTests {
t.Run(test.name, func(t *testing.T) {
conf, err := spec.ParseYAML(test.config, env)
require.NoError(t, err)

e, err := newSchemaRegistryDecoderFromConfig(conf, nil)
if err == nil {
_ = e.Close(context.Background())
}
if test.errContains == "" {
require.NoError(t, err)
} else {
require.Error(t, err)
assert.Contains(t, err.Error(), test.errContains)
}
})
}
}

func runSchemaRegistryServer(t *testing.T, fn func(path string) ([]byte, error)) string {
t.Helper()

Expand Down
82 changes: 53 additions & 29 deletions internal/impl/confluent/schema_registry_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ Currently only Avro schemas are supported.`).
Field(service.NewInterpolatedStringField("subject").Description("The schema subject to derive schemas from.").
Example("foo").
Example(`${! meta("kafka_topic") }`)).
Field(service.NewStringField("refresh_period").
Description("The period after which a schema is refreshed for each subject, this is done by polling the schema registry service.").
Default("10m").
Example("60s").
Example("1h")).
Field(service.NewTLSField("tls")).
Version("3.58.0")
}
Expand All @@ -42,19 +47,7 @@ func init() {
err := service.RegisterProcessor(
"schema_registry_encode", schemaRegistryEncoderConfig(),
func(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
urlStr, err := conf.FieldString("url")
if err != nil {
return nil, err
}
subject, err := conf.FieldInterpolatedString("subject")
if err != nil {
return nil, err
}
tlsConf, err := conf.FieldTLS("tls")
if err != nil {
return nil, err
}
return newSchemaRegistryEncoder(urlStr, tlsConf, subject, mgr.Logger())
return newSchemaRegistryEncoderFromConfig(conf, mgr.Logger())
})

if err != nil {
Expand All @@ -65,8 +58,9 @@ func init() {
//------------------------------------------------------------------------------

type schemaRegistryEncoder struct {
client *http.Client
subject *service.InterpolatedString
client *http.Client
subject *service.InterpolatedString
schemaRefreshAfter time.Duration

schemaServerURL *url.URL

Expand All @@ -79,19 +73,54 @@ type schemaRegistryEncoder struct {
nowFn func() time.Time
}

func newSchemaRegistryEncoder(urlStr string, tlsConf *tls.Config, subject *service.InterpolatedString, logger *service.Logger) (*schemaRegistryEncoder, error) {
func newSchemaRegistryEncoderFromConfig(conf *service.ParsedConfig, logger *service.Logger) (*schemaRegistryEncoder, error) {
urlStr, err := conf.FieldString("url")
if err != nil {
return nil, err
}
subject, err := conf.FieldInterpolatedString("subject")
if err != nil {
return nil, err
}
refreshPeriodStr, err := conf.FieldString("refresh_period")
if err != nil {
return nil, err
}
refreshPeriod, err := time.ParseDuration(refreshPeriodStr)
if err != nil {
return nil, fmt.Errorf("failed to parse refresh period: %v", err)
}
refreshTicker := refreshPeriod / 10
if refreshTicker < time.Second {
refreshTicker = time.Second
}
tlsConf, err := conf.FieldTLS("tls")
if err != nil {
return nil, err
}
return newSchemaRegistryEncoder(urlStr, tlsConf, subject, refreshPeriod, refreshTicker, logger)
}

func newSchemaRegistryEncoder(
urlStr string,
tlsConf *tls.Config,
subject *service.InterpolatedString,
schemaRefreshAfter, schemaRefreshTicker time.Duration,
logger *service.Logger,
) (*schemaRegistryEncoder, error) {
u, err := url.Parse(urlStr)
if err != nil {
return nil, fmt.Errorf("failed to parse url: %w", err)
}

s := &schemaRegistryEncoder{
schemaServerURL: u,
subject: subject,
schemas: map[string]*cachedSchemaEncoder{},
shutSig: shutdown.NewSignaller(),
logger: logger,
nowFn: time.Now,
schemaServerURL: u,
subject: subject,
schemaRefreshAfter: schemaRefreshAfter,
schemas: map[string]*cachedSchemaEncoder{},
shutSig: shutdown.NewSignaller(),
logger: logger,
nowFn: time.Now,
}

s.client = http.DefaultClient
Expand All @@ -111,7 +140,7 @@ func newSchemaRegistryEncoder(urlStr string, tlsConf *tls.Config, subject *servi
go func() {
for {
select {
case <-time.After(schemaCacheRefreshPeriod):
case <-time.After(schemaRefreshTicker):
s.refreshEncoders()
case <-s.shutSig.CloseAtLeisureChan():
return
Expand Down Expand Up @@ -178,17 +207,12 @@ func insertID(id int, content []byte) ([]byte, error) {
return newBytes, nil
}

const (
schemaRefreshAfter = time.Minute * 10
schemaCacheRefreshPeriod = time.Minute
)

func (s *schemaRegistryEncoder) refreshEncoders() {
// First pass in read only mode to gather purge candidates and refresh
// candidates
s.cacheMut.RLock()
purgeTargetTime := s.nowFn().Add(-schemaStaleAfter).Unix()
updateTargetTime := s.nowFn().Add(-schemaRefreshAfter).Unix()
updateTargetTime := s.nowFn().Add(-s.schemaRefreshAfter).Unix()
var purgeTargets, refreshTargets []string
for k, v := range s.schemas {
if atomic.LoadInt64(&v.lastUsedUnixSeconds) < purgeTargetTime {
Expand Down
67 changes: 64 additions & 3 deletions internal/impl/confluent/schema_registry_encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,67 @@ import (
"github.com/stretchr/testify/require"
)

func TestSchemaRegistryEncoderConfigParse(t *testing.T) {
configTests := []struct {
name string
config string
errContains string
}{
{
name: "bad url",
config: `
url: huh#%#@$u*not////::example.com
subject: foo
`,
errContains: `failed to parse url`,
},
{
name: "bad subject",
config: `
url: https://example.com
subject: ${! bad interpolation }
`,
errContains: `failed to parse interpolated field`,
},
{
name: "use default period",
config: `
url: https://example.com
subject: foo
`,
},
{
name: "bad period",
config: `
url: https://example.com
subject: foo
refresh_period: not a duration
`,
errContains: "invalid duration",
},
}

spec := schemaRegistryEncoderConfig()
env := service.NewEnvironment()
for _, test := range configTests {
t.Run(test.name, func(t *testing.T) {
conf, err := spec.ParseYAML(test.config, env)
require.NoError(t, err)

e, err := newSchemaRegistryEncoderFromConfig(conf, nil)
if err == nil {
_ = e.Close(context.Background())
}
if test.errContains == "" {
require.NoError(t, err)
} else {
require.Error(t, err)
assert.Contains(t, err.Error(), test.errContains)
}
})
}
}

func TestSchemaRegistryEncode(t *testing.T) {
fooFirst, err := json.Marshal(struct {
Schema string `json:"schema"`
Expand All @@ -34,7 +95,7 @@ func TestSchemaRegistryEncode(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, nil, subj, nil)
encoder, err := newSchemaRegistryEncoder(urlStr, nil, subj, time.Minute*10, time.Minute, nil)
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -93,7 +154,7 @@ func TestSchemaRegistryEncodeClearExpired(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, nil, subj, nil)
encoder, err := newSchemaRegistryEncoder(urlStr, nil, subj, time.Minute*10, time.Minute, nil)
require.NoError(t, err)
require.NoError(t, encoder.Close(context.Background()))

Expand Down Expand Up @@ -154,7 +215,7 @@ func TestSchemaRegistryEncodeRefresh(t *testing.T) {
subj, err := service.NewInterpolatedString("foo")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, nil, subj, nil)
encoder, err := newSchemaRegistryEncoder(urlStr, nil, subj, time.Minute*10, time.Minute, nil)
require.NoError(t, err)
require.NoError(t, encoder.Close(context.Background()))

Expand Down
18 changes: 18 additions & 0 deletions website/docs/components/processors/schema_registry_encode.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ label: ""
schema_registry_encode:
url: ""
subject: ""
refresh_period: 10m
```
</TabItem>
Expand All @@ -47,6 +48,7 @@ label: ""
schema_registry_encode:
url: ""
subject: ""
refresh_period: 10m
tls:
skip_cert_verify: false
enable_renegotiation: false
Expand Down Expand Up @@ -89,6 +91,22 @@ subject: foo
subject: ${! meta("kafka_topic") }
```

### `refresh_period`

The period after which a schema is refreshed for each subject, this is done by polling the schema registry service.


Type: `string`
Default: `"10m"`

```yaml
# Examples
refresh_period: 60s
refresh_period: 1h
```

### `tls`

Custom TLS settings can be used to override system defaults.
Expand Down

0 comments on commit a8a4d0e

Please sign in to comment.