Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update physical table schema on alter logical tables #3585

Merged
12 changes: 7 additions & 5 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use catalog::kvbackend::CachedMetaKvBackendBuilder;
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager};
use clap::Parser;
use client::client_manager::DatanodeClients;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
Expand Down Expand Up @@ -248,11 +248,12 @@ impl StartCommand {
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);

let catalog_manager =
KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend.clone());

let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(
cached_meta_backend.clone(),
)),
Arc::new(InvalidateTableCacheHandler::new(catalog_manager.clone())),
]);

let heartbeat_task = HeartbeatTask::new(
Expand All @@ -263,10 +264,11 @@ impl StartCommand {

let mut instance = FrontendBuilder::new(
cached_meta_backend.clone(),
catalog_manager.clone(),
Arc::new(DatanodeClients::default()),
meta_client,
)
.with_cache_invalidator(cached_meta_backend)
.with_cache_invalidator(catalog_manager.clone())
evenyag marked this conversation as resolved.
Show resolved Hide resolved
.with_plugin(plugins.clone())
.with_heartbeat_task(heartbeat_task)
.try_build()
Expand Down
26 changes: 19 additions & 7 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ use std::sync::Arc;
use std::{fs, path};

use async_trait::async_trait;
use catalog::kvbackend::KvBackendCatalogManager;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::ProcedureExecutorRef;
Expand Down Expand Up @@ -399,6 +400,9 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;

let catalog_manager =
KvBackendCatalogManager::new(kv_backend.clone(), Arc::new(DummyCacheInvalidator));

let builder =
DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
let datanode = builder.build().await.context(StartDatanodeSnafu)?;
Expand Down Expand Up @@ -429,15 +433,22 @@ impl StartCommand {
table_metadata_manager,
procedure_manager.clone(),
datanode_manager.clone(),
catalog_manager.clone(),
table_meta_allocator,
)
.await?;

let mut frontend = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor)
.with_plugin(fe_plugins.clone())
.try_build()
.await
.context(StartFrontendSnafu)?;
let mut frontend = FrontendBuilder::new(
kv_backend,
catalog_manager.clone(),
datanode_manager,
ddl_task_executor,
)
.with_plugin(fe_plugins.clone())
.with_cache_invalidator(catalog_manager)
.try_build()
.await
.context(StartFrontendSnafu)?;

let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
.build()
Expand All @@ -459,13 +470,14 @@ impl StartCommand {
table_metadata_manager: TableMetadataManagerRef,
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_meta_allocator: TableMetadataAllocatorRef,
) -> Result<ProcedureExecutorRef> {
let procedure_executor: ProcedureExecutorRef = Arc::new(
DdlManager::try_new(
procedure_manager,
datanode_manager,
Arc::new(DummyCacheInvalidator),
cache_invalidator,
table_metadata_manager,
table_meta_allocator,
Arc::new(MemoryRegionKeeper::default()),
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub mod create_table;
mod create_table_template;
pub mod drop_database;
pub mod drop_table;
mod physical_table_metadata;
pub mod table_meta;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
Expand Down
116 changes: 86 additions & 30 deletions src/common/meta/src/ddl/alter_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,29 @@
mod check;
mod metadata;
mod region_request;
mod table_cache_keys;
mod update_metadata;

use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context, LockKey, Procedure, Status};
use common_telemetry::{info, warn};
use futures_util::future;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use snafu::{ensure, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
use strum::AsRefStr;
use table::metadata::TableId;

use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::DdlContext;
use crate::error::{Error, Result};
use crate::instruction::CacheIdent;
use crate::ddl::{physical_table_metadata, DdlContext};
use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result};
use crate::key::table_info::TableInfoValue;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::key::DeserializedValueWithBytes;
use crate::lock_key::{CatalogLock, SchemaLock, TableLock};
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders};
use crate::{cache_invalidator, metrics, ClusterId};
Expand All @@ -60,8 +64,9 @@ impl AlterLogicalTablesProcedure {
tasks,
table_info_values: vec![],
physical_table_id,
physical_table_info: None,
physical_table_route: None,
cache_invalidate_keys: vec![],
physical_columns: vec![],
},
}
}
Expand All @@ -79,11 +84,24 @@ impl AlterLogicalTablesProcedure {
// Checks the physical table, must after [fill_table_info_values]
self.check_physical_table().await?;
// Fills the physical table info
self.fill_physical_table_route().await?;
// Filter the tasks
self.fill_physical_table_info().await?;
// Filter the finished tasks
let finished_tasks = self.check_finished_tasks()?;
if finished_tasks.iter().all(|x| *x) {
return Ok(Status::done());
let already_finished_count = finished_tasks
.iter()
.map(|x| if *x { 1 } else { 0 })
.sum::<usize>();
let apply_tasks_count = self.data.tasks.len();
if already_finished_count == apply_tasks_count {
info!("All the alter tasks are finished, will skip the procedure.");
// Re-invalidate the table cache
self.data.state = AlterTablesState::InvalidateTableCache;
return Ok(Status::executing(true));
} else if already_finished_count > 0 {
info!(
"There are {} alter tasks, {} of them were already finished.",
apply_tasks_count, already_finished_count
);
}
self.filter_task(&finished_tasks)?;

Expand Down Expand Up @@ -116,17 +134,61 @@ impl AlterLogicalTablesProcedure {
}
}

future::join_all(alter_region_tasks)
// Collects responses from all the alter region tasks.
let phy_raw_schemas = future::join_all(alter_region_tasks)
.await
.into_iter()
.map(|res| res.map(|mut res| res.extension.remove(ALTER_PHYSICAL_EXTENSION_KEY)))
.collect::<Result<Vec<_>>>()?;

self.data.state = AlterTablesState::UpdateMetadata;
if phy_raw_schemas.is_empty() {
self.data.state = AlterTablesState::UpdateMetadata;
return Ok(Status::executing(true));
}

// Verify all the physical schemas are the same
// Safety: previous check ensures this vec is not empty
let first = phy_raw_schemas.first().unwrap();
ensure!(
phy_raw_schemas.iter().all(|x| x == first),
MetadataCorruptionSnafu {
err_msg: "The physical schemas from datanodes are not the same."
}
);

// Decodes the physical raw schemas
if let Some(phy_raw_schema) = first {
self.data.physical_columns =
ColumnMetadata::decode_list(phy_raw_schema).context(DecodeJsonSnafu)?;
} else {
warn!("altering logical table result doesn't contains extension key `{ALTER_PHYSICAL_EXTENSION_KEY}`,leaving the physical table's schema unchanged");
}

self.data.state = AlterTablesState::UpdateMetadata;
Ok(Status::executing(true))
}

