fix(flow): infer table schema correctly #4113

Merged · 9 commits · Jun 11, 2024
62 changes: 40 additions & 22 deletions src/flow/src/adapter.rs
@@ -265,7 +265,7 @@ impl FlownodeManager {
         let ctx = Arc::new(QueryContext::with(&catalog, &schema));
         // TODO(discord9): instead of auto build table from request schema, actually build table
         // before `create flow` to be able to assign pk and ts etc.
-        let (primary_keys, schema, is_auto_create) = if let Some(table_id) = self
+        let (primary_keys, schema, is_ts_placeholder) = if let Some(table_id) = self
             .table_info_source
             .get_table_id_from_name(&table_name)
             .await?
@@ -317,7 +317,12 @@ impl FlownodeManager {
                 .map(|v| {
                     v.column_indices
                         .iter()
-                        .map(|i| format!("Col_{i}"))
+                        .map(|i| {
+                            schema
+                                .get_name(*i)
+                                .clone()
+                                .unwrap_or_else(|| format!("Col_{i}"))
+                        })
                         .collect_vec()
                 })
                 .unwrap_or_default();
@@ -326,15 +331,8 @@ impl FlownodeManager {
                 ConcreteDataType::timestamp_millisecond_datatype(),
                 true,
             );
-            // TODO(discord9): bugged so we can't infer time index from flow plan, so we have to manually set one
-            let ts_col = ColumnSchema::new(
-                AUTO_CREATED_PLACEHOLDER_TS_COL,
-                ConcreteDataType::timestamp_millisecond_datatype(),
-                true,
-            )
-            .with_time_index(true);

-            let wout_ts = schema
+            let original_schema = schema
                 .typ()
                 .column_types
                 .clone()
@@ -345,16 +343,33 @@ impl FlownodeManager {
                         .names
                         .get(idx)
                         .cloned()
+                        .flatten()
                         .unwrap_or(format!("Col_{}", idx));
-                    ColumnSchema::new(name, typ.scalar_type, typ.nullable)
+                    let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable);
+                    if schema.typ().time_index == Some(idx) {
+                        ret.with_time_index(true)
+                    } else {
+                        ret
+                    }
                 })
                 .collect_vec();

-            let mut with_ts = wout_ts.clone();
-            with_ts.push(update_at);
-            with_ts.push(ts_col);
+            let mut with_auto_added_col = original_schema.clone();
+            with_auto_added_col.push(update_at);
+
+            // if no time index, add one as placeholder
+            let no_time_index = schema.typ().time_index.is_none();
+            if no_time_index {
+                let ts_col = ColumnSchema::new(
+                    AUTO_CREATED_PLACEHOLDER_TS_COL,
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    true,
+                )
+                .with_time_index(true);
+                with_auto_added_col.push(ts_col);
+            }

-            (primary_keys, with_ts, true)
+            (primary_keys, with_auto_added_col, no_time_index)
         };
         let schema_len = schema.len();
         let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
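
Taken together, the adapter hunks above change sink-table schema inference in three ways: column names known to the plan are preserved (with `Col_{i}` only as a fallback), the plan's own time index is honored, and the placeholder timestamp column is appended only when the plan has no time index, with `is_ts_placeholder` recording that fact. A minimal, self-contained sketch of that logic, using simplified stand-in types rather than GreptimeDB's actual `ColumnSchema`/`RelationDesc` definitions (`update_at` and `__ts_placeholder` below are illustrative names for the auto-added columns):

```rust
// Simplified stand-ins, not the actual GreptimeDB types.
#[derive(Clone, Debug)]
struct ColType {
    scalar_type: &'static str,
    nullable: bool,
}

#[derive(Clone, Debug)]
struct ColSchema {
    name: String,
    typ: ColType,
    is_time_index: bool,
}

/// Relation description: column types, optional names, optional time index.
struct RelationDesc {
    column_types: Vec<ColType>,
    names: Vec<Option<String>>,
    time_index: Option<usize>,
}

/// Build the sink table schema: keep known names (falling back to `Col_{i}`),
/// mark the plan's time index if it has one, and only append a placeholder
/// timestamp column when the plan has none. Returns the schema plus whether a
/// placeholder was added (the `is_ts_placeholder` flag in the PR).
fn infer_sink_schema(desc: &RelationDesc) -> (Vec<ColSchema>, bool) {
    let mut schema: Vec<ColSchema> = desc
        .column_types
        .iter()
        .enumerate()
        .map(|(idx, typ)| ColSchema {
            // names are Option<String>: known names survive, unnamed columns
            // get the Col_{idx} fallback
            name: desc
                .names
                .get(idx)
                .cloned()
                .flatten()
                .unwrap_or(format!("Col_{idx}")),
            typ: typ.clone(),
            is_time_index: desc.time_index == Some(idx),
        })
        .collect();
    // auto-added column, analogous to `update_at` in the PR
    schema.push(ColSchema {
        name: "update_at".to_string(),
        typ: ColType { scalar_type: "TimestampMillisecond", nullable: true },
        is_time_index: false,
    });
    // placeholder time index only when the plan could not provide one
    let no_time_index = desc.time_index.is_none();
    if no_time_index {
        schema.push(ColSchema {
            name: "__ts_placeholder".to_string(),
            typ: ColType { scalar_type: "TimestampMillisecond", nullable: true },
            is_time_index: true,
        });
    }
    (schema, no_time_index)
}

fn main() {
    let desc = RelationDesc {
        column_types: vec![ColType { scalar_type: "UInt64", nullable: true }],
        names: vec![Some("sum_number".to_string())],
        time_index: None,
    };
    let (schema, is_ts_placeholder) = infer_sink_schema(&desc);
    assert!(is_ts_placeholder);
    assert_eq!(schema[0].name, "sum_number");
    println!("{schema:?}");
}
```

The design point is that the placeholder is now conditional: before this change the code unconditionally pushed a placeholder time-index column, which clobbered plans that already carried one.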
@@ -377,7 +392,7 @@ impl FlownodeManager {
             now,
         ))]);
         // ts col, if auto create
-        if is_auto_create {
+        if is_ts_placeholder {
             ensure!(
                 row.len() == schema_len - 1,
                 InternalSnafu {
@@ -508,12 +523,13 @@ impl FlownodeManager {
debug!("Starting to run");
loop {
// TODO(discord9): only run when new inputs arrive or scheduled to
debug!("call run_available in run every second");
self.run_available(true).await.unwrap();
debug!("call send_writeback_requests in run every second");
if let Err(err) = self.run_available(true).await {
common_telemetry::error!(err;"Run available errors");
}
// TODO(discord9): error handling
self.send_writeback_requests().await.unwrap();
debug!("call log_all_errors in run every second");
if let Err(err) = self.send_writeback_requests().await {
common_telemetry::error!(err;"Send writeback request errors");
};
self.log_all_errors().await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
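
The run loop also stops `unwrap()`ing per-tick failures: an error in `run_available` or `send_writeback_requests` is now logged and the loop keeps ticking. A sketch of that pattern, assuming the `tokio` crate and a hypothetical `Manager`, with `eprintln!` standing in for `common_telemetry`:

```rust
// Log-and-continue service loop: one failed tick must not take the
// whole flownode down. `Manager` and its methods are stand-ins.
use std::time::Duration;

struct Manager;

impl Manager {
    async fn run_available(&self) -> Result<(), String> {
        Ok(())
    }

    async fn send_writeback_requests(&self) -> Result<(), String> {
        Err("sink table unavailable".to_string())
    }

    // Mirrors the PR's infinite loop; each fallible step is logged
    // instead of panicking the task.
    async fn run(&self) {
        loop {
            if let Err(err) = self.run_available().await {
                eprintln!("Run available errors: {err}");
            }
            if let Err(err) = self.send_writeback_requests().await {
                eprintln!("Send writeback request errors: {err}");
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
}

#[tokio::main]
async fn main() {
    Manager.run().await;
}
```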
@@ -596,6 +612,7 @@ impl FlownodeManager {
                 break;
             }
         }
+        self.node_context.write().await.remove_flow(flow_id);
         Ok(())
     }

@@ -642,8 +659,9 @@ impl FlownodeManager {
         node_ctx.query_context = query_ctx.map(Arc::new);
         // construct a active dataflow state with it
         let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;

+        debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);
-        node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?;
+        node_ctx.assign_table_schema(&sink_table_name, flow_plan.schema.clone())?;

         let _ = comment;
         let _ = flow_options;
11 changes: 11 additions & 0 deletions src/flow/src/adapter/flownode_impl.rs
@@ -26,6 +26,7 @@ use itertools::Itertools;
 use snafu::{OptionExt, ResultExt};
 use store_api::storage::RegionId;

+use crate::adapter::error::InternalSnafu;
 use crate::adapter::FlownodeManager;
 use crate::repr::{self, DiffRow};

@@ -126,6 +127,16 @@ impl Flownode for FlownodeManager {
             .context(UnexpectedSnafu {
                 err_msg: format!("Table not found: {}", table_id),
             })?;
+        let table_col_names = table_col_names
+            .iter().enumerate()
+            .map(|(idx,name)| match name {
+                Some(name) => Ok(name.clone()),
+                None => InternalSnafu {
+                    reason: format!("Expect column {idx} of table id={table_id} to have name in table schema, found None"),
+                }
+                .fail().map_err(BoxedError::new).context(ExternalSnafu),
+            })
+            .collect::<Result<Vec<_>>>()?;
         let name_to_col = HashMap::<_, _>::from_iter(
             insert_schema
                 .iter()
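With table schemas now carrying `Option<String>` column names, the insert path validates that every column of the target table actually has a name, failing with the offending index instead of mismatching silently. A standalone sketch of that validation, with a plain `String` error in place of the PR's snafu error variants:

```rust
// Turn Vec<Option<String>> column names into Result<Vec<String>, _>,
// reporting the first unnamed column's index in the error.
fn require_named(table_id: u32, names: &[Option<String>]) -> Result<Vec<String>, String> {
    names
        .iter()
        .enumerate()
        .map(|(idx, name)| match name {
            Some(name) => Ok(name.clone()),
            None => Err(format!(
                "Expect column {idx} of table id={table_id} to have name in table schema, found None"
            )),
        })
        // collecting an iterator of Results short-circuits on the first Err
        .collect()
}

fn main() {
    let ok = require_named(42, &[Some("ts".into()), Some("val".into())]);
    assert_eq!(ok.unwrap(), vec!["ts".to_string(), "val".to_string()]);

    let err = require_named(42, &[Some("ts".into()), None]);
    assert!(err.is_err());
    println!("{err:?}");
}
```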
23 changes: 19 additions & 4 deletions src/flow/src/adapter/node_context.rs
@@ -27,7 +27,7 @@ use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu};
 use crate::adapter::{FlowId, TableName, TableSource};
 use crate::expr::error::InternalSnafu;
 use crate::expr::GlobalId;
-use crate::repr::{DiffRow, RelationDesc, RelationType, BROADCAST_CAP};
+use crate::repr::{DiffRow, RelationDesc, BROADCAST_CAP};

 /// A context that holds the information of the dataflow
 #[derive(Default, Debug)]
@@ -36,6 +36,7 @@ pub struct FlownodeContext {
     pub source_to_tasks: BTreeMap<TableId, BTreeSet<FlowId>>,
     /// mapping from task to sink table, useful for sending data back to the client when a task is done running
     pub flow_to_sink: BTreeMap<FlowId, TableName>,
+    pub sink_to_flow: BTreeMap<TableName, FlowId>,
     /// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender
     ///
     /// Note that we are getting insert requests with table id, so we should use table id as the key
@@ -184,7 +185,21 @@ impl FlownodeContext {
         }

         self.add_sink_receiver(sink_table_name.clone());
-        self.flow_to_sink.insert(task_id, sink_table_name);
+        self.flow_to_sink.insert(task_id, sink_table_name.clone());
+        self.sink_to_flow.insert(sink_table_name, task_id);
     }

+    /// remove flow from worker context
+    pub fn remove_flow(&mut self, task_id: FlowId) {
+        if let Some(sink_table_name) = self.flow_to_sink.remove(&task_id) {
+            self.sink_to_flow.remove(&sink_table_name);
+        }
+        for (source_table_id, tasks) in self.source_to_tasks.iter_mut() {
+            tasks.remove(&task_id);
+            if tasks.is_empty() {
+                self.source_sender.remove(source_table_id);
+            }
+        }
+    }

     /// try add source sender, if already exist, do nothing
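
The new `remove_flow` keeps the bidirectional flow/sink mappings consistent and drops a source table's broadcast sender once no remaining flow reads from it. A self-contained sketch of that cleanup with simplified stand-in types (`String` sink names, `()` senders):

```rust
use std::collections::{BTreeMap, BTreeSet};

type FlowId = u64;
type TableId = u32;

#[derive(Default)]
struct Ctx {
    flow_to_sink: BTreeMap<FlowId, String>,
    sink_to_flow: BTreeMap<String, FlowId>,
    source_to_tasks: BTreeMap<TableId, BTreeSet<FlowId>>,
    source_sender: BTreeMap<TableId, ()>,
}

impl Ctx {
    fn remove_flow(&mut self, task_id: FlowId) {
        // 1. drop both directions of the flow <-> sink mapping
        if let Some(sink) = self.flow_to_sink.remove(&task_id) {
            self.sink_to_flow.remove(&sink);
        }
        // 2. detach the flow from every source; drop a source's sender
        //    once its task set is empty
        for (source_table_id, tasks) in self.source_to_tasks.iter_mut() {
            tasks.remove(&task_id);
            if tasks.is_empty() {
                self.source_sender.remove(source_table_id);
            }
        }
    }
}

fn main() {
    let mut ctx = Ctx::default();
    ctx.flow_to_sink.insert(1, "sink_table".into());
    ctx.sink_to_flow.insert("sink_table".into(), 1);
    ctx.source_to_tasks.entry(7).or_default().insert(1);
    ctx.source_sender.insert(7, ());

    ctx.remove_flow(1);
    assert!(ctx.flow_to_sink.is_empty());
    assert!(ctx.source_sender.is_empty());
}
```

Keeping `sink_to_flow` as a reverse index is what makes this cleanup O(log n) per mapping instead of a scan over all flows.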
@@ -307,14 +322,14 @@ impl FlownodeContext {
     pub fn assign_table_schema(
         &mut self,
         table_name: &TableName,
-        schema: RelationType,
+        schema: RelationDesc,
     ) -> Result<(), Error> {
         let gid = self
             .table_repr
             .get_by_name(table_name)
             .map(|(_, gid)| gid)
             .unwrap();
-        self.schema.insert(gid, schema.into_unnamed());
+        self.schema.insert(gid, schema);
         Ok(())
     }

2 changes: 1 addition & 1 deletion src/flow/src/adapter/table_source.rs
@@ -132,7 +132,7 @@ impl TableSource {
                         nullable: col.is_nullable(),
                         scalar_type: col.data_type,
                     },
-                    col.name,
+                    Some(col.name),
                 )
             })
             .unzip();
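This one-line change reflects the PR's naming model: column names flow through plans as `Option<String>` (hence the `.flatten()` in adapter.rs and the `into_unnamed()` calls in the updated tests), since intermediate plan nodes can produce unnamed columns. A sketch of that model, under the assumption that `RelationDesc` pairs a `RelationType` with per-column optional names and that `into_unnamed()` fills the names with `None`; these are simplified stand-ins, not the actual definitions in `src/flow/src/repr`:

```rust
// Stand-in types illustrating RelationType vs RelationDesc.
#[derive(Clone, Debug)]
struct ColumnType {
    scalar_type: &'static str,
    nullable: bool,
}

#[derive(Clone, Debug)]
struct RelationType {
    column_types: Vec<ColumnType>,
}

#[derive(Clone, Debug)]
struct RelationDesc {
    typ: RelationType,
    names: Vec<Option<String>>,
}

impl RelationType {
    fn new(column_types: Vec<ColumnType>) -> Self {
        Self { column_types }
    }

    /// Wrap into a description whose columns all lack names,
    /// as the updated tests do with `into_unnamed()`.
    fn into_unnamed(self) -> RelationDesc {
        let names = vec![None; self.column_types.len()];
        RelationDesc { typ: self, names }
    }
}

impl RelationDesc {
    /// Name lookup of the kind the adapter performs before
    /// falling back to `Col_{i}`.
    fn get_name(&self, idx: usize) -> &Option<String> {
        &self.names[idx]
    }
}

fn main() {
    let desc = RelationType::new(vec![ColumnType {
        scalar_type: "UInt64",
        nullable: true,
    }])
    .into_unnamed();
    assert!(desc.get_name(0).is_none());
    println!("{desc:?}");
}
```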
2 changes: 1 addition & 1 deletion src/flow/src/adapter/worker.rs
@@ -514,7 +514,7 @@ mod test {
                 plan: Plan::Get {
                     id: Id::Global(GlobalId::User(1)),
                 },
-                typ: RelationType::new(vec![]),
+                schema: RelationType::new(vec![]).into_unnamed(),
             },
         );
         let create_reqs = Request::Create {
2 changes: 1 addition & 1 deletion src/flow/src/compute/render.rs
@@ -111,7 +111,7 @@ impl<'referred, 'df> Context<'referred, 'df> {
                 input,
                 key_val_plan,
                 reduce_plan,
-            } => self.render_reduce(input, key_val_plan, reduce_plan, plan.typ),
+            } => self.render_reduce(input, key_val_plan, reduce_plan, plan.schema.typ),
             Plan::Join { .. } => NotImplementedSnafu {
                 reason: "Join is still WIP",
             }
6 changes: 3 additions & 3 deletions src/flow/src/compute/render/map.rs
@@ -252,7 +252,7 @@ mod test {
         .unwrap();

         let bundle = ctx
-            .render_mfp(Box::new(input_plan.with_types(typ)), mfp)
+            .render_mfp(Box::new(input_plan.with_types(typ.into_unnamed())), mfp)
             .unwrap();
         let output = get_output_handle(&mut ctx, bundle);
         // drop ctx here to simulate actual process of compile first, run later scenario
@@ -312,7 +312,7 @@ mod test {
         )])
         .unwrap();
         let bundle = ctx
-            .render_mfp(Box::new(input_plan.with_types(typ)), mfp)
+            .render_mfp(Box::new(input_plan.with_types(typ.into_unnamed())), mfp)
             .unwrap();

         let output = get_output_handle(&mut ctx, bundle);
@@ -348,7 +348,7 @@ mod test {
         )])
         .unwrap();
         let bundle = ctx
-            .render_mfp(Box::new(input_plan.with_types(typ)), mfp)
+            .render_mfp(Box::new(input_plan.with_types(typ.into_unnamed())), mfp)
             .unwrap();

         let output = get_output_handle(&mut ctx, bundle);
53 changes: 33 additions & 20 deletions src/flow/src/compute/render/reduce.rs
@@ -813,11 +813,12 @@ mod test {
             distinct: false,
         };
         let expected = TypedPlan {
-            typ: RelationType::new(vec![
+            schema: RelationType::new(vec![
                 ColumnType::new(CDT::uint64_datatype(), true), // sum(number)
                 ColumnType::new(CDT::datetime_datatype(), false), // window start
                 ColumnType::new(CDT::datetime_datatype(), false), // window end
-            ]),
+            ])
+            .into_unnamed(),
             // TODO(discord9): mfp indirectly ref to key columns
             /*
             .with_key(vec![1])
@@ -829,10 +830,13 @@ mod test {
                     Plan::Get {
                         id: crate::expr::Id::Global(GlobalId::User(1)),
                     }
-                    .with_types(RelationType::new(vec![
-                        ColumnType::new(ConcreteDataType::uint32_datatype(), false),
-                        ColumnType::new(ConcreteDataType::datetime_datatype(), false),
-                    ])),
+                    .with_types(
+                        RelationType::new(vec![
+                            ColumnType::new(ConcreteDataType::uint32_datatype(), false),
+                            ColumnType::new(ConcreteDataType::datetime_datatype(), false),
+                        ])
+                        .into_unnamed(),
+                    ),
                 ),
                 key_val_plan: KeyValPlan {
                     key_plan: MapFilterProject::new(2)
@@ -880,7 +884,8 @@ mod test {
                         ColumnType::new(CDT::uint64_datatype(), true), //sum(number)
                     ])
                     .with_key(vec![1])
-                    .with_time_index(Some(0)),
+                    .with_time_index(Some(0))
+                    .into_unnamed(),
                 ),
             ),
             mfp: MapFilterProject::new(3)
@@ -977,17 +982,22 @@ mod test {
             els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())),
         };
         let expected = TypedPlan {
-            typ: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)]),
+            schema: RelationType::new(vec![ColumnType::new(CDT::uint64_datatype(), true)])
+                .into_unnamed(),
             plan: Plan::Mfp {
                 input: Box::new(
                     Plan::Reduce {
                         input: Box::new(
                             Plan::Get {
                                 id: crate::expr::Id::Global(GlobalId::User(1)),
                             }
-                            .with_types(RelationType::new(vec![
-                                ColumnType::new(ConcreteDataType::int64_datatype(), false),
-                            ])),
+                            .with_types(
+                                RelationType::new(vec![ColumnType::new(
+                                    ConcreteDataType::int64_datatype(),
+                                    false,
+                                )])
+                                .into_unnamed(),
+                            ),
                         ),
                         key_val_plan: KeyValPlan {
                             key_plan: MapFilterProject::new(1)
@@ -1008,10 +1018,13 @@ mod test {
                             distinct_aggrs: vec![],
                         }),
                     }
-                    .with_types(RelationType::new(vec![
-                        ColumnType::new(ConcreteDataType::uint32_datatype(), true),
-                        ColumnType::new(ConcreteDataType::int64_datatype(), true),
-                    ])),
+                    .with_types(
+                        RelationType::new(vec![
+                            ColumnType::new(ConcreteDataType::uint32_datatype(), true),
+                            ColumnType::new(ConcreteDataType::int64_datatype(), true),
+                        ])
+                        .into_unnamed(),
+                    ),
                 ),
                 mfp: MapFilterProject::new(2)
                     .map(vec![
@@ -1068,7 +1081,7 @@ mod test {
         let reduce_plan = ReducePlan::Distinct;
         let bundle = ctx
             .render_reduce(
-                Box::new(input_plan.with_types(typ)),
+                Box::new(input_plan.with_types(typ.into_unnamed())),
                 key_val_plan,
                 reduce_plan,
                 RelationType::empty(),
@@ -1143,7 +1156,7 @@ mod test {
         let reduce_plan = ReducePlan::Accumulable(accum_plan);
         let bundle = ctx
             .render_reduce(
-                Box::new(input_plan.with_types(typ)),
+                Box::new(input_plan.with_types(typ.into_unnamed())),
                 key_val_plan,
                 reduce_plan,
                 RelationType::empty(),
@@ -1224,7 +1237,7 @@ mod test {
         let reduce_plan = ReducePlan::Accumulable(accum_plan);
         let bundle = ctx
             .render_reduce(
-                Box::new(input_plan.with_types(typ)),
+                Box::new(input_plan.with_types(typ.into_unnamed())),
                 key_val_plan,
                 reduce_plan,
                 RelationType::empty(),
@@ -1301,7 +1314,7 @@ mod test {
         let reduce_plan = ReducePlan::Accumulable(accum_plan);
         let bundle = ctx
             .render_reduce(
-                Box::new(input_plan.with_types(typ)),
+                Box::new(input_plan.with_types(typ.into_unnamed())),
                 key_val_plan,
                 reduce_plan,
                 RelationType::empty(),
@@ -1393,7 +1406,7 @@ mod test {
         let reduce_plan = ReducePlan::Accumulable(accum_plan);
         let bundle = ctx
             .render_reduce(
-                Box::new(input_plan.with_types(typ)),
+                Box::new(input_plan.with_types(typ.into_unnamed())),
                 key_val_plan,
                 reduce_plan,
                 RelationType::empty(),