-
Notifications
You must be signed in to change notification settings - Fork 803
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add an Ockam Kafka Input and an Ockam Kafka Output #2616
Conversation
internal/impl/ockam/input_kafka.go
Outdated
func ockamKafkaInputConfig() *service.ConfigSpec { | ||
return kafka.FranzKafkaInputConfig(). | ||
Summary(`Ockam`). | ||
Field(service.NewStringField("ockam_identity_name").Optional()). | ||
Field(service.NewStringField("ockam_node_address").Default("127.0.0.1:6262")). | ||
Field(service.NewStringField("ockam_allow_producer").Default("self")). | ||
Field(service.NewStringField("ockam_enrollment_ticket").Optional()). | ||
Field(service.NewStringField("ockam_relay").Optional()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll push some documentation for these tomorrow with some examples on how to use them.
internal/impl/ockam/output_kafka.go
Outdated
Field(service.NewStringField("ockam_enrollment_ticket").Optional()). | ||
Field(service.NewStringField("ockam_identity_name").Optional()). | ||
Field(service.NewStringField("ockam_allow_consumer").Default("self")). | ||
Field(service.NewStringField("ockam_route_to_consumer").Default("/ip4/127.0.0.1/tcp/6262")) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll push some documentation for these tomorrow with some examples on how to use them.
f5fabc0
to
969e043
Compare
81cb85b
to
ee41dd8
Compare
ee41dd8
to
b26b136
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey, left a couple of comments. Is this being distributed as an Apache V2 licensed plugin? If so we need headers added to each file. There's a handy script for it add_license_headers.sh, if you run that from the repository root directory it should add them automatically.
internal/impl/ockam/input_kafka.go
Outdated
Example([]string{"foo:9092", "bar:9092"}). | ||
Example([]string{"foo:9092,bar:9092"})). | ||
Field(service.NewBoolField("disable_content_encryption").Default(false).Optional()). | ||
Field(service.NewStringField("ockam_identity_name").Optional()). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove the ockam_
prefix of these fields? If there's a need to distinguish between ockam and kafka specific fields I'd say it's more reasonable to namespace the kafka fields under than object like this:
ockam_kafka:
node_address: TODO
allow_producer: self
kafka:
seed_brokers: ...
You can do it with service.NewObjectField
.
internal/impl/ockam/input_kafka.go
Outdated
Example([]string{"localhost:9092"}). | ||
Example([]string{"foo:9092", "bar:9092"}). | ||
Example([]string{"foo:9092,bar:9092"})). | ||
Field(service.NewBoolField("disable_content_encryption").Default(false).Optional()). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using both Default
and Optional
is redundant, as a field with a default is always specified. Remove the Optional
call if the default value is valid. This goes for all fields.
internal/impl/ockam/input_kafka.go
Outdated
|
||
if routeToKafkaOutlet == "self" { | ||
// TODO: Handle other tls fields in kafka franz | ||
_, tls, err := conf.FieldTLSToggled("tls") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks as though the tls
field is never defined above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is now explicitly defined within the "kafka" namespace
internal/impl/ockam/input_kafka.go
Outdated
if err != nil { | ||
return nil, err | ||
} | ||
if len(seedBrokers) > 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if len(seedBrokers) > 1 { | |
if len(seedBrokers) != 1 { |
Looks as though exactly one broker is needed here.
Co-authored-by: Mrinal Wadhwa <[email protected]> Co-authored-by: Adrian Benavides <[email protected]> Co-authored-by: Pablo Polvorin <[email protected]> Co-authored-by: etorreborre <[email protected]> Signed-off-by: Mrinal Wadhwa <[email protected]>
Co-authored-by: Mrinal Wadhwa <[email protected]> Co-authored-by: Adrian Benavides <[email protected]> Co-authored-by: Pablo Polvorin <[email protected]> Co-authored-by: etorreborre <[email protected]> Signed-off-by: Mrinal Wadhwa <[email protected]>
a9eda6f
to
a99123e
Compare
I rebased and applied your feedback. |
Merged via #2755 |
Everyone on the Ockam team is thrilled to contribute two new Ockam Kafka plugins - an input and an output.
Ockam Kafka plugins for Redpanda Connect create Ockam Portals to send end-to-end encrypted messages through Kafka. These plugins combine Ockam and the Franz Kafka plugins to setup encrypted portals.
Messages from a producer are encrypted to a specific Consumer. Only that specific Consumer can decrypt these messages. This guarantees that your data cannot be observed or tampered as it passes through Kafka. Operators of the Kafka cluster only see end-to-end encrypted data. Any compromise of an operator's infrastructure cannot compromise your business data. Kafka brokers cannot see the data.
Examples
On one machine
Let's begin with a very simple local example that runs all the parts on one machine.
Consumer
The consumer uses
ockam_kafka
input to decrypt and read messages from Kafka. It then transforms messages using a processor and writes the transformed message to stdout.Producer
The producer reads messages from stdin. It then uses
ockam_kafka
output to encrypt and write messages to Kafka.Run
With a
redpanda-connect
binary compiled using this pull request:Between two machines
Consumer
Producer
Run
With a
redpanda-connect
binary compiled using this pull request:Across several machines
Consumer
Producer
Run
As a temporary step, until this pull request is merged, we compiled this PR's branch and pushed a docker image to
ghcr.io/build-trust/redpanda-connect
Across several machines - using Redpanda Serverless
Consumer
Producers
Run