[receiver/sqlquery] fix handling of nulls (open-telemetry#19750)
This fixes the receiver panicking on NULL values. Additional
refactoring was done to improve test coverage.
pmcollins committed Mar 17, 2023
1 parent dc6e1d2 commit 2a4bab1
Showing 13 changed files with 496 additions and 99 deletions.
16 changes: 16 additions & 0 deletions .chloggen/fix-19177.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: sqlqueryreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Don't panic when a query produces NULLs

# One or more tracking issues related to the change
issues: [19177]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
39 changes: 26 additions & 13 deletions receiver/sqlqueryreceiver/README.md
@@ -8,17 +8,19 @@

The SQL Query Receiver uses custom SQL queries to generate metrics from a database connection.

> :construction: This receiver is in **ALPHA**. Behavior, configuration fields, and metric data model are subject to change.
> :construction: This receiver is in **ALPHA**. Behavior, configuration fields, and metric data model are subject to
> change.
## Configuration

The configuration supports the following top-level fields:

- `driver`(required): The name of the database driver: one of _postgres_, _mysql_, _snowflake_, _sqlserver_, _hdb_ (SAP HANA), or _oracle_ (Oracle DB).
- `driver`(required): The name of the database driver: one of _postgres_, _mysql_, _snowflake_, _sqlserver_, _hdb_ (SAP
HANA), or _oracle_ (Oracle DB).
- `datasource`(required): The datasource value passed to [sql.Open](https://pkg.go.dev/database/sql#Open). This is
a driver-specific string usually consisting of at least a database name and connection information. This is sometimes
referred to as the "connection string" in driver documentation.
e.g. _host=localhost port=5432 user=me password=s3cr3t sslmode=disable_
a driver-specific string usually consisting of at least a database name and connection information. This is sometimes
referred to as the "connection string" in driver documentation.
e.g. _host=localhost port=5432 user=me password=s3cr3t sslmode=disable_
- `queries`(required): A list of queries, where a query is a sql statement and one or more metrics (details below).
- `collection_interval`(optional): The time interval between query executions. Defaults to _10s_.

@@ -29,12 +31,16 @@ A _query_ consists of a sql statement and one or more _metrics_, where each metr
Each _metric_ in the configuration will produce one OTel metric per row returned from its sql query.

* `metric_name`(required): the name assigned to the OTel metric.
* `value_column`(required): the column name in the returned dataset used to set the value of the metric's datapoint. This may be case-sensitive, depending on the driver (e.g. Oracle DB).
* `attribute_columns`(optional): a list of column names in the returned dataset used to set attributes on the datapoint. These attributes may be case-sensitive, depending on the driver (e.g. Oracle DB).
* `value_column`(required): the column name in the returned dataset used to set the value of the metric's datapoint.
This may be case-sensitive, depending on the driver (e.g. Oracle DB).
* `attribute_columns`(optional): a list of column names in the returned dataset used to set attributes on the datapoint.
These attributes may be case-sensitive, depending on the driver (e.g. Oracle DB).
* `data_type` (optional): can be `gauge` or `sum`; defaults to `gauge`.
* `value_type` (optional): can be `int` or `double`; defaults to `int`.
* `monotonic` (optional): boolean; whether a cumulative sum's value is monotonically increasing (i.e. never rolls over or resets); defaults to false.
* `aggregation` (optional): only applicable for `data_type=sum`; can be `cumulative` or `delta`; defaults to `cumulative`.
* `monotonic` (optional): boolean; whether a cumulative sum's value is monotonically increasing (i.e. never rolls over
or resets); defaults to false.
* `aggregation` (optional): only applicable for `data_type=sum`; can be `cumulative` or `delta`; defaults
to `cumulative`.
* `description` (optional): the description applied to the metric.
* `unit` (optional): the units applied to the metric.
* `static_attributes` (optional): static attributes applied to the metrics
@@ -52,8 +58,8 @@ receivers:
- metric_name: movie.genres
value_column: "count"
attribute_columns: [ "genre" ]
static_attributes:
dbinstance: mydbinstance
static_attributes:
dbinstance: mydbinstance
```

Given a `movie` table with three rows:
@@ -64,7 +70,6 @@ Given a `movie` table with three rows:
| Star Wars | sci-fi |
| Die Hard | action |


If there are two rows returned from the query `select count(*) as count, genre from movie group by genre`:

| count | genre |
@@ -96,12 +101,20 @@ Data point attributes:
Value: 1
```

#### NULL values

Avoid queries that produce any NULL values. If a query produces a NULL value, a warning will be logged. Furthermore,
if a configuration references the column that produces a NULL value, an additional error will be logged. However, in
either case, the receiver will continue to operate.

#### Oracle DB Driver Example

Refer to the config file [provided](./testdata/oracledb-receiver-config.yaml) for an example of using the
Oracle DB driver to connect and query the same table schema and contents as the example above.
The Oracle DB driver documentation can be found [here.](https://github.com/sijms/go-ora)
Another usage example is the `go_ora` example [here.](https://blogs.oracle.com/developers/post/connecting-a-go-application-to-oracle-database)
Another usage example is the `go_ora`
example [here.](https://blogs.oracle.com/developers/post/connecting-a-go-application-to-oracle-database)

[alpha]:https://github.com/open-telemetry/opentelemetry-collector#alpha

[contrib]:https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
63 changes: 18 additions & 45 deletions receiver/sqlqueryreceiver/db_client.go
@@ -16,9 +16,6 @@ package sqlqueryreceiver // import "github.com/open-telemetry/opentelemetry-coll

import (
"context"
"database/sql"
"fmt"
"reflect"

// register db drivers
_ "github.com/SAP/go-hdb/driver"
@@ -27,76 +24,52 @@ import (
_ "github.com/lib/pq"
_ "github.com/sijms/go-ora/v2"
_ "github.com/snowflakedb/gosnowflake"
"go.uber.org/multierr"
"go.uber.org/zap"
)

type stringMap map[string]string

type dbClient interface {
metricRows(ctx context.Context) ([]metricRow, error)
metricRows(ctx context.Context) ([]stringMap, error)
}

type dbSQLClient struct {
db *sql.DB
db db
logger *zap.Logger
sql string
}

func newDbClient(db *sql.DB, sql string, logger *zap.Logger) dbClient {
func newDbClient(db db, sql string, logger *zap.Logger) dbClient {
return dbSQLClient{
db: db,
sql: sql,
logger: logger,
}
}

type metricRow map[string]string

func (cl dbSQLClient) metricRows(ctx context.Context) ([]metricRow, error) {
func (cl dbSQLClient) metricRows(ctx context.Context) ([]stringMap, error) {
sqlRows, err := cl.db.QueryContext(ctx, cl.sql)
if err != nil {
return nil, err
}
var out []metricRow
row := reusableRow{
attrs: map[string]func() string{},
}
types, err := sqlRows.ColumnTypes()
var out []stringMap
colTypes, err := sqlRows.ColumnTypes()
if err != nil {
return nil, err
}
for _, sqlType := range types {
colName := sqlType.Name()
var v interface{}
row.attrs[colName] = func() string {
format := "%v"
if reflect.TypeOf(v).Kind() == reflect.Slice {
// The Postgres driver returns a []uint8 (a string) for decimal and numeric types,
// which we want to render as strings. e.g. "4.1" instead of "[52, 46, 49]".
// Other slice types get the same treatment.
format = "%s"
}
return fmt.Sprintf(format, v)
}
row.scanDest = append(row.scanDest, &v)
}
scanner := newRowScanner(colTypes)
var warnings error
for sqlRows.Next() {
err = sqlRows.Scan(row.scanDest...)
err = scanner.scan(sqlRows)
if err != nil {
return nil, err
}
out = append(out, row.toMetricRow())
}
return out, nil
}

type reusableRow struct {
attrs map[string]func() string
scanDest []interface{}
}

func (row reusableRow) toMetricRow() metricRow {
out := metricRow{}
for k, f := range row.attrs {
out[k] = f()
sm, scanErr := scanner.toStringMap()
if scanErr != nil {
warnings = multierr.Append(warnings, scanErr)
}
out = append(out, sm)
}
return out
return out, warnings
}
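
The removed closure above picks a format verb from the value's reflected kind. The sketch below isolates that logic and adds a nil guard. It is an illustration only: `render` is a hypothetical helper, not a function from this commit. Calling `Kind()` on the result of `reflect.TypeOf(nil)` panics, which appears to be how a NULL column could crash the old code.

```go
package main

import (
	"fmt"
	"reflect"
)

// render formats a scanned column value as a string. The boolean result is
// false when the value is nil (a NULL column). Slices are formatted with %s
// so that a []uint8 holding the text "4.1" is rendered as "4.1" rather than
// as a list of byte values.
func render(v any) (string, bool) {
	if v == nil {
		// reflect.TypeOf(nil) returns a nil Type; calling Kind() on it panics.
		return "", false
	}
	format := "%v"
	if reflect.TypeOf(v).Kind() == reflect.Slice {
		format = "%s"
	}
	return fmt.Sprintf(format, v), true
}

func main() {
	for _, v := range []any{42, 123.4, "hello", true, []uint8{52, 46, 49}, nil} {
		s, ok := render(v)
		fmt.Printf("%q %v\n", s, ok)
	}
}
```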
149 changes: 145 additions & 4 deletions receiver/sqlqueryreceiver/db_client_test.go
@@ -14,19 +14,160 @@

package sqlqueryreceiver

import "context"
import (
"context"
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
"go.uber.org/zap"
)

func TestDBSQLClient_SingleRow(t *testing.T) {
cl := dbSQLClient{
db: fakeDB{rowVals: [][]any{{42, 123.4, "hello", true, []uint8{52, 46, 49}}}},
logger: zap.NewNop(),
sql: "",
}
rows, err := cl.metricRows(context.Background())
require.NoError(t, err)
assert.Len(t, rows, 1)
assert.EqualValues(t, map[string]string{
"col_0": "42",
"col_1": "123.4",
"col_2": "hello",
"col_3": "true",
"col_4": "4.1",
}, rows[0])
}

func TestDBSQLClient_MultiRow(t *testing.T) {
cl := dbSQLClient{
db: fakeDB{rowVals: [][]any{
{42, 123.4, "hello", true, []uint8{52, 46, 49}},
{43, 123.5, "goodbye", false, []uint8{52, 46, 50}},
}},
logger: zap.NewNop(),
sql: "",
}
rows, err := cl.metricRows(context.Background())
require.NoError(t, err)
assert.Len(t, rows, 2)
assert.EqualValues(t, map[string]string{
"col_0": "42",
"col_1": "123.4",
"col_2": "hello",
"col_3": "true",
"col_4": "4.1",
}, rows[0])
assert.EqualValues(t, map[string]string{
"col_0": "43",
"col_1": "123.5",
"col_2": "goodbye",
"col_3": "false",
"col_4": "4.2",
}, rows[1])
}

func TestDBSQLClient_Nulls(t *testing.T) {
cl := dbSQLClient{
db: fakeDB{rowVals: [][]any{
{42, nil, 111}, // NULLs from the DB map to nil here
}},
logger: zap.NewNop(),
sql: "",
}
rows, err := cl.metricRows(context.Background())
assert.Error(t, err)
assert.True(t, errors.Is(err, errNullValueWarning))
assert.Len(t, rows, 1)
assert.EqualValues(t, map[string]string{
"col_0": "42",
"col_2": "111",
}, rows[0])
}

func TestDBSQLClient_Nulls_MultiRow(t *testing.T) {
cl := dbSQLClient{
db: fakeDB{rowVals: [][]any{
{42, nil},
{43, nil},
}},
logger: zap.NewNop(),
sql: "",
}
rows, err := cl.metricRows(context.Background())
assert.Error(t, err)
errs := multierr.Errors(err)
for _, err := range errs {
assert.True(t, errors.Is(err, errNullValueWarning))
}
assert.Len(t, errs, 2)
assert.Len(t, rows, 2)
assert.EqualValues(t, map[string]string{
"col_0": "42",
}, rows[0])
assert.EqualValues(t, map[string]string{
"col_0": "43",
}, rows[1])
}

type fakeDB struct {
rowVals [][]any
}

func (db fakeDB) QueryContext(context.Context, string, ...any) (rows, error) {
return &fakeRows{vals: db.rowVals}, nil
}

type fakeRows struct {
vals [][]any
row int
}

func (r *fakeRows) ColumnTypes() ([]colType, error) {
var out []colType
for i := 0; i < len(r.vals[0]); i++ {
out = append(out, fakeCol{fmt.Sprintf("col_%d", i)})
}
return out, nil
}

func (r *fakeRows) Next() bool {
return r.row < len(r.vals)
}

func (r *fakeRows) Scan(dest ...any) error {
for i := range dest {
ptr := dest[i].(*any)
*ptr = r.vals[r.row][i]
}
r.row++
return nil
}

type fakeCol struct {
name string
}

func (c fakeCol) Name() string {
return c.name
}

type fakeDBClient struct {
requestCounter int
responses [][]metricRow
stringMaps [][]stringMap
err error
}

func (c *fakeDBClient) metricRows(context.Context) ([]metricRow, error) {
func (c *fakeDBClient) metricRows(context.Context) ([]stringMap, error) {
if c.err != nil {
return nil, c.err
}
idx := c.requestCounter
c.requestCounter++
return c.responses[idx], nil
return c.stringMaps[idx], nil
}