Add an Ockam Kafka Input and an Ockam Kafka Output #2616

Closed

Contributor

@mrinalwadhwa mrinalwadhwa commented May 29, 2024

Everyone on the Ockam team is thrilled to contribute two new Ockam Kafka plugins - an input and an output.

Ockam Kafka plugins for Redpanda Connect create Ockam Portals to send end-to-end encrypted messages through Kafka. These plugins combine Ockam and the Franz Kafka plugins to set up encrypted portals.

Messages from a producer are encrypted to a specific consumer, and only that consumer can decrypt them. This guarantees that your data cannot be observed or tampered with as it passes through Kafka. Operators of the Kafka cluster, and the Kafka brokers themselves, see only end-to-end encrypted data, so a compromise of an operator's infrastructure cannot compromise your business data.
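
To make the guarantee above concrete, here is a minimal Go sketch of the idea, not of Ockam's actual protocol. It assumes the producer and consumer already share a 32-byte key (how the plugins establish keys is outside this sketch; here the key is simply generated in `main`) and uses AES-256-GCM from the Go standard library. The helper names `seal` and `open` are hypothetical.

```go
package main

import (
	"bytes"
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

// seal encrypts plaintext with AES-256-GCM and prepends the random nonce.
// Hypothetical helper: it stands in for what a producer does before writing to Kafka.
func seal(key, plaintext []byte) ([]byte, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	aead, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, aead.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	// The output layout is nonce || ciphertext || authentication tag.
	return aead.Seal(nonce, nonce, plaintext, nil), nil
}

// open reverses seal. It returns an error if the record was modified.
func open(key, record []byte) ([]byte, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	aead, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	if len(record) < aead.NonceSize() {
		return nil, errors.New("record too short")
	}
	nonce, ciphertext := record[:aead.NonceSize()], record[aead.NonceSize():]
	return aead.Open(nil, nonce, ciphertext, nil)
}

func main() {
	key := make([]byte, 32) // assumption: known only to the producer and the consumer
	if _, err := rand.Read(key); err != nil {
		panic(err)
	}
	record, err := seal(key, []byte("hello"))
	if err != nil {
		panic(err)
	}
	// The record is what a broker would store.
	fmt.Println("record contains plaintext:", bytes.Contains(record, []byte("hello")))

	msg, err := open(key, record)
	fmt.Println("consumer reads:", string(msg), err)

	record[len(record)-1] ^= 0xff // simulate tampering by an intermediary
	_, err = open(key, record)
	fmt.Println("tampered record rejected:", err != nil)
}
```

The stored record is what a broker or operator would see. Flipping a single byte makes `open` return an error, which is the tamper-detection property described above.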

Examples

On one machine

Let's begin with a very simple local example that runs all the parts on one machine.

Consumer

The consumer uses the ockam_kafka input to decrypt and read messages from Kafka. It then transforms each message using a processor and writes the transformed message to stdout.

# consumer.yaml

input:
  ockam_kafka:
    seed_brokers: [localhost:9092]
    topics: [topic_A]
    consumer_group: example_group

pipeline:
  processors:
    - bloblang: |
        root.doc = this | content().string()
        root.length = content().length()
        root.topic = meta("kafka_topic")

output:
  stdout: {}
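
As a rough illustration of what the Bloblang mapping in the consumer config above produces, here is an approximation in plain Go. It is a sketch under assumptions, not how Redpanda Connect evaluates Bloblang: the function name `transform` is hypothetical, the `|` fallback is modeled as "use the parsed JSON document if there is one, otherwise the raw content as a string", and `length` is taken to be the byte length of the raw message.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// transform approximates the consumer mapping in plain Go (hypothetical helper).
//   root.doc    = this | content().string()
//   root.length = content().length()
//   root.topic  = meta("kafka_topic")
func transform(raw []byte, topic string) map[string]any {
	var doc any
	// Fall back to the raw text when the message is not valid JSON (or is JSON null).
	if err := json.Unmarshal(raw, &doc); err != nil || doc == nil {
		doc = string(raw)
	}
	return map[string]any{
		"doc":    doc,
		"length": len(raw),
		"topic":  topic,
	}
}

func main() {
	// One structured message and one plain-text line, as typed into the producer's stdin.
	for _, msg := range []string{`{"a":1}`, "hello"} {
		out, err := json.Marshal(transform([]byte(msg), "topic_A"))
		if err != nil {
			panic(err)
		}
		fmt.Println(string(out))
	}
}
```

The fallback matters in this example because lines typed into the producer's stdin are usually plain text rather than JSON.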

Producer

The producer reads messages from stdin. It then uses the ockam_kafka output to encrypt and write messages to Kafka.

# producer.yaml

input:
  stdin: {}

output:
  ockam_kafka:
    seed_brokers: [localhost:9092]
    topic: topic_A

Run

With a redpanda-connect binary compiled using this pull request:

# Setup Redpanda
brew install redpanda-data/tap/redpanda
rpk container start

# Start a Consumer
redpanda-connect -c ./consumer.yaml

# Start Producers in separate terminals and type some messages.
redpanda-connect -c ./producer.yaml --set "http.address=127.0.0.1:4196"
redpanda-connect -c ./producer.yaml --set "http.address=127.0.0.1:4197"
redpanda-connect -c ./producer.yaml --set "http.address=127.0.0.1:4198"

# See encrypted messages in Redpanda at http://localhost:8080

Between two machines

Consumer

# consumer.yaml

input:
  ockam_kafka:
    seed_brokers: [${IP_OF_REDPANDA_MACHINE}:9092]
    topics: [topic_A]
    consumer_group: example_group
    ockam_node_address: 0.0.0.0:6262
    ockam_allow_producer: ${OCKAM_ALLOW_PRODUCER}

pipeline:
  processors:
    - bloblang: |
        root.doc = this | content().string()
        root.length = content().length()
        root.topic = meta("kafka_topic")

output:
  stdout: {}

Producer

# producer.yaml

input:
  stdin: {}

output:
  ockam_kafka:
    seed_brokers: [${IP_OF_REDPANDA_MACHINE}:9092]
    topic: topic_A
    ockam_route_to_consumer: /ip4/${IP_OF_CONSUMER_MACHINE}/tcp/6262
    ockam_allow_consumer: ${OCKAM_ALLOW_CONSUMER}
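
The `ockam_route_to_consumer` value in the producer config above has the shape `/ip4/<address>/tcp/<port>`. The following Go sketch builds such a string from validated parts. The helper name `routeToConsumer` is hypothetical and not part of the plugin, and the checks are only an assumption about what a sensible value looks like.

```go
package main

import (
	"fmt"
	"net"
)

// routeToConsumer is a hypothetical helper that formats an IPv4 address and a
// TCP port in the /ip4/<address>/tcp/<port> shape used by ockam_route_to_consumer.
func routeToConsumer(ip string, port int) (string, error) {
	parsed := net.ParseIP(ip)
	if parsed == nil || parsed.To4() == nil {
		return "", fmt.Errorf("not an IPv4 address: %q", ip)
	}
	if port < 1 || port > 65535 {
		return "", fmt.Errorf("port out of range: %d", port)
	}
	return fmt.Sprintf("/ip4/%s/tcp/%d", parsed.To4(), port), nil
}

func main() {
	// 192.0.2.10 is a documentation address standing in for IP_OF_CONSUMER_MACHINE.
	route, err := routeToConsumer("192.0.2.10", 6262)
	if err != nil {
		panic(err)
	}
	fmt.Println(route)
}
```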

Run

With a redpanda-connect binary compiled using this pull request:

# Redpanda
brew install redpanda-data/tap/redpanda
rpk container start

# Machine 1 - Producer
curl --proto '=https' --tlsv1.2 -sSfL https://install.command.ockam.io | bash && source "$HOME/.ockam/env"
ockam identity create
# copy the generated Identity's identifier to Machine 2 - I5fc86d2...

# Machine 2 - Consumer
curl --proto '=https' --tlsv1.2 -sSfL https://install.command.ockam.io | bash && source "$HOME/.ockam/env"
ockam identity create
IP_OF_REDPANDA_MACHINE=... OCKAM_ALLOW_PRODUCER=I5fc86d2... redpanda-connect -c ./consumer.yaml
# copy the generated Identity's identifier to Machine 1 - I94d64b91...

# Machine 1 - Producer
IP_OF_REDPANDA_MACHINE=... IP_OF_CONSUMER_MACHINE=... \
  OCKAM_ALLOW_CONSUMER=I94d64b91... redpanda-connect -c ./producer.yaml

Across several machines

Consumer

# consumer.yaml

input:
  ockam_kafka:
    seed_brokers: [rp-node-0:9092]
    topics: [topic_A]
    consumer_group: example_group
    ockam_allow_producer: producer
    ockam_relay: consumer_relay
    ockam_enrollment_ticket: ${OCKAM_ENROLLMENT_TICKET}

pipeline:
  processors:
    - bloblang: |
        root = this
        root.data.message = this.data.message.uppercase()

output:
  stdout: {}

Producer

# producer.yaml

input:
  generate:
    count: 1000
    interval: "@every 1s"
    mapping: |
      root = {
        "_producer": hostname(),
        "data": { "email": fake("email"), "message": fake("sentence") }
      }

output:
  ockam_kafka:
    seed_brokers: [rp-node-0:9092]
    topic: topic_A
    ockam_route_to_consumer: /project/default/service/forward_to_consumer_relay/secure/api
    ockam_allow_consumer: consumer
    ockam_enrollment_ticket: ${OCKAM_ENROLLMENT_TICKET}

Run

As a temporary step, until this pull request is merged, we compiled this PR's branch and pushed a Docker image to ghcr.io/build-trust/redpanda-connect.

# Setup Redpanda
brew install redpanda-data/tap/redpanda
rpk container start

# Setup Ockam
brew install build-trust/ockam/ockam
ockam enroll

# Setup Consumer
ockam project ticket --usage-count 1 --expires-in 10m --attribute consumer --relay consumer_relay > ticket
docker run --rm --network redpanda --name consumer -v "$(pwd)/consumer.yaml:/connect.yaml" \
  -e OCKAM_ENROLLMENT_TICKET="$(cat ticket)" ghcr.io/build-trust/redpanda-connect

# Setup Producers (in a new terminal window)
for i in $(seq 1 3); do
    ockam project ticket --usage-count 1 --expires-in 10m --attribute producer > ticket
    docker run --rm -d --network redpanda --name "producer$i" -v "$(pwd)/producer.yaml:/connect.yaml" \
        -e OCKAM_ENROLLMENT_TICKET="$(cat ./ticket)" ghcr.io/build-trust/redpanda-connect
done

# It can take a minute or so for the messages to start flowing.
# Observe that the above consumer can decrypt and transform messages.

# However, if you look at the messages inside redpanda console at localhost:8080
# or using the below rpk command, you'll notice that the messages are encrypted.
rpk topic consume topic_A

# Cleanup
docker stop $(docker ps -q --filter "name=producer" --filter "name=consumer")
rpk container purge

Across several machines - using Redpanda Serverless

Consumer

# consumer.yaml

input:
  ockam_kafka:
    seed_brokers: ["${REDPANDA_BOOTSTRAP_SERVER}"]
    topics: [topic_A]
    consumer_group: example_group
    tls:
      enabled: true
    sasl:
      - mechanism: "SCRAM-SHA-256"
        username: "${REDPANDA_USERNAME}"
        password: "${REDPANDA_PASSWORD}"
    ockam_allow_producer: producer
    ockam_relay: consumer_relay
    ockam_enrollment_ticket: "${OCKAM_ENROLLMENT_TICKET}"

pipeline:
  processors:
    - bloblang: |
        root = this
        root.data.message = this.data.message.uppercase()

output:
  stdout: {}

Producers

# producer.yaml

input:
  generate:
    count: 1000
    interval: "@every 1s"
    mapping: |
      root = {
        "_producer": hostname(),
        "data": { "email": fake("email"), "message": fake("sentence") }
      }

output:
  ockam_kafka:
    seed_brokers: ["${REDPANDA_BOOTSTRAP_SERVER}"]
    topic: topic_A
    sasl:
      - mechanism: "SCRAM-SHA-256"
        username: "${REDPANDA_USERNAME}"
        password: "${REDPANDA_PASSWORD}"
    tls:
      enabled: true
    ockam_route_to_consumer: /project/default/service/forward_to_consumer_relay/secure/api
    ockam_allow_consumer: consumer
    ockam_enrollment_ticket: ${OCKAM_ENROLLMENT_TICKET}

Run

# Setup Redpanda
#   1. Create a Redpanda serverless cluster - https://cloud.redpanda.com/
#   2. Create a new topic with name topic_A with default settings.
#   3. Create a new user with name tester.
#       - Select SCRAM-SHA-256 as SASL Mechanism and copy the password to use as environment variable below.
#       - Navigate to Security->ACLs, select tester, select "Allow all operations"
#   4. Copy the following values into your shell and set them as environment variables:
export REDPANDA_BOOTSTRAP_SERVER="TODO:9092"
export REDPANDA_USERNAME="tester"
export REDPANDA_PASSWORD="TODO"

# Setup Ockam
#   1. Sign up for Ockam https://ockam.io/signup
#   2. Set up Ockam Command on your development machine
brew install build-trust/ockam/ockam
ockam enroll

# Setup Consumer
ockam project ticket --usage-count 1 --expires-in 10m --attribute consumer --relay consumer_relay > ticket
docker run --rm --name consumer \
  -v "$(pwd)/consumer.yaml:/connect.yaml" \
  -e OCKAM_ENROLLMENT_TICKET="$(cat ticket)" \
  -e REDPANDA_BOOTSTRAP_SERVER="$REDPANDA_BOOTSTRAP_SERVER" \
  -e REDPANDA_USERNAME="$REDPANDA_USERNAME" \
  -e REDPANDA_PASSWORD="$REDPANDA_PASSWORD" \
  ghcr.io/build-trust/redpanda-connect

# Setup Producers (in a new terminal window)
for i in $(seq 1 3); do
    ockam project ticket --usage-count 1 --expires-in 10m --attribute producer > ticket
    docker run --rm -d --name "producer$i" \
        -v "$(pwd)/producer.yaml:/connect.yaml" \
        -e OCKAM_ENROLLMENT_TICKET="$(cat ./ticket)" \
        -e REDPANDA_BOOTSTRAP_SERVER="$REDPANDA_BOOTSTRAP_SERVER" \
        -e REDPANDA_USERNAME="$REDPANDA_USERNAME" \
        -e REDPANDA_PASSWORD="$REDPANDA_PASSWORD" \
        ghcr.io/build-trust/redpanda-connect
done

# It can take a minute or so for the messages to start flowing.
# Observe that the above consumer can decrypt and transform messages.

# However, if you look at the messages inside redpanda console or using
# the below rpk command, you'll notice that the messages are encrypted.
rpk cloud login
rpk topic consume topic_A

# Cleanup
docker stop $(docker ps -q --filter "name=producer" --filter "name=consumer")

@mrinalwadhwa mrinalwadhwa marked this pull request as draft May 29, 2024 02:12
Comment on lines 27 to 61
func ockamKafkaInputConfig() *service.ConfigSpec {
	return kafka.FranzKafkaInputConfig().
		Summary(`Ockam`).
		Field(service.NewStringField("ockam_identity_name").Optional()).
		Field(service.NewStringField("ockam_node_address").Default("127.0.0.1:6262")).
		Field(service.NewStringField("ockam_allow_producer").Default("self")).
		Field(service.NewStringField("ockam_enrollment_ticket").Optional()).
		Field(service.NewStringField("ockam_relay").Optional())
}
Contributor Author

I'll push some documentation for these tomorrow with some examples on how to use them.

Comment on lines 38 to 68
Field(service.NewStringField("ockam_enrollment_ticket").Optional()).
Field(service.NewStringField("ockam_identity_name").Optional()).
Field(service.NewStringField("ockam_allow_consumer").Default("self")).
Field(service.NewStringField("ockam_route_to_consumer").Default("/ip4/127.0.0.1/tcp/6262"))
}
Contributor Author

I'll push some documentation for these tomorrow with some examples on how to use them.

@CLAassistant

CLAassistant commented May 30, 2024

CLA assistant check
All committers have signed the CLA.

@mrinalwadhwa mrinalwadhwa force-pushed the mrinal/ockam_plugins branch 3 times, most recently from f5fabc0 to 969e043 Compare May 30, 2024 14:28
@mrinalwadhwa mrinalwadhwa marked this pull request as ready for review May 30, 2024 15:11
@Jeffail Jeffail requested a review from asimms41 July 23, 2024 07:23
Collaborator

@Jeffail Jeffail left a comment

Hey, left a couple of comments. Is this being distributed as an Apache V2 licensed plugin? If so, we need headers added to each file. There's a handy script for it, add_license_headers.sh; if you run that from the repository root directory it should add them automatically.

Example([]string{"foo:9092", "bar:9092"}).
Example([]string{"foo:9092,bar:9092"})).
Field(service.NewBoolField("disable_content_encryption").Default(false).Optional()).
Field(service.NewStringField("ockam_identity_name").Optional()).
Collaborator

Can we remove the ockam_ prefix of these fields? If there's a need to distinguish between ockam and kafka specific fields I'd say it's more reasonable to namespace the kafka fields under an object like this:

ockam_kafka:
  node_address: TODO
  allow_producer: self
  kafka:
    seed_brokers: ...

You can do it with service.NewObjectField.

Example([]string{"localhost:9092"}).
Example([]string{"foo:9092", "bar:9092"}).
Example([]string{"foo:9092,bar:9092"})).
Field(service.NewBoolField("disable_content_encryption").Default(false).Optional()).
Collaborator

Using both Default and Optional is redundant, as a field with a default is always specified. Remove the Optional call if the default value is valid. This goes for all fields.


if routeToKafkaOutlet == "self" {
// TODO: Handle other tls fields in kafka franz
_, tls, err := conf.FieldTLSToggled("tls")
Collaborator

It looks as though the tls field is never defined above.

Contributor

This is now explicitly defined within the "kafka" namespace

if err != nil {
return nil, err
}
if len(seedBrokers) > 1 {
Collaborator

Suggested change
- if len(seedBrokers) > 1 {
+ if len(seedBrokers) != 1 {

Looks as though exactly one broker is needed here.
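
A minimal sketch of the stricter check suggested here, assuming the surrounding code goes on to use a single broker address. The helper name `singleSeedBroker` is hypothetical and not part of the PR. With `> 1` an empty list passes the check; with `!= 1` both the empty list and multi-broker lists are rejected.

```go
package main

import (
	"fmt"
)

// singleSeedBroker returns the only entry in seedBrokers, or an error if the
// list does not contain exactly one entry (hypothetical helper).
func singleSeedBroker(seedBrokers []string) (string, error) {
	if len(seedBrokers) != 1 {
		return "", fmt.Errorf("expected exactly one seed broker, got %d", len(seedBrokers))
	}
	return seedBrokers[0], nil
}

func main() {
	// Empty, single-broker and multi-broker inputs.
	for _, brokers := range [][]string{{}, {"localhost:9092"}, {"foo:9092", "bar:9092"}} {
		broker, err := singleSeedBroker(brokers)
		fmt.Printf("%v -> broker=%q err=%v\n", brokers, broker, err)
	}
}
```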

mrinalwadhwa and others added 4 commits July 26, 2024 13:10
Co-authored-by: Mrinal Wadhwa <[email protected]>
Co-authored-by: Adrian Benavides <[email protected]>
Co-authored-by: Pablo Polvorin <[email protected]>
Co-authored-by: etorreborre <[email protected]>

Signed-off-by: Mrinal Wadhwa <[email protected]>
@davide-baldo
Contributor

I rebased and applied your feedback.
I had to modify franz kafka a little bit to better expose the fields.

@Jeffail
Collaborator

Jeffail commented Aug 1, 2024

Merged via #2755

@Jeffail Jeffail closed this Aug 1, 2024

4 participants