Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: delete pipeline #4156

Merged
merged 16 commits into from
Jul 5, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 12 additions & 9 deletions src/frontend/src/instance/log_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use pipeline::table::{PipelineInfo, PipelineVersion};
use pipeline::{GreptimeTransformer, Pipeline};
use servers::error::{
AuthSnafu, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
UnsupportedDeletePipelineSnafu,
};
use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion};
use servers::error::{AuthSnafu, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult};
use servers::query_handler::LogHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
Expand Down Expand Up @@ -72,9 +68,16 @@ impl LogHandler for Instance {
.context(PipelineSnafu)
}

async fn delete_pipeline(&self, _name: &str, _query_ctx: QueryContextRef) -> ServerResult<()> {
// TODO(qtang): impl delete
Err(UnsupportedDeletePipelineSnafu {}.build())
async fn delete_pipeline(
&self,
name: &str,
version: PipelineVersion,
ctx: QueryContextRef,
) -> ServerResult<Option<()>> {
self.pipeline_operator
.delete_pipeline(name, version, ctx)
.await
.context(PipelineSnafu)
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_runtime::Builder as RuntimeBuilder;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
use servers::http::event::LogValidatorRef;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
Expand Down Expand Up @@ -89,7 +90,8 @@ where
Some(self.instance.clone()),
);

builder = builder.with_log_ingest_handler(self.instance.clone());
builder = builder
.with_log_ingest_handler(self.instance.clone(), self.plugins.get::<LogValidatorRef>());

if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
builder = builder.with_user_provider(user_provider);
Expand Down
1 change: 1 addition & 0 deletions src/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ common-telemetry.workspace = true
common-time.workspace = true
crossbeam-utils.workspace = true
csv = "1.3.0"
dashmap.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
Expand Down
File renamed without changes.
File renamed without changes.
6 changes: 5 additions & 1 deletion src/pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@

mod etl;
mod manager;
mod metrics;

pub use etl::transform::GreptimeTransformer;
pub use etl::value::Value;
pub use etl::{parse, Content, Pipeline};
pub use manager::{error, pipeline_operator, table};
pub use manager::{
error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef,
PipelineVersion,
};
38 changes: 38 additions & 0 deletions src/pipeline/src/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http:https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_time::Timestamp;
use datatypes::timestamp::TimestampNanosecond;

use crate::table::PipelineTable;
use crate::{GreptimeTransformer, Pipeline};

pub mod error;
pub mod pipeline_operator;
pub mod table;
pub mod util;

/// Pipeline version. An optional timestamp with nanosecond precision.
/// If the version is None, it means the latest version of the pipeline.
/// User can specify the version by providing a timestamp string formatted as iso8601.
/// When it used in cache key, it will be converted to i64 meaning the number of nanoseconds since the epoch.
pub type PipelineVersion = Option<TimestampNanosecond>;

/// Pipeline info. A tuple of timestamp and pipeline reference.
pub type PipelineInfo = (Timestamp, PipelineRef);

pub type PipelineTableRef = Arc<PipelineTable>;
pub type PipelineRef = Arc<Pipeline<GreptimeTransformer>>;
14 changes: 11 additions & 3 deletions src/pipeline/src/manager/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid pipeline version format: {}", version))]
InvalidPipelineVersion {
version: String,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -113,9 +120,10 @@ impl ErrorExt for Error {
PipelineTableNotFound { .. } => StatusCode::TableNotFound,
InsertPipeline { source, .. } => source.status_code(),
CollectRecords { source, .. } => source.status_code(),
PipelineNotFound { .. } | CompilePipeline { .. } | PipelineTransform { .. } => {
StatusCode::InvalidArguments
}
PipelineNotFound { .. }
| CompilePipeline { .. }
| PipelineTransform { .. }
| InvalidPipelineVersion { .. } => StatusCode::InvalidArguments,
BuildDfLogicalPlan { .. } => StatusCode::Internal,
ExecuteInternalStatement { source, .. } => source.status_code(),
Catalog { source, .. } => source.status_code(),
Expand Down
17 changes: 0 additions & 17 deletions src/pipeline/src/manager/mod.rs

This file was deleted.

67 changes: 48 additions & 19 deletions src/pipeline/src/manager/pipeline_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::Instant;

use api::v1::CreateTableExpr;
use catalog::{CatalogManagerRef, RegisterSystemTableRequest};
use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
use common_telemetry::info;
use futures::FutureExt;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::QueryEngineRef;
Expand All @@ -27,11 +29,14 @@ use snafu::{OptionExt, ResultExt};
use table::TableRef;

use crate::error::{CatalogSnafu, CreateTableSnafu, PipelineTableNotFoundSnafu, Result};
use crate::table::{PipelineInfo, PipelineTable, PipelineTableRef, PipelineVersion};
use crate::manager::{PipelineInfo, PipelineTableRef, PipelineVersion};
use crate::metrics::{
METRIC_PIPELINE_CREATE_HISTOGRAM, METRIC_PIPELINE_DELETE_HISTOGRAM,
METRIC_PIPELINE_RETRIEVE_HISTOGRAM,
};
use crate::table::{PipelineTable, PIPELINE_TABLE_NAME};
use crate::{GreptimeTransformer, Pipeline};

pub const PIPELINE_TABLE_NAME: &str = "pipelines";

/// PipelineOperator is responsible for managing pipelines.
/// It provides the ability to:
/// - Create a pipeline table if it does not exist
Expand All @@ -50,7 +55,7 @@ pub struct PipelineOperator {

impl PipelineOperator {
/// Create a table request for the pipeline table.
pub fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest {
fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest {
let (time_index, primary_keys, column_defs) = PipelineTable::build_pipeline_schema();

let create_table_expr = CreateTableExpr {
Expand Down Expand Up @@ -146,20 +151,6 @@ impl PipelineOperator {
pub fn get_pipeline_table_from_cache(&self, catalog: &str) -> Option<PipelineTableRef> {
self.tables.read().unwrap().get(catalog).cloned()
}

async fn insert_and_compile(
&self,
ctx: QueryContextRef,
name: &str,
content_type: &str,
pipeline: &str,
) -> Result<PipelineInfo> {
let schema = ctx.current_schema();
self.get_pipeline_table_from_cache(ctx.current_catalog())
.context(PipelineTableNotFoundSnafu)?
.insert_and_compile(&schema, name, content_type, pipeline)
.await
}
}

impl PipelineOperator {
Expand Down Expand Up @@ -189,9 +180,16 @@ impl PipelineOperator {
let schema = query_ctx.current_schema();
self.create_pipeline_table_if_not_exists(query_ctx.clone())
.await?;

let timer = Instant::now();
self.get_pipeline_table_from_cache(query_ctx.current_catalog())
.context(PipelineTableNotFoundSnafu)?
.get_pipeline(&schema, name, version)
.inspect(|re| {
METRIC_PIPELINE_RETRIEVE_HISTOGRAM
.with_label_values(&[&re.is_ok().to_string()])
.observe(timer.elapsed().as_secs_f64())
})
shuiyisong marked this conversation as resolved.
Show resolved Hide resolved
.await
}

Expand All @@ -206,7 +204,38 @@ impl PipelineOperator {
self.create_pipeline_table_if_not_exists(query_ctx.clone())
.await?;

self.insert_and_compile(query_ctx, name, content_type, pipeline)
let timer = Instant::now();
self.get_pipeline_table_from_cache(query_ctx.current_catalog())
.context(PipelineTableNotFoundSnafu)?
.insert_and_compile(&query_ctx.current_schema(), name, content_type, pipeline)
.inspect(|re| {
METRIC_PIPELINE_CREATE_HISTOGRAM
.with_label_values(&[&re.is_ok().to_string()])
.observe(timer.elapsed().as_secs_f64())
})
.await
shuiyisong marked this conversation as resolved.
Show resolved Hide resolved
}

/// Delete a pipeline by name from pipeline table.
pub async fn delete_pipeline(
&self,
name: &str,
version: PipelineVersion,
query_ctx: QueryContextRef,
) -> Result<Option<()>> {
// trigger load pipeline table
self.create_pipeline_table_if_not_exists(query_ctx.clone())
.await?;

let timer = Instant::now();
self.get_pipeline_table_from_cache(query_ctx.current_catalog())
.context(PipelineTableNotFoundSnafu)?
.delete_pipeline(&query_ctx.current_schema(), name, version)
.inspect(|re| {
METRIC_PIPELINE_DELETE_HISTOGRAM
.with_label_values(&[&re.is_ok().to_string()])
.observe(timer.elapsed().as_secs_f64())
})
shuiyisong marked this conversation as resolved.
Show resolved Hide resolved
.await
}
}
Loading