Skip to content

Commit

Permalink
avro schema generator (#174)
Browse files Browse the repository at this point in the history
* schema generator
* snapshot emits schema
* add logrepl schema
* add uuid type formatting
  • Loading branch information
lyuboxa committed Jun 25, 2024
1 parent 125c029 commit 5ecd48e
Show file tree
Hide file tree
Showing 19 changed files with 667 additions and 68 deletions.
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/conduitio/conduit-connector-postgres

go 1.22
go 1.22.0

require (
github.com/Masterminds/sprig/v3 v3.2.3
Expand All @@ -11,6 +11,7 @@ require (
github.com/golangci/golangci-lint v1.59.1
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/hamba/avro/v2 v2.22.1
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
github.com/jackc/pgx/v5 v5.6.0
Expand Down Expand Up @@ -110,6 +111,7 @@ require (
github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect
github.com/jjti/go-spancheck v0.6.1 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/julz/importas v0.1.0 // indirect
github.com/karamaru-alpha/copyloopvar v1.1.0 // indirect
github.com/kisielk/errcheck v1.7.0 // indirect
Expand Down Expand Up @@ -139,6 +141,8 @@ require (
github.com/mitchellh/go-testing-interface v1.14.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/moricho/tparallel v0.3.1 // indirect
github.com/nakabonne/nestif v0.3.1 // indirect
github.com/nishanths/exhaustive v0.12.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ github.com/gostaticanalysis/nilerr v0.1.1/go.mod h1:wZYb6YI5YAxxq0i1+VJbY0s2YONW
github.com/gostaticanalysis/testutil v0.3.1-0.20210208050101-bfb5c8eec0e4/go.mod h1:D+FIZ+7OahH3ePw/izIEeH5I06eKs1IKI4Xr64/Am3M=
github.com/gostaticanalysis/testutil v0.4.0 h1:nhdCmubdmDF6VEatUNjgUZBJKWRqugoISdUv3PPQgHY=
github.com/gostaticanalysis/testutil v0.4.0/go.mod h1:bLIoPefWXrRi/ssLFWX1dx7Repi5x3CuviD3dgAZaBU=
github.com/hamba/avro/v2 v2.22.1 h1:q1rAbfJsrbMaZPDLQvwUQMfQzp6H+hGXvckmU/lXemk=
github.com/hamba/avro/v2 v2.22.1/go.mod h1:HOeTrE3kvWnBAgsufqhAzDDV5gvS0QXs65Z6BHfGgbg=
github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c=
github.com/hashicorp/go-hclog v1.5.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-plugin v1.6.0 h1:wgd4KxHJTVGGqWBq4QPB1i5BZNEx9BR8+OFmHDmTk8A=
Expand Down Expand Up @@ -356,6 +358,7 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
Expand Down Expand Up @@ -440,9 +443,11 @@ github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR
github.com/mitchellh/reflectwalk v1.0.0 h1:9D+8oIskB4VJBN5SFlmc27fSlIBZaov1Wpk/IfikLNY=
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/moricho/tparallel v0.3.1 h1:fQKD4U1wRMAYNngDonW5XupoB/ZGJHdpzrWqgyg9krA=
github.com/moricho/tparallel v0.3.1/go.mod h1:leENX2cUv7Sv2qDgdi0D0fCftN8fRC67Bcn8pqzeYNI=
Expand Down
1 change: 1 addition & 0 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
TableKeys: s.tableKeys,
WithSnapshot: s.config.SnapshotMode == source.SnapshotModeInitial,
SnapshotFetchSize: s.config.SnapshotFetchSize,
WithAvroSchema: s.config.WithAvroSchema,
})
if err != nil {
return fmt.Errorf("failed to create logical replication iterator: %w", err)
Expand Down
5 changes: 5 additions & 0 deletions source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
type Config struct {
// URL is the connection string for the Postgres database.
URL string `json:"url" validate:"required"`

// Tables is a list of table names to read from, separated by commas, e.g. "table1,table2".
// Use "*" if you'd like to listen to all tables.
Tables []string `json:"tables"`
Expand All @@ -74,6 +75,10 @@ type Config struct {
// LogreplAutoCleanup determines if the replication slot and publication should be
// removed when the connector is deleted.
LogreplAutoCleanup bool `json:"logrepl.autoCleanup" default:"true"`

// WithAvroSchema determines whether the connector should attach an Avro schema to each
// record.
WithAvroSchema bool `json:"logrepl.withAvroSchema" default:"false"`
}

// Validate validates the provided config values.
Expand Down
4 changes: 3 additions & 1 deletion source/logrepl/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type CDCConfig struct {
PublicationName string
Tables []string
TableKeys map[string]string
WithAvroSchema bool
}

// CDCIterator asynchronously listens for events from the logical replication
Expand Down Expand Up @@ -63,6 +64,7 @@ func NewCDCIterator(ctx context.Context, pool *pgxpool.Pool, c CDCConfig) (*CDCI
}

records := make(chan sdk.Record)
handler := NewCDCHandler(internal.NewRelationSet(), c.TableKeys, c.WithAvroSchema, records)

sub, err := internal.CreateSubscription(
ctx,
Expand All @@ -71,7 +73,7 @@ func NewCDCIterator(ctx context.Context, pool *pgxpool.Pool, c CDCConfig) (*CDCI
c.PublicationName,
c.Tables,
c.LSN,
NewCDCHandler(internal.NewRelationSet(), c.TableKeys, records).Handle,
handler.Handle,
)
if err != nil {
return nil, fmt.Errorf("failed to initialize subscription: %w", err)
Expand Down
13 changes: 8 additions & 5 deletions source/logrepl/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Config struct {
TableKeys map[string]string
WithSnapshot bool
SnapshotFetchSize int
WithAvroSchema bool
}

// Validate performs validation tasks on the config.
Expand Down Expand Up @@ -177,6 +178,7 @@ func (c *CombinedIterator) initCDCIterator(ctx context.Context, pos position.Pos
PublicationName: c.conf.PublicationName,
Tables: c.conf.Tables,
TableKeys: c.conf.TableKeys,
WithAvroSchema: c.conf.WithAvroSchema,
})
if err != nil {
return fmt.Errorf("failed to create CDC iterator: %w", err)
Expand All @@ -198,11 +200,12 @@ func (c *CombinedIterator) initSnapshotIterator(ctx context.Context, pos positio
}

snapshotIterator, err := snapshot.NewIterator(ctx, c.pool, snapshot.Config{
Position: c.conf.Position,
Tables: c.conf.Tables,
TableKeys: c.conf.TableKeys,
TXSnapshotID: c.cdcIterator.TXSnapshotID(),
FetchSize: c.conf.SnapshotFetchSize,
Position: c.conf.Position,
Tables: c.conf.Tables,
TableKeys: c.conf.TableKeys,
TXSnapshotID: c.cdcIterator.TXSnapshotID(),
FetchSize: c.conf.SnapshotFetchSize,
WithAvroSchema: c.conf.WithAvroSchema,
})
if err != nil {
return fmt.Errorf("failed to create snapshot iterator: %w", err)
Expand Down
52 changes: 46 additions & 6 deletions source/logrepl/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (

"github.com/conduitio/conduit-connector-postgres/source/logrepl/internal"
"github.com/conduitio/conduit-connector-postgres/source/position"
"github.com/conduitio/conduit-connector-postgres/source/schema"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/hamba/avro/v2"
"github.com/jackc/pglogrepl"
)

Expand All @@ -31,17 +33,23 @@ type CDCHandler struct {
relationSet *internal.RelationSet
out chan<- sdk.Record
lastTXLSN pglogrepl.LSN

relAvroSchema map[string]avro.Schema
withAvroSchema bool
}

// NewCDCHandler returns a CDCHandler configured with the given relation set,
// table keys and output channel. withAvroSchema is stored on the handler, and
// the handler's relAvroSchema map is initialized empty.
func NewCDCHandler(
rs *internal.RelationSet,
tableKeys map[string]string,
withAvroSchema bool,
out chan<- sdk.Record,
) *CDCHandler {
return &CDCHandler{
tableKeys: tableKeys,
relationSet: rs,
out: out,
tableKeys: tableKeys,
relationSet: rs,
out: out,
withAvroSchema: withAvroSchema,
relAvroSchema: make(map[string]avro.Schema),
}
}

Expand Down Expand Up @@ -100,6 +108,10 @@ func (h *CDCHandler) handleInsert(
return fmt.Errorf("failed to decode new values: %w", err)
}

if err := h.updateAvroSchema(rel, msg.Tuple); err != nil {
return fmt.Errorf("failed to update avro schema: %w", err)
}

rec := sdk.Util.Source.NewRecordCreate(
h.buildPosition(lsn),
h.buildRecordMetadata(rel),
Expand Down Expand Up @@ -127,6 +139,10 @@ func (h *CDCHandler) handleUpdate(
return fmt.Errorf("failed to decode new values: %w", err)
}

if err := h.updateAvroSchema(rel, msg.NewTuple); err != nil {
return fmt.Errorf("failed to update avro schema: %w", err)
}

oldValues, err := h.relationSet.Values(msg.RelationID, msg.OldTuple)
if err != nil {
// this is not a critical error, old values are optional, just log it
Expand Down Expand Up @@ -180,10 +196,16 @@ func (h *CDCHandler) send(ctx context.Context, rec sdk.Record) error {
}
}

func (h *CDCHandler) buildRecordMetadata(relation *pglogrepl.RelationMessage) map[string]string {
return map[string]string{
sdk.MetadataCollection: relation.RelationName,
// buildRecordMetadata returns the metadata attached to a record produced for
// the given relation. It always contains the collection (relation) name. When
// avro schemas are enabled and a schema has been stored for the relation, the
// schema's string form is added under schema.AvroMetadataKey.
func (h *CDCHandler) buildRecordMetadata(rel *pglogrepl.RelationMessage) map[string]string {
	m := map[string]string{
		sdk.MetadataCollection: rel.RelationName,
	}

	if !h.withAvroSchema {
		return m
	}

	// A schema is only stored once updateAvroSchema has run for this relation.
	// Looking up a missing key yields a nil avro.Schema, and calling String()
	// on it would panic, so the key is omitted when no schema is available.
	if sch, ok := h.relAvroSchema[rel.RelationName]; ok && sch != nil {
		m[schema.AvroMetadataKey] = sch.String()
	}

	return m
}

// buildRecordKey takes the values from the message and extracts the key that
Expand All @@ -209,9 +231,27 @@ func (h *CDCHandler) buildRecordPayload(values map[string]any) sdk.Data {
return sdk.StructuredData(values)
}

// buildPosition wraps the given LSN in a CDC-typed position and returns it in
// its SDK position form.
func (*CDCHandler) buildPosition(lsn pglogrepl.LSN) sdk.Position {
	cdcPos := position.Position{
		Type:    position.TypeCDC,
		LastLSN: lsn.String(),
	}

	return cdcPos.ToSDKPosition()
}

// updateAvroSchema extracts an avro schema from the relation and row and
// stores it under the relation's name. It does nothing when avro schemas are
// not enabled on the handler. An extraction error is returned unchanged.
func (h *CDCHandler) updateAvroSchema(rel *pglogrepl.RelationMessage, row *pglogrepl.TupleData) error {
	if h.withAvroSchema {
		extracted, err := schema.Avro.ExtractLogrepl(rel, row)
		if err != nil {
			return err
		}

		h.relAvroSchema[rel.RelationName] = extracted
	}

	return nil
}
2 changes: 1 addition & 1 deletion source/logrepl/internal/relationset.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (rs *RelationSet) Values(id uint32, row *pglogrepl.TupleData) (map[string]a
return nil, fmt.Errorf("failed to decode tuple %d: %w", i, err)
}

v, err := types.Format(val)
v, err := types.Format(col.DataType, val)
if err != nil {
return nil, fmt.Errorf("failed to format column %q type %T: %w", col.Name, val, err)
}
Expand Down
4 changes: 2 additions & 2 deletions source/logrepl/internal/relationset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func isValuesAllTypes(is *is.I, got map[string]any) {
"col_timestamptz": time.Date(2022, 3, 14, 15+8, 16, 17, 0, time.UTC).UTC(),
"col_tsquery": "'fat' & ( 'rat' | 'cat' )",
"col_tsvector": "'a' 'and' 'ate' 'cat' 'fat' 'mat' 'on' 'rat' 'sat'",
"col_uuid": [16]uint8{0xbd, 0x94, 0xee, 0x0b, 0x56, 0x4f, 0x40, 0x88, 0xbf, 0x4e, 0x8d, 0x5e, 0x62, 0x6c, 0xaf, 0x66},
"col_uuid": "bd94ee0b-564f-4088-bf4e-8d5e626caf66", // [16]uint8{0xbd, 0x94, 0xee, 0x0b, 0x56, 0x4f, 0x40, 0x88, 0xbf, 0x4e, 0x8d, 0x5e, 0x62, 0x6c, 0xaf, 0x66}
"col_xml": "<foo>bar</foo>",
}
is.Equal("", cmp.Diff(want, got,
Expand Down Expand Up @@ -440,7 +440,7 @@ func isValuesAllTypesStandalone(is *is.I, got map[string]any) {
"col_timestamptz": time.Date(2022, 3, 14, 15+8, 16, 17, 0, time.UTC).UTC().String(),
"col_tsquery": "'fat' & ( 'rat' | 'cat' )",
"col_tsvector": "'a' 'and' 'ate' 'cat' 'fat' 'mat' 'on' 'rat' 'sat'",
"col_uuid": [16]uint8{0xbd, 0x94, 0xee, 0x0b, 0x56, 0x4f, 0x40, 0x88, 0xbf, 0x4e, 0x8d, 0x5e, 0x62, 0x6c, 0xaf, 0x66},
"col_uuid": "bd94ee0b-564f-4088-bf4e-8d5e626caf66", // [16]uint8{0xbd, 0x94, 0xee, 0x0b, 0x56, 0x4f, 0x40, 0x88, 0xbf, 0x4e, 0x8d, 0x5e, 0x62, 0x6c, 0xaf, 0x66}
"col_xml": "<foo>bar</foo>",
}
is.Equal("", cmp.Diff(want, got,
Expand Down
6 changes: 6 additions & 0 deletions source/paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 5ecd48e

Please sign in to comment.