feat(pipeline): transform support on_failure #4123

Merged Jun 17, 2024 (61 commits)
Commits
63491d9
chore: add log http ingester scaffold
paomian May 22, 2024
4d2ec3b
chore: add some example code
paomian May 22, 2024
2e51c16
chore: add log inserter
paomian May 27, 2024
fbc66ec
chore: add log handler file
paomian May 27, 2024
cd4d83d
chore: add pipeline lib
paomian May 27, 2024
2bc1937
chore: import log handler
paomian May 29, 2024
2b16ef9
chore: add pipelime http handler
paomian May 29, 2024
f1350cd
chore: add pipeline private table
paomian May 30, 2024
1d52cad
chore: add pipeline API
paomian May 31, 2024
8c69abb
chore: improve error handling
paomian May 31, 2024
7e0a9ad
Merge branch 'main' into feat/log-handler
shuiyisong Jun 3, 2024
73432dc
chore: merge main
shuiyisong Jun 3, 2024
9d7284c
Merge pull request #6 from shuiyisong/chore/merge_main
paomian Jun 3, 2024
1a03b7e
chore: add multi content type support for log handler
paomian Jun 3, 2024
a2f1230
Merge branch 'main' into feat/log-handler
shuiyisong Jun 4, 2024
6a0998d
refactor: remove servers dep on pipeline
shuiyisong Jun 3, 2024
443eaf9
refactor: move define_into_tonic_status to common-error
shuiyisong Jun 3, 2024
c8ce4ee
refactor: bring in pipeline 3eb890c551b8d7f60c4491fcfec18966e2b210a4
shuiyisong Jun 4, 2024
eb9cd22
chore: fix typo
shuiyisong Jun 4, 2024
8d0595c
refactor: bring in pipeline a95c9767d7056ab01dd8ca5fa1214456c6ffc72c
shuiyisong Jun 4, 2024
061b14e
chore: fix typo and license header
shuiyisong Jun 4, 2024
c152472
refactor: move http event handler to a separate file
shuiyisong Jun 4, 2024
ddea3c1
chore: add test for pipeline
paomian Jun 4, 2024
162e92f
Merge branch 'main' into feat/log-handler
shuiyisong Jun 4, 2024
5a7a5be
chore: update
shuiyisong Jun 4, 2024
423e51e
chore: fmt
shuiyisong Jun 4, 2024
51df233
Merge pull request #7 from shuiyisong/refactor/log_handler
paomian Jun 4, 2024
8066eb3
refactor: bring in pipeline 7d2402701877901871dd1294a65ac937605a6a93
shuiyisong Jun 4, 2024
e2a2e50
refactor: move `pipeline_operator` to `pipeline` crate
shuiyisong Jun 4, 2024
209a1a3
chore: minor update
shuiyisong Jun 4, 2024
c110adb
refactor: bring in pipeline 1711f4d46687bada72426d88cda417899e0ae3a4
shuiyisong Jun 5, 2024
1047dd7
chore: add log
shuiyisong Jun 5, 2024
2ff2fda
chore: add log
shuiyisong Jun 5, 2024
8b6a652
chore: remove open hook
shuiyisong Jun 5, 2024
6ca15ad
Merge pull request #8 from shuiyisong/refactor/log
paomian Jun 5, 2024
1298b0a
chore: minor update
shuiyisong Jun 5, 2024
ea548b0
chore: fix fmt
shuiyisong Jun 5, 2024
fb13278
Merge pull request #9 from shuiyisong/refactor/log
paomian Jun 5, 2024
6c88b89
chore: minor update
shuiyisong Jun 5, 2024
eeed85e
chore: rename desc for pipeline table
shuiyisong Jun 5, 2024
f77d20b
refactor: remove updated_at in pipelines
shuiyisong Jun 5, 2024
38ed6bb
Merge pull request #10 from shuiyisong/chore/polish_code
paomian Jun 5, 2024
5815675
chore: add more content type support for log inserter api
paomian Jun 5, 2024
c84ef0e
Merge pull request #11 from paomian/feat/log-handler-v2
paomian Jun 5, 2024
2e69655
chore: introduce pipeline crate
shuiyisong Jun 5, 2024
ca9525d
Merge branch 'chore/introduce_pipeline' into feat/log-handler
shuiyisong Jun 5, 2024
85a4c32
Merge branch 'main' into feat/log-handler
shuiyisong Jun 6, 2024
77ef015
chore: update upload pipeline api
paomian Jun 6, 2024
43a57a7
chore: fix by pr commit
paomian Jun 6, 2024
3560285
chore: add some doc for pub fn/struct
paomian Jun 6, 2024
4872c8a
chore: some minro fix
paomian Jun 6, 2024
11933b0
chore: add pipeline version support
paomian Jun 6, 2024
92a2bda
chore: impl log pipeline version
paomian Jun 7, 2024
29eb2db
transform on_failure
yuanbohan Jun 8, 2024
3f8b9ce
chore: merge main
shuiyisong Jun 11, 2024
e764564
chore: merge main
shuiyisong Jun 12, 2024
6bec090
chore: merge log-handler
shuiyisong Jun 12, 2024
4236f87
chore: add test
shuiyisong Jun 12, 2024
cbb2337
chore: move test to a separate file
shuiyisong Jun 12, 2024
6968150
chore: merge log-handler
shuiyisong Jun 17, 2024
83e59b1
chore: add comment
shuiyisong Jun 17, 2024
65 changes: 63 additions & 2 deletions src/pipeline/src/etl/transform/mod.rs
@@ -27,6 +27,7 @@ const TRANSFORM_FIELDS: &str = "fields";
const TRANSFORM_TYPE: &str = "type";
const TRANSFORM_INDEX: &str = "index";
const TRANSFORM_DEFAULT: &str = "default";
const TRANSFORM_ON_FAILURE: &str = "on_failure";

pub use transformer::greptime::GreptimeTransformer;
// pub use transformer::noop::NoopTransformer;
@@ -38,6 +39,38 @@ pub trait Transformer: std::fmt::Display + Sized + Send + Sync + 'static {
fn transform(&self, val: crate::etl::value::Value) -> Result<Self::Output, String>;
}

/// On Failure behavior when transform fails
#[derive(Debug, Clone, Default)]
pub enum OnFailure {
// Return None if transform fails
#[default]
Ignore,
// Return default value of the field if transform fails
// Default value depends on the type of the field, or explicitly set by user
Default,
}

impl std::str::FromStr for OnFailure {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"ignore" => Ok(OnFailure::Ignore),
"default" => Ok(OnFailure::Default),
_ => Err(format!("invalid transform on_failure value: {}", s)),
}
}
}

impl std::fmt::Display for OnFailure {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
OnFailure::Ignore => write!(f, "ignore"),
OnFailure::Default => write!(f, "default"),
}
}
}

#[derive(Debug, Default, Clone)]
pub struct Transforms {
transforms: Vec<Transform>,
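
Reviewer note: a minimal, self-contained sketch of how the `FromStr` and `Display` impls in this hunk round-trip. The enum is re-declared locally so the snippet compiles on its own; the `PartialEq` derive and the `round_trip` helper are assumptions added for illustration and are not part of the PR.

```rust
use std::fmt;
use std::str::FromStr;

/// Local stand-in for the `OnFailure` enum added in this hunk.
/// `PartialEq` is an extra derive (assumption) so values can be compared.
#[derive(Debug, Clone, Default, PartialEq)]
pub enum OnFailure {
    /// Return `None` if the transform fails.
    #[default]
    Ignore,
    /// Return the field's default value if the transform fails.
    Default,
}

impl FromStr for OnFailure {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "ignore" => Ok(OnFailure::Ignore),
            "default" => Ok(OnFailure::Default),
            _ => Err(format!("invalid transform on_failure value: {}", s)),
        }
    }
}

impl fmt::Display for OnFailure {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            OnFailure::Ignore => write!(f, "ignore"),
            OnFailure::Default => write!(f, "default"),
        }
    }
}

/// Hypothetical helper: parse a config string and render it back.
pub fn round_trip(s: &str) -> Result<String, String> {
    s.parse::<OnFailure>().map(|v| v.to_string())
}
```

Matching is case-sensitive, so `Ignore` is rejected. Note also that `Transform::on_failure` is an `Option` that defaults to `None`, so the `#[default]` variant only applies where `OnFailure::default()` is called explicitly; as far as this diff shows, an unset `on_failure` keeps the previous error behavior.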
@@ -97,6 +130,8 @@ pub struct Transform {
pub default: Option<Value>,

pub index: Option<Index>,

pub on_failure: Option<OnFailure>,
}

impl std::fmt::Display for Transform {
@@ -107,10 +142,21 @@ impl std::fmt::Display for Transform {
"".to_string()
};

-let fields = format!("field(s): {}", self.fields);
let type_ = format!("type: {}", self.type_);
let fields = format!("field(s): {}", self.fields);
let default = if let Some(default) = &self.default {
format!(", default: {}", default)
} else {
"".to_string()
};

let on_failure = if let Some(on_failure) = &self.on_failure {
format!(", on_failure: {}", on_failure)
} else {
"".to_string()
};

-write!(f, "{type_}{index}, {fields}")
write!(f, "{type_}{index}, {fields}{default}{on_failure}",)
}
}

@@ -121,6 +167,7 @@ impl Default for Transform {
type_: Value::Null,
default: None,
index: None,
on_failure: None,
}
}
}
@@ -155,9 +202,17 @@ impl Transform {
self.index = Some(index);
}

fn with_on_failure(&mut self, on_failure: OnFailure) {
self.on_failure = Some(on_failure);
}

pub(crate) fn get_default(&self) -> Option<&Value> {
self.default.as_ref()
}

pub(crate) fn get_type_matched_default_val(&self) -> &Value {
&self.type_
}
}

impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
@@ -192,6 +247,12 @@ impl TryFrom<&yaml_rust::yaml::Hash> for Transform {
TRANSFORM_DEFAULT => {
default_opt = Some(Value::try_from(v)?);
}

TRANSFORM_ON_FAILURE => {
let on_failure = yaml_string(v, TRANSFORM_ON_FAILURE)?;
transform.with_on_failure(on_failure.parse()?);
}

_ => {}
}
}
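
Reviewer note: the `TRANSFORM_ON_FAILURE` arm above propagates the parse error with `?`, while unknown keys fall through `_ => {}`. A rough sketch of that dispatch, assuming a plain slice of string pairs in place of the `yaml_rust` hash; `TransformOptions` and `parse_options` are hypothetical names used only for this illustration.

```rust
use std::str::FromStr;

#[derive(Debug, Clone, PartialEq)]
pub enum OnFailure {
    Ignore,
    Default,
}

impl FromStr for OnFailure {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "ignore" => Ok(OnFailure::Ignore),
            "default" => Ok(OnFailure::Default),
            _ => Err(format!("invalid transform on_failure value: {}", s)),
        }
    }
}

/// Hypothetical, reduced view of the options a transform entry can carry.
#[derive(Debug, Default, PartialEq)]
pub struct TransformOptions {
    pub default: Option<String>,
    pub on_failure: Option<OnFailure>,
}

/// Walk key/value pairs the way the `TryFrom` impl walks the YAML hash:
/// known keys are stored, unknown keys are skipped, and an invalid
/// `on_failure` value aborts the whole parse.
pub fn parse_options(pairs: &[(&str, &str)]) -> Result<TransformOptions, String> {
    let mut opts = TransformOptions::default();
    for (key, value) in pairs {
        match *key {
            "default" => opts.default = Some(value.to_string()),
            "on_failure" => opts.on_failure = Some(value.parse()?),
            _ => {}
        }
    }
    Ok(opts)
}
```

Under this reading, a misspelled key such as `on_fail` would be ignored silently, whereas a misspelled value such as `on_failure: skip` would reject the transform definition.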
230 changes: 203 additions & 27 deletions src/pipeline/src/etl/transform/transformer/greptime/coerce.rs
@@ -16,7 +16,7 @@ use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};

use crate::etl::transform::index::Index;
-use crate::etl::transform::Transform;
use crate::etl::transform::{OnFailure, Transform};
use crate::etl::value::{Epoch, Time, Value};

impl TryFrom<Value> for ValueData {
@@ -177,8 +177,20 @@ fn coerce_bool_value(b: bool, transform: &Transform) -> Result<Option<ValueData>
Value::Boolean(_) => ValueData::BoolValue(b),
Value::String(_) => ValueData::StringValue(b.to_string()),

-Value::Time(_) => return Err("Boolean type not supported for Time".to_string()),
-Value::Epoch(_) => return Err("Boolean type not supported for Epoch".to_string()),
Value::Time(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Time".to_string())
}
None => return Err("Boolean type not supported for Time".to_string()),
},
Value::Epoch(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Epoch".to_string())
}
None => return Err("Boolean type not supported for Epoch".to_string()),
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
@@ -207,8 +219,21 @@ fn coerce_i64_value(n: i64, transform: &Transform) -> Result<Option<ValueData>,
Value::Boolean(_) => ValueData::BoolValue(n != 0),
Value::String(_) => ValueData::StringValue(n.to_string()),

-Value::Time(_) => return Err("Integer type not supported for Time".to_string()),
-Value::Epoch(_) => return Err("Integer type not supported for Epoch".to_string()),
Value::Time(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Time".to_string())
}
None => return Err("Integer type not supported for Time".to_string()),
},

Value::Epoch(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Epoch".to_string())
}
None => return Err("Integer type not supported for Epoch".to_string()),
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
@@ -237,8 +262,21 @@ fn coerce_u64_value(n: u64, transform: &Transform) -> Result<Option<ValueData>,
Value::Boolean(_) => ValueData::BoolValue(n != 0),
Value::String(_) => ValueData::StringValue(n.to_string()),

-Value::Time(_) => return Err("Integer type not supported for Time".to_string()),
-Value::Epoch(_) => return Err("Integer type not supported for Epoch".to_string()),
Value::Time(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Time".to_string())
}
None => return Err("Integer type not supported for Time".to_string()),
},

Value::Epoch(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Epoch".to_string())
}
None => return Err("Integer type not supported for Epoch".to_string()),
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
@@ -267,8 +305,21 @@ fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>,
Value::Boolean(_) => ValueData::BoolValue(n != 0.0),
Value::String(_) => ValueData::StringValue(n.to_string()),

-Value::Time(_) => return Err("Float type not supported for Time".to_string()),
-Value::Epoch(_) => return Err("Float type not supported for Epoch".to_string()),
Value::Time(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Time".to_string())
}
None => return Err("Float type not supported for Time".to_string()),
},

Value::Epoch(_) => match transform.on_failure {
Some(OnFailure::Ignore) => return Ok(None),
Some(OnFailure::Default) => {
return Err("default value not supported for Epoch".to_string())
}
None => return Err("Float type not supported for Epoch".to_string()),
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),
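
Reviewer note: the same three-way `match` on `transform.on_failure` now appears for both `Time` and `Epoch` in each of the bool, i64, u64, and f64 coercion functions. One possible way to factor it out is sketched below. `unsupported_target` is a hypothetical helper, not something in this PR, and it is generic over the payload type so the snippet does not depend on `ValueData`.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OnFailure {
    Ignore,
    Default,
}

/// Hypothetical helper mirroring the repeated match arms in this file.
/// `source` is the incoming kind (e.g. "Boolean") and `target` is the
/// unsupported column type (e.g. "Time").
pub fn unsupported_target<T>(
    on_failure: Option<OnFailure>,
    source: &str,
    target: &str,
) -> Result<Option<T>, String> {
    match on_failure {
        // Drop the value instead of failing the row.
        Some(OnFailure::Ignore) => Ok(None),
        // No default exists for these targets, so this is still an error.
        Some(OnFailure::Default) => Err(format!("default value not supported for {target}")),
        // on_failure not configured: keep the original error message.
        None => Err(format!("{source} type not supported for {target}")),
    }
}
```

With a helper like this, each arm could shrink to something along the lines of `Value::Time(_) => return unsupported_target(transform.on_failure.clone(), "Boolean", "Time")`. Worth keeping in mind that `on_failure: default` on a `Time` or `Epoch` target still returns an error in this PR rather than a fallback value.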
@@ -280,31 +331,156 @@ fn coerce_f64_value(n: f64, transform: &Transform) -> Result<Option<ValueData>,
}

fn coerce_string_value(s: &str, transform: &Transform) -> Result<Option<ValueData>, String> {
-let val = match transform.type_ {
-Value::Int8(_) => ValueData::I8Value(s.parse::<i32>().map_err(|e| e.to_string())?),
-Value::Int16(_) => ValueData::I16Value(s.parse::<i32>().map_err(|e| e.to_string())?),
-Value::Int32(_) => ValueData::I32Value(s.parse::<i32>().map_err(|e| e.to_string())?),
-Value::Int64(_) => ValueData::I64Value(s.parse::<i64>().map_err(|e| e.to_string())?),
match transform.type_ {
Value::Int8(_) if s.parse::<i32>().is_ok() => {
Ok(Some(ValueData::I8Value(s.parse().unwrap())))
}
Value::Int16(_) if s.parse::<i32>().is_ok() => {
Ok(Some(ValueData::I16Value(s.parse().unwrap())))
}
Value::Int32(_) if s.parse::<i32>().is_ok() => {
Ok(Some(ValueData::I32Value(s.parse().unwrap())))
}
Value::Int64(_) if s.parse::<i64>().is_ok() => {
Ok(Some(ValueData::I64Value(s.parse().unwrap())))
}

-Value::Uint8(_) => ValueData::U8Value(s.parse::<u32>().map_err(|e| e.to_string())?),
-Value::Uint16(_) => ValueData::U16Value(s.parse::<u32>().map_err(|e| e.to_string())?),
-Value::Uint32(_) => ValueData::U32Value(s.parse::<u32>().map_err(|e| e.to_string())?),
-Value::Uint64(_) => ValueData::U64Value(s.parse::<u64>().map_err(|e| e.to_string())?),
Value::Uint8(_) if s.parse::<u32>().is_ok() => {
Ok(Some(ValueData::U8Value(s.parse().unwrap())))
}
Value::Uint16(_) if s.parse::<u32>().is_ok() => {
Ok(Some(ValueData::U16Value(s.parse().unwrap())))
}
Value::Uint32(_) if s.parse::<u32>().is_ok() => {
Ok(Some(ValueData::U32Value(s.parse().unwrap())))
}
Value::Uint64(_) if s.parse::<u64>().is_ok() => {
Ok(Some(ValueData::U64Value(s.parse().unwrap())))
}

-Value::Float32(_) => ValueData::F32Value(s.parse::<f32>().map_err(|e| e.to_string())?),
-Value::Float64(_) => ValueData::F64Value(s.parse::<f64>().map_err(|e| e.to_string())?),
Value::Float32(_) if s.parse::<f32>().is_ok() => {
Ok(Some(ValueData::F32Value(s.parse().unwrap())))
}
Value::Float64(_) if s.parse::<f64>().is_ok() => {
Ok(Some(ValueData::F64Value(s.parse().unwrap())))
}

-Value::Boolean(_) => ValueData::BoolValue(s.parse::<bool>().map_err(|e| e.to_string())?),
-Value::String(_) => ValueData::StringValue(s.to_string()),
Value::Boolean(_) if s.parse::<bool>().is_ok() => {
Ok(Some(ValueData::BoolValue(s.parse().unwrap())))
}

-Value::Time(_) => return Err("String type not supported for Time".to_string()),
-Value::Epoch(_) => return Err("String type not supported for Epoch".to_string()),
// on_failure
Value::Int8(_)
| Value::Int16(_)
| Value::Int32(_)
| Value::Int64(_)
| Value::Uint8(_)
| Value::Uint16(_)
| Value::Uint32(_)
| Value::Uint64(_)
| Value::Float32(_)
| Value::Float64(_)
| Value::Boolean(_) => match transform.on_failure {
Some(OnFailure::Ignore) => Ok(None),
Some(OnFailure::Default) => match transform.get_default() {
Some(default) => coerce_value(default, transform),
None => coerce_value(transform.get_type_matched_default_val(), transform),
},
None => Err(format!(
"failed to coerce string value '{s}' to type '{}'",
transform.type_.to_str_type()
)),
},

Value::String(_) => Ok(Some(ValueData::StringValue(s.to_string()))),

Value::Time(_) => match transform.on_failure {
Some(OnFailure::Ignore) => Ok(None),
Some(OnFailure::Default) => Err("default value not supported for Time".to_string()),
None => Err("String type not supported for Time".to_string()),
},

Value::Epoch(_) => match transform.on_failure {
Some(OnFailure::Ignore) => Ok(None),
Some(OnFailure::Default) => Err("default value not supported for Epoch".to_string()),
None => Err("String type not supported for Epoch".to_string()),
},

Value::Array(_) => unimplemented!("Array type not supported"),
Value::Map(_) => unimplemented!("Object type not supported"),

-Value::Null => return Ok(None),
-};
Value::Null => Ok(None),
}
}

-Ok(Some(val))
#[cfg(test)]
mod tests {
use super::*;
use crate::etl::field::Fields;

#[test]
fn test_coerce_string_without_on_failure() {
let transform = Transform {
fields: Fields::default(),
type_: Value::Int32(0),
default: None,
index: None,
on_failure: None,
};

// valid string
{
let val = Value::String("123".to_string());
let result = coerce_value(&val, &transform).unwrap();
assert_eq!(result, Some(ValueData::I32Value(123)));
}

// invalid string
{
let val = Value::String("hello".to_string());
let result = coerce_value(&val, &transform);
assert!(result.is_err());
}
}

#[test]
fn test_coerce_string_with_on_failure_ignore() {
let transform = Transform {
fields: Fields::default(),
type_: Value::Int32(0),
default: None,
index: None,
on_failure: Some(OnFailure::Ignore),
};

let val = Value::String("hello".to_string());
let result = coerce_value(&val, &transform).unwrap();
assert_eq!(result, None);
}

#[test]
fn test_coerce_string_with_on_failure_default() {
let mut transform = Transform {
fields: Fields::default(),
type_: Value::Int32(0),
default: None,
index: None,
on_failure: Some(OnFailure::Default),
};

// with no explicit default value
{
let val = Value::String("hello".to_string());
let result = coerce_value(&val, &transform).unwrap();
assert_eq!(result, Some(ValueData::I32Value(0)));
}

// with explicit default value
{
transform.default = Some(Value::Int32(42));
let val = Value::String("hello".to_string());
let result = coerce_value(&val, &transform).unwrap();
assert_eq!(result, Some(ValueData::I32Value(42)));
}
}
}
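
Reviewer note: a compact sketch of the fallback order that the new `coerce_string_value` and its tests exercise: try to parse, and only on failure consult `on_failure`. Types are deliberately reduced: `IntTransform` and `coerce_str_to_i32` are hypothetical stand-ins for `Transform` and the `Int32` path, the type-matched default is assumed to be `0` (as in the `Value::Int32(0)` test fixture), and the `'int32'` label in the error message is an assumption about what `to_str_type()` prints.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OnFailure {
    Ignore,
    Default,
}

/// Hypothetical, reduced transform that targets a 32-bit integer column.
pub struct IntTransform {
    /// Explicit default configured by the user, if any.
    pub default: Option<i32>,
    pub on_failure: Option<OnFailure>,
}

pub fn coerce_str_to_i32(s: &str, t: &IntTransform) -> Result<Option<i32>, String> {
    // Happy path: the string parses, so on_failure is never consulted.
    if let Ok(n) = s.parse::<i32>() {
        return Ok(Some(n));
    }

    match t.on_failure {
        // Drop the value.
        Some(OnFailure::Ignore) => Ok(None),
        // Prefer the explicit default, else the type-matched one (assumed 0).
        Some(OnFailure::Default) => Ok(Some(t.default.unwrap_or(0))),
        // Not configured: surface the coercion error.
        None => Err(format!("failed to coerce string value '{s}' to type 'int32'")),
    }
}
```

Unlike the match guards in the PR, which call `parse` once in the guard and again in the arm body, this sketch parses a single time; the observable results for the three test cases above should be the same.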