Skip to content

Commit

Permalink
[pkg/stanza/csv_parser] Add ignore_quotes option (open-telemetry#13663
Browse files Browse the repository at this point in the history
)

* add ignore_quotes option to csv_parser

* error if both `lazy_quotes` and `ignore_quotes` are true
  • Loading branch information
BinaryFissionGames committed Aug 30, 2022
1 parent eeeb85e commit 2ca5bb8
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 32 deletions.
27 changes: 14 additions & 13 deletions pkg/stanza/docs/operators/csv_parser.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ The `csv_parser` operator parses the string-type field selected by `parse_from`

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `csv_parser` | A unique identifier for the operator. |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
| `header` | required when `header_attribute` not set | A string of delimited field names |
| `header_attribute` | required when `header` not set | An attribute name to read the header field from, to support dynamic field names |
| `delimiter` | `,` | A character that will be used as a delimiter. Values `\r` and `\n` cannot be used as a delimiter. |
| `lazy_quotes` | `false` | If true, a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field. |
| `parse_from` | `body` | The [field](../types/field.md) from which the value will be parsed. |
| `parse_to` | `body` | The [field](../types/field.md) to which the value will be parsed. |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](../types/on_error.md). |
| `timestamp` | `nil` | An optional [timestamp](../types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator. |
| `severity` | `nil` | An optional [severity](../types/severity.md) block which will parse a severity field before passing the entry to the output operator. |
| Field | Default | Description |
|--------------------|------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------|
| `id` | `csv_parser` | A unique identifier for the operator. |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
| `header` | required when `header_attribute` not set | A string of delimited field names |
| `header_attribute` | required when `header` not set | An attribute name to read the header field from, to support dynamic field names |
| `delimiter` | `,` | A character that will be used as a delimiter. Values `\r` and `\n` cannot be used as a delimiter. |
| `lazy_quotes` | `false` | If true, a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field. Cannot be true if `ignore_quotes` is true. |
| `ignore_quotes` | `false` | If true, all quotes are ignored, and fields are simply split on the delimiter. Cannot be true if `lazy_quotes` is true. |
| `parse_from` | `body` | The [field](../types/field.md) from which the value will be parsed. |
| `parse_to` | `body` | The [field](../types/field.md) to which the value will be parsed. |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](../types/on_error.md). |
| `timestamp` | `nil` | An optional [timestamp](../types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator. |
| `severity` | `nil` | An optional [severity](../types/severity.md) block which will parse a severity field before passing the entry to the output operator. |

### Embedded Operations

Expand Down
10 changes: 10 additions & 0 deletions pkg/stanza/operator/parser/csv/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ func TestConfig(t *testing.T) {
return p
}(),
},
{
Name: "ignore_quotes",
Expect: func() *Config {
p := NewConfig()
p.Header = "id,severity,message"
p.IgnoreQuotes = true
p.ParseFrom = entry.NewBodyField("message")
return p
}(),
},
{
Name: "delimiter",
Expect: func() *Config {
Expand Down
81 changes: 62 additions & 19 deletions pkg/stanza/operator/parser/csv/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Config struct {
HeaderAttribute string `mapstructure:"header_attribute" json:"header_attribute" yaml:"header_attribute"`
FieldDelimiter string `mapstructure:"delimiter" json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
LazyQuotes bool `mapstructure:"lazy_quotes" json:"lazy_quotes,omitempty" yaml:"lazy_quotes,omitempty"`
IgnoreQuotes bool `mapstructure:"ignore_quotes" json:"ignore_quotes,omitempty" yaml:"ignore_quotes,omitempty"`
}

// Build will build a csv parser operator.
Expand All @@ -67,6 +68,10 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
c.FieldDelimiter = ","
}

if c.IgnoreQuotes && c.LazyQuotes {
return nil, errors.New("only one of 'ignore_quotes' or 'lazy_quotes' can be true")
}

fieldDelimiter := []rune(c.FieldDelimiter)[0]

if len([]rune(c.FieldDelimiter)) != 1 {
Expand All @@ -91,8 +96,8 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
headerAttribute: c.HeaderAttribute,
fieldDelimiter: fieldDelimiter,
lazyQuotes: c.LazyQuotes,

parse: generateParseFunc(headers, fieldDelimiter, c.LazyQuotes),
ignoreQuotes: c.IgnoreQuotes,
parse: generateParseFunc(headers, fieldDelimiter, c.LazyQuotes, c.IgnoreQuotes),
}, nil
}

Expand All @@ -103,6 +108,7 @@ type Parser struct {
header []string
headerAttribute string
lazyQuotes bool
ignoreQuotes bool
parse parseFunc
}

Expand All @@ -127,7 +133,7 @@ func (r *Parser) Process(ctx context.Context, e *entry.Entry) error {
return err
}
headers := strings.Split(headerString, string([]rune{r.fieldDelimiter}))
parse = generateParseFunc(headers, r.fieldDelimiter, r.lazyQuotes)
parse = generateParseFunc(headers, r.fieldDelimiter, r.lazyQuotes, r.ignoreQuotes)
}

return r.ParserOperator.ProcessWith(ctx, e, parse)
Expand All @@ -136,16 +142,19 @@ func (r *Parser) Process(ctx context.Context, e *entry.Entry) error {
// generateParseFunc returns a parse function for a given header, allowing
// each entry to have a potentially unique set of fields when using dynamic
// field names retrieved from an entry's attribute
func generateParseFunc(headers []string, fieldDelimiter rune, lazyQuotes bool) parseFunc {
func generateParseFunc(headers []string, fieldDelimiter rune, lazyQuotes bool, ignoreQuotes bool) parseFunc {
if ignoreQuotes {
return generateSplitParseFunc(headers, fieldDelimiter)
}
return generateCSVParseFunc(headers, fieldDelimiter, lazyQuotes)
}

// generateCSVParseFunc returns a parse function for a given header and field delimiter, which parses a line of CSV text.
func generateCSVParseFunc(headers []string, fieldDelimiter rune, lazyQuotes bool) parseFunc {
return func(value interface{}) (interface{}, error) {
var csvLine string
switch t := value.(type) {
case string:
csvLine += t
case []byte:
csvLine += string(t)
default:
return nil, fmt.Errorf("type '%T' cannot be parsed as csv", value)
csvLine, err := valueAsString(value)
if err != nil {
return nil, err
}

reader := csvparser.NewReader(strings.NewReader(csvLine))
Expand Down Expand Up @@ -193,15 +202,49 @@ func generateParseFunc(headers []string, fieldDelimiter rune, lazyQuotes bool) p
}
}

parsedValues := make(map[string]interface{})
return headersMap(headers, joinedLine)
}
}

if len(joinedLine) != len(headers) {
return nil, fmt.Errorf("wrong number of fields: expected %d, found %d", len(headers), len(joinedLine))
// generateSplitParseFunc returns a parse function (which ignores quotes) for a given header and field delimiter.
func generateSplitParseFunc(headers []string, fieldDelimiter rune) parseFunc {
return func(value interface{}) (interface{}, error) {
csvLine, err := valueAsString(value)
if err != nil {
return nil, err
}

for i, val := range joinedLine {
parsedValues[headers[i]] = val
}
return parsedValues, nil
// This parse function does not do any special quote handling; Splitting on the delimiter is sufficient.
fields := strings.Split(csvLine, string(fieldDelimiter))
return headersMap(headers, fields)
}
}

// valueAsString interprets the given value as a string.
func valueAsString(value interface{}) (string, error) {
var s string
switch t := value.(type) {
case string:
s += t
case []byte:
s += string(t)
default:
return s, fmt.Errorf("type '%T' cannot be parsed as csv", value)
}

return s, nil
}

// headersMap creates a map of headers[i] -> fields[i].
func headersMap(headers []string, fields []string) (map[string]interface{}, error) {
parsedValues := make(map[string]interface{})

if len(fields) != len(headers) {
return nil, fmt.Errorf("wrong number of fields: expected %d, found %d", len(headers), len(fields))
}

for i, val := range fields {
parsedValues[headers[i]] = val
}
return parsedValues, nil
}
104 changes: 104 additions & 0 deletions pkg/stanza/operator/parser/csv/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ func newTestParser(t *testing.T) *Parser {
return op.(*Parser)
}

func newTestParserIgnoreQuotes(t *testing.T) *Parser {
cfg := NewConfigWithID("test")
cfg.Header = testHeader
cfg.IgnoreQuotes = true
op, err := cfg.Build(testutil.Logger(t))
require.NoError(t, err)
return op.(*Parser)
}

func TestParserBuildFailure(t *testing.T) {
cfg := NewConfigWithID("test")
cfg.OnError = "invalid_on_error"
Expand All @@ -43,6 +52,16 @@ func TestParserBuildFailure(t *testing.T) {
require.Contains(t, err.Error(), "invalid `on_error` field")
}

func TestParserBuildFailureLazyIgnoreQuotes(t *testing.T) {
cfg := NewConfigWithID("test")
cfg.Header = testHeader
cfg.LazyQuotes = true
cfg.IgnoreQuotes = true
_, err := cfg.Build(testutil.Logger(t))
require.Error(t, err)
require.ErrorContains(t, err, "only one of 'ignore_quotes' or 'lazy_quotes' can be true")
}

func TestParserBuildFailureInvalidDelimiter(t *testing.T) {
cfg := NewConfigWithID("test")
cfg.Header = testHeader
Expand Down Expand Up @@ -82,6 +101,13 @@ func TestParserInvalidType(t *testing.T) {
require.Contains(t, err.Error(), "type '[]int' cannot be parsed as csv")
}

func TestParserInvalidTypeIgnoreQuotes(t *testing.T) {
parser := newTestParserIgnoreQuotes(t)
_, err := parser.parse([]int{})
require.Error(t, err)
require.Contains(t, err.Error(), "type '[]int' cannot be parsed as csv")
}

func TestParserCSV(t *testing.T) {
cases := []struct {
name string
Expand Down Expand Up @@ -611,6 +637,84 @@ func TestParserCSV(t *testing.T) {
false,
false,
},
{
"parse-with-ignore-quotes",
func(p *Config) {
p.Header = "name,age,height,number"
p.FieldDelimiter = ","
p.IgnoreQuotes = true
},
[]entry.Entry{
{
Body: "stanza log parser,1,6ft,5",
},
},
[]entry.Entry{
{
Attributes: map[string]interface{}{
"name": "stanza log parser",
"age": "1",
"height": "6ft",
"number": "5",
},
Body: "stanza log parser,1,6ft,5",
},
},
false,
false,
},
{
"parse-with-ignore-quotes-bytes",
func(p *Config) {
p.Header = "name,age,height,number"
p.FieldDelimiter = ","
p.IgnoreQuotes = true
},
[]entry.Entry{
{
Body: []byte("stanza log parser,1,6ft,5"),
},
},
[]entry.Entry{
{
Attributes: map[string]interface{}{
"name": "stanza log parser",
"age": "1",
"height": "6ft",
"number": "5",
},
Body: []byte("stanza log parser,1,6ft,5"),
},
},
false,
false,
},
{
"parse-with-ignore-quotes-invalid-csv",
func(p *Config) {
p.Header = "name,age,height,number"
p.FieldDelimiter = ","
p.IgnoreQuotes = true
},
[]entry.Entry{
{
Body: "stanza log parser,\"1,\"6ft,5\"",
},
},
[]entry.Entry{
{
Attributes: map[string]interface{}{
"name": "stanza log parser",
"age": "\"1",
"height": "\"6ft",
"number": "5\"",
},
Body: "stanza log parser,\"1,\"6ft,5\"",
},
},
false,
false,
},
}

for _, tc := range cases {
Expand Down
5 changes: 5 additions & 0 deletions pkg/stanza/operator/parser/csv/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ lazy_quotes:
parse_from: body.message
header: id,severity,message
lazy_quotes: true
ignore_quotes:
type: csv_parser
parse_from: body.message
header: id,severity,message
ignore_quotes: true
timestamp:
type: csv_parser
header: timestamp_field,severity,message
Expand Down
11 changes: 11 additions & 0 deletions unreleased/ignore-quotes-csv.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza/csv_parser

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add `ignore_quotes` option that ignores all quoting in fields if true.

# One or more tracking issues related to the change
issues: [13656]

0 comments on commit 2ca5bb8

Please sign in to comment.