How to filter the array of fields collected from Kafka Receiver in Body(Map) format? #28816

Closed
ericashi opened this issue Oct 31, 2023 · 6 comments
Labels: processor/transform, question, receiver/kafka

Comments

@ericashi

Component(s)

processor/filter, receiver/kafka

What happened?

Description

I am consuming data from a Kafka server using the kafkareceiver. The consumed data is JSON-encoded logs, and the fields of each message end up in the "body" of the log record. Please refer to the "Log output" section for an example.

I want to filter the fields inside the "body". For instance, a topic delivers 10 fields and we need only 5 of them.

I have tried the "resource_attributes" and "bodies" settings of the filterprocessor, but neither works.

How can I filter the fields inside the "body", please?

Steps to Reproduce

Set up a config.yaml file with the needed configuration and run the otelcol-contrib-collector binary.
Please refer to the "OpenTelemetry Collector configuration" section for the configuration used.

Expected Result

I expected the unneeded fields to be filtered out.

Actual Result

No fields are filtered out; every field from the topic is always stored.

Collector version

v0.88.0

Environment information

Environment

OS: x86-64_linux_4.12_ImageSLES12SP5

OpenTelemetry Collector configuration

receivers:
  kafka:
    protocol_version: 2.0.0
    topic: <topic_name>
    brokers:
      - "<broker_link>"
    group_id: <group_id_name>
    encoding: json

processors:
  filter:
    logs:
      include:
        match_type: regexp
        resource_attributes:
          - key: field_b
            value: prefix.*

  # filter:
  #   logs:
  #     include:
  #       match_type: regexp
  #       bodies:
  #         - field_b

exporters:
  splunk_hec:
    token: <HEC_Token>
    endpoint: <SPLUNK_Endpoint>
    sourcetype: <sourcetype_name>

  logging:
    verbosity: detailed

service:
  telemetry:
    metrics:
      level: detailed
      address: 0.0.0.0:8885

  pipelines:
    logs:
      receivers: [kafka]
      processors: [filter]
      exporters: [logging, splunk_hec]

Log output

2023-10-30T19:55:11.292-0700    info    ResourceLog #0
Resource SchemaURL: 
ScopeLogs #0
ScopeLogs SchemaURL: 
InstrumentationScope  
LogRecord #0
ObservedTimestamp: 2023-10-31 02:55:11.29196124 +0000 UTC
Timestamp: 1970-01-01 00:00:00 +0000 UTC
SeverityText: 
SeverityNumber: Unspecified(0)
Body: Map({"field_a":"21.8492083460968","field_b":"abc","field_c":"0.5555555","field_d":"63.4123456","field_e":"0","field_f":"0.0434407566318657","field_g":"138.95"})
Trace ID: 
Span ID: 
Flags: 0
        {"kind": "exporter", "data_type": "logs", "name": "logging"}

Additional context

How can I filter out the unneeded fields and store only the needed ones, please?

@ericashi added the bug and needs triage labels on Oct 31, 2023
@github-actions
Contributor

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@ericashi
Author

/label help-wanted

@github-actions bot added the help wanted label on Oct 31, 2023
@crobert-1
Member

crobert-1 commented Oct 31, 2023

Hello @ericashi, I believe the attributes processor is the best option here. You can use the delete action. Here's an example that might be able to point you in the right direction.

Disregard based on @TylerHelmuth's comment below.

@crobert-1 added the question and processor/attributes labels and removed the bug, help wanted, processor/filter, and needs triage labels on Oct 31, 2023
Contributor

Pinging code owners for processor/attributes: @boostchicken. See Adding Labels via Comments if you do not have permissions to add labels yourself.

@TylerHelmuth added the processor/transform label and removed the processor/attributes label on Oct 31, 2023
@TylerHelmuth
Member

@ericashi the fields you want to remove from the log payload are in the body of the log, is that correct? If so, only the transformprocessor can do this. It would look like:

transform:
  error_mode: ignore
  log_statements:
    - context: log
      statements:
        - delete_key(body, "field_b") 
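
Since the original question was about keeping only a subset of the fields, an allow-list may be more convenient than deleting keys one at a time. A minimal sketch, assuming the OTTL `keep_keys` function is available in the collector version in use and that the body is a map, as in the log output above. The three field names are picked purely for illustration:

```yaml
processors:
  transform:
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          # Keep only the listed keys in the body map; every other key is removed.
          - keep_keys(body, ["field_a", "field_b", "field_c"])
```

With `error_mode: ignore`, a record whose body is not a map should be logged as an error and passed through unchanged rather than dropped.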

@ericashi
Author

ericashi commented Nov 1, 2023

Hi @TylerHelmuth,

Thanks for the solution provided. It is working now.

Thank you for the help.

Best Regards,
Erica Ooi
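
For readers landing here later: the thread does not show the final configuration, so the following is only a sketch of how the reported pipeline might look after the fix. It assumes the transform processor simply takes the place of filter in the logs pipeline and that the receiver and exporter sections stay as reported. The filter processor keeps or drops whole log records, which is why it could not trim individual keys from the body.

```yaml
processors:
  transform:
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          # Remove one unneeded key from the body map; add one statement per key.
          - delete_key(body, "field_b")

service:
  pipelines:
    logs:
      receivers: [kafka]
      # transform replaces the filter processor from the original report
      processors: [transform]
      exporters: [logging, splunk_hec]
```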