pub(crate) async fn on_update_metadata(&mut self) -> Result<Status> {
if !self.data.physical_columns.is_empty() {
let physical_table_info = self.data.physical_table_info.as_ref().unwrap();

// Generates new table info
let old_raw_table_info = physical_table_info.table_info.clone();
let new_raw_table_info = physical_table_metadata::build_new_physical_table_info(
old_raw_table_info,
&self.data.physical_columns,
);

// Updates physical table's metadata
self.context
.table_metadata_manager
.update_table_info(
DeserializedValueWithBytes::from_inner(physical_table_info.clone()),
new_raw_table_info,
)
.await?;
evenyag marked this conversation as resolved.
Show resolved Hide resolved
}

let table_info_values = self.build_update_metadata()?;
let manager = &self.context.table_metadata_manager;
let chunk_size = manager.batch_update_table_info_value_chunk_size();
Expand All @@ -151,15 +213,12 @@ impl AlterLogicalTablesProcedure {
}

pub(crate) async fn on_invalidate_table_cache(&mut self) -> Result<Status> {
let to_invalidate = self
.data
.cache_invalidate_keys
.drain(..)
.map(CacheIdent::TableId)
.collect::<Vec<_>>();
let ctx = cache_invalidator::Context::default();
let to_invalidate = self.build_table_cache_keys_to_invalidate();

self.context
.cache_invalidator
.invalidate(&cache_invalidator::Context::default(), to_invalidate)
.invalidate(&ctx, to_invalidate)
.await?;
Ok(Status::done())
}
Expand Down Expand Up @@ -212,17 +271,13 @@ impl Procedure for AlterLogicalTablesProcedure {
lock_key.push(CatalogLock::Read(table_ref.catalog).into());
lock_key.push(SchemaLock::read(table_ref.catalog, table_ref.schema).into());
lock_key.push(TableLock::Write(self.data.physical_table_id).into());
lock_key.extend(
self.data
.table_info_values
.iter()
.map(|table| TableLock::Write(table.table_info.ident.table_id).into()),
);

for task in &self.data.tasks {
lock_key.push(
TableNameLock::new(
&task.alter_table.catalog_name,
&task.alter_table.schema_name,
&task.alter_table.table_name,
)
.into(),
);
}
LockKey::new(lock_key)
}
}
Expand All @@ -237,8 +292,9 @@ pub struct AlterTablesData {
table_info_values: Vec<TableInfoValue>,
/// Physical table info
physical_table_id: TableId,
physical_table_info: Option<TableInfoValue>,
physical_table_route: Option<PhysicalTableRouteValue>,
cache_invalidate_keys: Vec<TableId>,
physical_columns: Vec<ColumnMetadata>,
}

