Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: RFC for handling discarded events #14708

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
Rework output disposition configuration
  • Loading branch information
bruceg committed Oct 7, 2022
commit 5037eb6ef3f72efeac6744981fbf5bf6ecbe5b50
40 changes: 27 additions & 13 deletions rfcs/2022-08-25-12217-handling-discarded-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,30 +94,41 @@ outputs.errors.disposition = "reject"

#### Configuration
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could have global configuration for the disposition as well? That might simplify it for simple use-cases. Like setting disposition = "drop" at the global level and have it apply to all components.

Could also do the same at the component level for those that already have multiple outputs in addition to letting them configure the disposition per-output.


The new options will show up in all of the enclosing configuration option structures:
The new `output.*.disposition` configuration mapping will be added to components as needed to
support configuring outputs. In order for the validation to access it, a new method will be added to
all component configuration traits:

```rust
struct SourceOuter {
trait SourceConfig: NamedComponent + Debug + Send + Sync {
/// Gets the list of outputs exposed by this source (existing).
fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<config::Output>;

/// The configuration of all named outputs.
outputs: HashMap<String, OutputConfig>,
/// The actual user configuration of outputs.
/// Will be `None` if no such configuration is present on this component.
fn outputs_config(&self) -> Option<&OutputsConfig>;
JeanMertz marked this conversation as resolved.
Show resolved Hide resolved
}

struct TransformOuter {
trait TransformConfig: DescribeOutputs + NamedComponent + Debug + Send + Sync {
/// Gets the list of outputs exposed by this transform (existing).
fn outputs(&self, merged_definition: &schema::Definition) -> Vec<config::Output>;

/// The configuration of all named outputs.
outputs: HashMap<String, OutputConfig>,
/// The actual user configuration of outputs.
/// Will be `None` if no such configuration is present on this component.
fn outputs_config(&self) -> Option<&OutputsConfig>;
}

struct SinkOuter {
trait SinkConfig: DescribeOutputs + NamedComponent + Debug + Send + Sync {
/// Gets the list of outputs exposed by this sink.
fn outputs(&self) -> Vec<config::Output>;

/// The configuration of all named outputs.
outputs: HashMap<String, OutputConfig>,
/// The actual user configuration of outputs.
/// Will be `None` if no such configuration is present on this component.
fn outputs_config(&self) -> Option<&OutputsConfig>;
}

/// Configuration for a set of named outputs.
struct OutputsConfig(HashMap<String, OutputConfig>);

struct OutputConfig {
#[serde(default)]
disposition: OutputDisposition,
Expand Down Expand Up @@ -348,6 +359,9 @@ in isolation.
only when the output isn't consumed by another component? Do we need another disposition marker to
indicate discards are not to be counted as errors?

- Does the list of outputs exposed by the sink need the schema definition passed in like transforms
do?

## Plan Of Attack

Incremental steps to execute this change. These will be converted to issues after the RFC is approved:
Expand Down