Skip to content

Commit

Permalink
[receiver/sqlquery] add support for logs (#20730)
Browse files Browse the repository at this point in the history
Fixes #20284 

This introduces initial support for retrieving rows from SQL databases
into logs.

This PR aims to provide an initial, not feature rich, but production
ready implementation. The following features are available:

- Use `body_column` to select the column to use to fill the Body field
of the created log
- Use `tracking_start_value` and `tracking_column` properties to track
rows that were already ingested
- Use `storage` property to persist the tracking value across collector
restarts

In this state and marked as "development" stability, the component can
be used for experimentation and to guide future development.

There are definitely more things that need to be implemented for this
component to be considered "alpha" quality - like filling in other [log
fields](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/logs/data-model.md#log-and-event-record-definition)
like Timestamp, ObservedTimestamp and others. I would like to add them
in subsequent pull requests, as this pull request is already way too
big.

---------

Co-authored-by: Dominika Molenda <[email protected]>
Co-authored-by: Dominika Molenda <[email protected]>
Co-authored-by: Katarzyna Kujawa <[email protected]>
Co-authored-by: Katarzyna Kujawa <[email protected]>
  • Loading branch information
5 people committed Jun 14, 2023
1 parent e7608db commit 1bf2547
Show file tree
Hide file tree
Showing 24 changed files with 954 additions and 46 deletions.
16 changes: 16 additions & 0 deletions .chloggen/sqlquery-receiver-add-logs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: sqlqueryreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for logs

# One or more tracking issues related to the change
issues: [20284]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
100 changes: 83 additions & 17 deletions receiver/sqlqueryreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
| Status | |
| ------------- |-----------|
| Stability | [alpha]: metrics |
| | [development]: logs |
| Distributions | [contrib], [observiq], [splunk], [sumo] |

[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha
[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[observiq]: https://github.com/observIQ/observiq-otel-collector
[splunk]: https://github.com/signalfx/splunk-otel-collector
Expand All @@ -28,29 +30,88 @@ The configuration supports the following top-level fields:
a driver-specific string usually consisting of at least a database name and connection information. This is sometimes
referred to as the "connection string" in driver documentation.
e.g. _host=localhost port=5432 user=me password=s3cr3t sslmode=disable_
- `queries`(required): A list of queries, where a query is a sql statement and one or more metrics (details below).
- `queries`(required): A list of queries, where a query is a sql statement and one or more `logs` and/or `metrics` sections (details below).
- `collection_interval`(optional): The time interval between query executions. Defaults to _10s_.
- `storage` (optional, default `""`): The ID of a [storage][storage_extension] extension to be used to [track processed results](#tracking-processed-results).

[storage_extension]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage

### Queries

A _query_ consists of a sql statement and one or more _metrics_, where each metric consists of a
A _query_ consists of a sql statement and one or more `logs` and/or `metrics` section.
At least one `logs` or one `metrics` section is required.
Note that technically you can put both `logs` and `metrics` sections in a single query section,
but it's probably not a real world use case, as the requirements for logs and metrics queries
are quite different.

Additionally, each `query` section supports the following properties:

- `tracking_column` (optional, default `""`) Applies only to logs. In case of a parameterized query,
defines the column to retrieve the value of the parameter on subsequent query runs.
See the below section [Tracking processed results](#tracking-processed-results).
- `tracking_start_value` (optional, default `""`) Applies only to logs. In case of a parameterized query, defines the initial value for the parameter.
See the below section [Tracking processed results](#tracking-processed-results).

Example:

```yaml
receivers:
sqlquery:
driver: postgres
datasource: "host=localhost port=5432 user=postgres password=s3cr3t sslmode=disable"
queries:
- sql: "select * from my_logs where log_id > $$1"
tracking_start_value: "10000"
tracking_column: log_id
logs:
- body_column: log_body
- sql: "select count(*) as count, genre from movie group by genre"
metrics:
- metric_name: movie.genres
value_column: "count"
```

#### Logs Queries

The `logs` section is in development.

- `body_column` (required) defines the column to use as the log record's body.

##### Tracking processed results

With the default configuration and a non-parameterized logs query like `select * from my_logs`,
the receiver will run the same query every collection interval, which can cause reading the same rows
over and over again, unless there's an external actor removing the old rows from the `my_logs` table.

To prevent reading the same rows on every collection interval, use a parameterized query like `select * from my_logs where id_column > ?`,
together with the `tracking_start_value` and `tracking_column` configuration properties.
The receiver will use the configured `tracking_start_value` as the value for the query parameter when running the query for the first time.
After each query run, the receiver will store the value of the `tracking_column` from the last row of the result set and use it as the value for the query parameter on next collection interval. To prevent duplicate log downloads, make sure to sort the query results in ascending order by the tracking_column value.

Note that the notation for the parameter depends on the database backend. For example in MySQL this is `?`, in PostgreSQL this is `$1`, in Oracle this is any string identifier starting with a colon `:`, for example `:my_parameter`.

Use the `storage` configuration property of the receiver to persist the tracking value across collector restarts.

#### Metrics queries

Each `metrics` section consists of a
`metric_name`, a `value_column`, and additional optional fields.
Each _metric_ in the configuration will produce one OTel metric per row returned from its sql query.

* `metric_name`(required): the name assigned to the OTel metric.
* `value_column`(required): the column name in the returned dataset used to set the value of the metric's datapoint.
- `metric_name`(required): the name assigned to the OTel metric.
- `value_column`(required): the column name in the returned dataset used to set the value of the metric's datapoint.
This may be case-sensitive, depending on the driver (e.g. Oracle DB).
* `attribute_columns`(optional): a list of column names in the returned dataset used to set attibutes on the datapoint.
- `attribute_columns`(optional): a list of column names in the returned dataset used to set attibutes on the datapoint.
These attributes may be case-sensitive, depending on the driver (e.g. Oracle DB).
* `data_type` (optional): can be `gauge` or `sum`; defaults to `gauge`.
* `value_type` (optional): can be `int` or `double`; defaults to `int`.
* `monotonic` (optional): boolean; whether a cumulative sum's value is monotonically increasing (i.e. never rolls over
- `data_type` (optional): can be `gauge` or `sum`; defaults to `gauge`.
- `value_type` (optional): can be `int` or `double`; defaults to `int`.
- `monotonic` (optional): boolean; whether a cumulative sum's value is monotonically increasing (i.e. never rolls over
or resets); defaults to false.
* `aggregation` (optional): only applicable for `data_type=sum`; can be `cumulative` or `delta`; defaults
- `aggregation` (optional): only applicable for `data_type=sum`; can be `cumulative` or `delta`; defaults
to `cumulative`.
* `description` (optional): the description applied to the metric.
* `unit` (optional): the units applied to the metric.
* `static_attributes` (optional): static attributes applied to the metrics
- `description` (optional): the description applied to the metric.
- `unit` (optional): the units applied to the metric.
- `static_attributes` (optional): static attributes applied to the metrics

### Example

Expand All @@ -59,28 +120,34 @@ receivers:
sqlquery:
driver: postgres
datasource: "host=localhost port=5432 user=postgres password=s3cr3t sslmode=disable"
storage: file_storage
queries:
- sql: "select * from my_logs where log_id > $$1"
tracking_start_value: "10000"
tracking_column: log_id
logs:
- body_column: log_body
- sql: "select count(*) as count, genre from movie group by genre"
metrics:
- metric_name: movie.genres
value_column: "count"
attribute_columns: [ "genre" ]
attribute_columns: ["genre"]
static_attributes:
dbinstance: mydbinstance
```

Given a `movie` table with three rows:

| name | genre |
|-----------|--------|
| --------- | ------ |
| E.T. | sci-fi |
| Star Wars | sci-fi |
| Die Hard | action |

If there are two rows returned from the query `select count(*) as count, genre from movie group by genre`:

| count | genre |
|-------|--------|
| ----- | ------ |
| 2 | sci-fi |
| 1 | action |

Expand All @@ -94,7 +161,7 @@ Descriptor:
NumberDataPoints #0
Data point attributes:
-> genre: STRING(sci-fi)
-> dbinstance: STRING(mydbinstance)
-> dbinstance: STRING(mydbinstance)
Value: 2
Metric #1
Expand All @@ -121,4 +188,3 @@ Oracle DB driver to connect and query the same table schema and contents as the
The Oracle DB driver documentation can be found [here.](https://github.com/sijms/go-ora)
Another usage example is the `go_ora`
example [here.](https://blogs.oracle.com/developers/post/connecting-a-go-application-to-oracle-database)

35 changes: 28 additions & 7 deletions receiver/sqlqueryreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import (

type Config struct {
scraperhelper.ScraperControllerSettings `mapstructure:",squash"`
Driver string `mapstructure:"driver"`
DataSource string `mapstructure:"datasource"`
Queries []Query `mapstructure:"queries"`
Driver string `mapstructure:"driver"`
DataSource string `mapstructure:"datasource"`
Queries []Query `mapstructure:"queries"`
StorageID *component.ID `mapstructure:"storage"`
}

func (c Config) Validate() error {
Expand All @@ -41,17 +42,25 @@ func (c Config) Validate() error {
}

type Query struct {
SQL string `mapstructure:"sql"`
Metrics []MetricCfg `mapstructure:"metrics"`
SQL string `mapstructure:"sql"`
Metrics []MetricCfg `mapstructure:"metrics"`
Logs []LogsCfg `mapstructure:"logs"`
TrackingColumn string `mapstructure:"tracking_column"`
TrackingStartValue string `mapstructure:"tracking_start_value"`
}

func (q Query) Validate() error {
var errs error
if q.SQL == "" {
errs = multierr.Append(errs, errors.New("'query.sql' cannot be empty"))
}
if len(q.Metrics) == 0 {
errs = multierr.Append(errs, errors.New("'query.metrics' cannot be empty"))
if len(q.Logs) == 0 && len(q.Metrics) == 0 {
errs = multierr.Append(errs, errors.New("at least one of 'query.logs' and 'query.metrics' must not be empty"))
}
for _, logs := range q.Logs {
if err := logs.Validate(); err != nil {
errs = multierr.Append(errs, err)
}
}
for _, metric := range q.Metrics {
if err := metric.Validate(); err != nil {
Expand All @@ -61,6 +70,18 @@ func (q Query) Validate() error {
return errs
}

type LogsCfg struct {
BodyColumn string `mapstructure:"body_column"`
}

func (config LogsCfg) Validate() error {
var errs error
if config.BodyColumn == "" {
errs = multierr.Append(errs, errors.New("'body_column' must not be empty"))
}
return errs
}

type MetricCfg struct {
MetricName string `mapstructure:"metric_name"`
ValueColumn string `mapstructure:"value_column"`
Expand Down
35 changes: 32 additions & 3 deletions receiver/sqlqueryreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,44 @@ func TestLoadConfig(t *testing.T) {
errorMessage: "'driver' cannot be empty",
},
{
fname: "config-invalid-missing-metrics.yaml",
fname: "config-invalid-missing-logs-metrics.yaml",
id: component.NewIDWithName(metadata.Type, ""),
errorMessage: "'query.metrics' cannot be empty",
errorMessage: "at least one of 'query.logs' and 'query.metrics' must not be empty",
},
{
fname: "config-invalid-missing-datasource.yaml",
id: component.NewIDWithName(metadata.Type, ""),
errorMessage: "'datasource' cannot be empty",
},
{
fname: "config-logs.yaml",
id: component.NewIDWithName(metadata.Type, ""),
expected: &Config{
ScraperControllerSettings: scraperhelper.ScraperControllerSettings{
CollectionInterval: 10 * time.Second,
InitialDelay: time.Second,
},
Driver: "mydriver",
DataSource: "host=localhost port=5432 user=me password=s3cr3t sslmode=disable",
Queries: []Query{
{
SQL: "select * from test_logs where log_id > ?",
TrackingColumn: "log_id",
TrackingStartValue: "10",
Logs: []LogsCfg{
{
BodyColumn: "log_body",
},
},
},
},
},
},
{
fname: "config-logs-missing-body-column.yaml",
id: component.NewIDWithName(metadata.Type, ""),
errorMessage: "'body_column' must not be empty",
},
{
fname: "config-unnecessary-aggregation.yaml",
id: component.NewIDWithName(metadata.Type, ""),
Expand All @@ -113,7 +142,7 @@ func TestLoadConfig(t *testing.T) {
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
t.Run(tt.fname, func(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", tt.fname))
require.NoError(t, err)

Expand Down
6 changes: 3 additions & 3 deletions receiver/sqlqueryreceiver/db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
type stringMap map[string]string

type dbClient interface {
metricRows(ctx context.Context) ([]stringMap, error)
queryRows(ctx context.Context, args ...any) ([]stringMap, error)
}

type dbSQLClient struct {
Expand All @@ -37,8 +37,8 @@ func newDbClient(db db, sql string, logger *zap.Logger) dbClient {
}
}

func (cl dbSQLClient) metricRows(ctx context.Context) ([]stringMap, error) {
sqlRows, err := cl.db.QueryContext(ctx, cl.sql)
func (cl dbSQLClient) queryRows(ctx context.Context, args ...any) ([]stringMap, error) {
sqlRows, err := cl.db.QueryContext(ctx, cl.sql, args...)
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions receiver/sqlqueryreceiver/db_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestDBSQLClient_SingleRow(t *testing.T) {
logger: zap.NewNop(),
sql: "",
}
rows, err := cl.metricRows(context.Background())
rows, err := cl.queryRows(context.Background())
require.NoError(t, err)
assert.Len(t, rows, 1)
assert.EqualValues(t, map[string]string{
Expand All @@ -42,7 +42,7 @@ func TestDBSQLClient_MultiRow(t *testing.T) {
logger: zap.NewNop(),
sql: "",
}
rows, err := cl.metricRows(context.Background())
rows, err := cl.queryRows(context.Background())
require.NoError(t, err)
assert.Len(t, rows, 2)
assert.EqualValues(t, map[string]string{
Expand All @@ -69,7 +69,7 @@ func TestDBSQLClient_Nulls(t *testing.T) {
logger: zap.NewNop(),
sql: "",
}
rows, err := cl.metricRows(context.Background())
rows, err := cl.queryRows(context.Background())
assert.Error(t, err)
assert.True(t, errors.Is(err, errNullValueWarning))
assert.Len(t, rows, 1)
Expand All @@ -88,7 +88,7 @@ func TestDBSQLClient_Nulls_MultiRow(t *testing.T) {
logger: zap.NewNop(),
sql: "",
}
rows, err := cl.metricRows(context.Background())
rows, err := cl.queryRows(context.Background())
assert.Error(t, err)
errs := multierr.Errors(err)
for _, err := range errs {
Expand Down Expand Up @@ -152,7 +152,7 @@ type fakeDBClient struct {
err error
}

func (c *fakeDBClient) metricRows(context.Context) ([]stringMap, error) {
func (c *fakeDBClient) queryRows(context.Context, ...any) ([]stringMap, error) {
if c.err != nil {
return nil, c.err
}
Expand Down
3 changes: 2 additions & 1 deletion receiver/sqlqueryreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func NewFactory() receiver.Factory {
return receiver.NewFactory(
metadata.Type,
createDefaultConfig,
receiver.WithMetrics(createReceiverFunc(sql.Open, newDbClient), metadata.MetricsStability),
receiver.WithLogs(createLogsReceiverFunc(sql.Open, newDbClient), metadata.LogsStability),
receiver.WithMetrics(createMetricsReceiverFunc(sql.Open, newDbClient), metadata.MetricsStability),
)
}
7 changes: 7 additions & 0 deletions receiver/sqlqueryreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,11 @@ func TestNewFactory(t *testing.T) {
consumertest.NewNop(),
)
require.NoError(t, err)
_, err = factory.CreateLogsReceiver(
context.Background(),
receivertest.NewNopCreateSettings(),
factory.CreateDefaultConfig(),
consumertest.NewNop(),
)
require.NoError(t, err)
}
Loading

0 comments on commit 1bf2547

Please sign in to comment.