#[derive(Debug, Serialize, Deserialize, AsRefStr)]
Expand Down
45 changes: 33 additions & 12 deletions src/common/meta/src/ddl/alter_logical_tables/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ use snafu::OptionExt;
use table::metadata::TableId;

use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::error::{Result, TableInfoNotFoundSnafu, TableNotFoundSnafu};
use crate::error::{
AlterLogicalTablesInvalidArgumentsSnafu, Result, TableInfoNotFoundSnafu, TableNotFoundSnafu,
TableRouteNotFoundSnafu,
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::rpc::ddl::AlterTableTask;

impl AlterLogicalTablesProcedure {
Expand All @@ -46,21 +50,38 @@ impl AlterLogicalTablesProcedure {
}
})
.collect();
self.data.cache_invalidate_keys = self
.data
.table_info_values
.iter()
.map(|table| table.table_info.ident.table_id)
.collect();

Ok(())
}

pub(crate) async fn fill_physical_table_route(&mut self) -> Result<()> {
let table_route_manager = self.context.table_metadata_manager.table_route_manager();
let (_, physical_table_route) = table_route_manager
.get_physical_table_route(self.data.physical_table_id)
pub(crate) async fn fill_physical_table_info(&mut self) -> Result<()> {
let (physical_table_info, physical_table_route) = self
.context
.table_metadata_manager
.get_full_table_info(self.data.physical_table_id)
.await?;

let physical_table_info = physical_table_info
.with_context(|| TableInfoNotFoundSnafu {
table: format!("table id - {}", self.data.physical_table_id),
})?
.into_inner();
let physical_table_route = physical_table_route
.context(TableRouteNotFoundSnafu {
table_id: self.data.physical_table_id,
})?
.into_inner();

self.data.physical_table_info = Some(physical_table_info);
let TableRouteValue::Physical(physical_table_route) = physical_table_route else {
return AlterLogicalTablesInvalidArgumentsSnafu {
err_msg: format!(
"expected a physical table but got a logical table: {:?}",
self.data.physical_table_id
),
}
.fail();
};
self.data.physical_table_route = Some(physical_table_route);

Ok(())
Expand All @@ -87,7 +108,7 @@ impl AlterLogicalTablesProcedure {
table_info_map
.remove(table_id)
.with_context(|| TableInfoNotFoundSnafu {
table_name: extract_table_name(task),
table: extract_table_name(task),
})?;
table_info_values.push(table_info_value);
}
Expand Down
Loading