From 0da597c4be5bcdfbe4c4a12e74daf5ee669a7997 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Mon, 17 Jun 2024 14:12:24 +0800 Subject: [PATCH 01/11] feat: add delete for pipeline --- Cargo.lock | 1 + src/frontend/src/instance/log_handler.rs | 13 ++- src/frontend/src/server.rs | 4 +- src/pipeline/Cargo.toml | 1 + src/pipeline/src/{etl/mod.rs => etl.rs} | 0 .../etl/{processor/mod.rs => processor.rs} | 0 .../etl/{transform/mod.rs => transform.rs} | 0 .../{transformer/mod.rs => transformer.rs} | 0 .../{greptime/mod.rs => greptime.rs} | 0 .../src/etl/{value/mod.rs => value.rs} | 0 .../src/{manager/mod.rs => manager.rs} | 0 src/pipeline/src/manager/error.rs | 9 +- src/pipeline/src/manager/pipeline_operator.rs | 16 ++++ src/pipeline/src/manager/table.rs | 95 +++++++++++++++++-- src/servers/src/error.rs | 7 -- src/servers/src/http.rs | 23 ++++- src/servers/src/http/event.rs | 55 ++++++++++- tests-integration/src/test_util.rs | 1 + tests-integration/tests/http.rs | 82 ++++++++++++++++ 19 files changed, 277 insertions(+), 30 deletions(-) rename src/pipeline/src/{etl/mod.rs => etl.rs} (100%) rename src/pipeline/src/etl/{processor/mod.rs => processor.rs} (100%) rename src/pipeline/src/etl/{transform/mod.rs => transform.rs} (100%) rename src/pipeline/src/etl/transform/{transformer/mod.rs => transformer.rs} (100%) rename src/pipeline/src/etl/transform/transformer/{greptime/mod.rs => greptime.rs} (100%) rename src/pipeline/src/etl/{value/mod.rs => value.rs} (100%) rename src/pipeline/src/{manager/mod.rs => manager.rs} (100%) diff --git a/Cargo.lock b/Cargo.lock index b10396cb7b53..11a071fc9643 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7305,6 +7305,7 @@ dependencies = [ "common-time", "crossbeam-utils", "csv", + "dashmap", "datafusion 38.0.0", "datafusion-common 38.0.0", "datafusion-expr 38.0.0", diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index 6ef48205cc56..fe2b1cc4a6b8 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -21,10 +21,7 @@ use client::Output; use common_error::ext::BoxedError; use pipeline::table::PipelineVersion; use pipeline::{GreptimeTransformer, Pipeline}; -use servers::error::{ - AuthSnafu, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult, - UnsupportedDeletePipelineSnafu, -}; +use servers::error::{AuthSnafu, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult}; use servers::query_handler::LogHandler; use session::context::QueryContextRef; use snafu::ResultExt; @@ -72,9 +69,11 @@ impl LogHandler for Instance { .context(PipelineSnafu) } - async fn delete_pipeline(&self, _name: &str, _query_ctx: QueryContextRef) -> ServerResult<()> { - // TODO(qtang): impl delete - Err(UnsupportedDeletePipelineSnafu {}.build()) + async fn delete_pipeline(&self, name: &str, ctx: QueryContextRef) -> ServerResult<()> { + self.pipeline_operator + .delete_pipeline(name, ctx) + .await + .context(PipelineSnafu) } } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 1433a595ce81..b670f7db3450 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -22,6 +22,7 @@ use common_runtime::Builder as RuntimeBuilder; use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::greptime_handler::GreptimeRequestHandler; use servers::grpc::{GrpcServer, GrpcServerConfig}; +use servers::http::event::LogValidatorRef; use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use 
servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; @@ -90,7 +91,8 @@ where Some(self.instance.clone()), ); - builder = builder.with_log_ingest_handler(self.instance.clone()); + builder = builder + .with_log_ingest_handler(self.instance.clone(), self.plugins.get::()); if let Some(user_provider) = self.plugins.get::() { builder = builder.with_user_provider(user_provider); diff --git a/src/pipeline/Cargo.toml b/src/pipeline/Cargo.toml index 03096b47a7a1..168471d75635 100644 --- a/src/pipeline/Cargo.toml +++ b/src/pipeline/Cargo.toml @@ -28,6 +28,7 @@ common-telemetry.workspace = true common-time.workspace = true crossbeam-utils.workspace = true csv = "1.3.0" +dashmap.workspace = true datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true diff --git a/src/pipeline/src/etl/mod.rs b/src/pipeline/src/etl.rs similarity index 100% rename from src/pipeline/src/etl/mod.rs rename to src/pipeline/src/etl.rs diff --git a/src/pipeline/src/etl/processor/mod.rs b/src/pipeline/src/etl/processor.rs similarity index 100% rename from src/pipeline/src/etl/processor/mod.rs rename to src/pipeline/src/etl/processor.rs diff --git a/src/pipeline/src/etl/transform/mod.rs b/src/pipeline/src/etl/transform.rs similarity index 100% rename from src/pipeline/src/etl/transform/mod.rs rename to src/pipeline/src/etl/transform.rs diff --git a/src/pipeline/src/etl/transform/transformer/mod.rs b/src/pipeline/src/etl/transform/transformer.rs similarity index 100% rename from src/pipeline/src/etl/transform/transformer/mod.rs rename to src/pipeline/src/etl/transform/transformer.rs diff --git a/src/pipeline/src/etl/transform/transformer/greptime/mod.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs similarity index 100% rename from src/pipeline/src/etl/transform/transformer/greptime/mod.rs rename to src/pipeline/src/etl/transform/transformer/greptime.rs diff --git a/src/pipeline/src/etl/value/mod.rs b/src/pipeline/src/etl/value.rs similarity index 100% rename from src/pipeline/src/etl/value/mod.rs rename to src/pipeline/src/etl/value.rs diff --git a/src/pipeline/src/manager/mod.rs b/src/pipeline/src/manager.rs similarity index 100% rename from src/pipeline/src/manager/mod.rs rename to src/pipeline/src/manager.rs diff --git a/src/pipeline/src/manager/error.rs b/src/pipeline/src/manager/error.rs index ad5d8a96bebd..10c361a59901 100644 --- a/src/pipeline/src/manager/error.rs +++ b/src/pipeline/src/manager/error.rs @@ -24,6 +24,13 @@ use snafu::{Location, Snafu}; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display("Failed to find column in pipeline table, name: {}", name))] + FindColumnInPipelineTable { + name: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Pipeline table not found"))] PipelineTableNotFound { #[snafu(implicit)] @@ -109,7 +116,7 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { use Error::*; match self { - CastType { .. } => StatusCode::Unexpected, + FindColumnInPipelineTable { .. } | CastType { .. } => StatusCode::Unexpected, PipelineTableNotFound { .. } => StatusCode::TableNotFound, InsertPipeline { source, .. } => source.status_code(), CollectRecords { source, .. 
} => source.status_code(), diff --git a/src/pipeline/src/manager/pipeline_operator.rs b/src/pipeline/src/manager/pipeline_operator.rs index 390a48d834a4..93fff5102e72 100644 --- a/src/pipeline/src/manager/pipeline_operator.rs +++ b/src/pipeline/src/manager/pipeline_operator.rs @@ -159,6 +159,13 @@ impl PipelineOperator { .insert_and_compile(ctx.current_schema(), name, content_type, pipeline) .await } + + async fn delete_pipeline_by_name(&self, name: &str, ctx: QueryContextRef) -> Result<()> { + self.get_pipeline_table_from_cache(ctx.current_catalog()) + .context(PipelineTableNotFoundSnafu)? + .delete_pipeline_by_name(ctx.current_schema(), name) + .await + } } impl PipelineOperator { @@ -208,4 +215,13 @@ impl PipelineOperator { .await .map(|_| ()) } + + /// Delete a pipeline by name from pipeline table. + pub async fn delete_pipeline(&self, name: &str, query_ctx: QueryContextRef) -> Result<()> { + // trigger load table + self.create_pipeline_table_if_not_exists(query_ctx.clone()) + .await?; + + self.delete_pipeline_by_name(name, query_ctx).await + } } diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs index d037ae3d4832..7dbde6633138 100644 --- a/src/pipeline/src/manager/table.rs +++ b/src/pipeline/src/manager/table.rs @@ -26,8 +26,8 @@ use common_telemetry::{debug, info}; use common_time::timestamp::{TimeUnit, Timestamp}; use datafusion::datasource::DefaultTableSource; use datafusion::logical_expr::{and, col, lit}; -use datafusion_common::TableReference; -use datafusion_expr::LogicalPlanBuilder; +use datafusion_common::{TableReference, ToDFSchema}; +use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlanBuilder}; use datatypes::prelude::ScalarVector; use datatypes::timestamp::TimestampNanosecond; use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector}; @@ -218,6 +218,19 @@ impl PipelineTable { } } + fn remove_pipeline_cache_with_schema_and_name(&self, schema: &str, name: &str) { + let key_prefix = format!("{}/{}", schema, name); + let keys = self + .pipelines + .iter() + .filter(|en| en.0.starts_with(&key_prefix)) + .map(|en| (*en.0).clone()) + .collect::>(); + for key in keys.iter() { + self.pipelines.remove(key); + } + } + fn get_compiled_pipeline_from_cache( &self, schema: &str, @@ -293,7 +306,10 @@ impl PipelineTable { return Ok(pipeline); } - let pipeline = self.find_pipeline_by_name(schema, name, version).await?; + let pipeline = self + .find_pipeline_by_name(schema, name, version) + .await? + .context(PipelineNotFoundSnafu { name, version })?; let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?); self.pipelines.insert( @@ -332,12 +348,74 @@ impl PipelineTable { Ok(compiled_pipeline) } + pub async fn delete_pipeline_by_name(&self, schema: &str, name: &str) -> Result<()> { + // 1. check pipeline exist in catalog + let pipeline = self.find_pipeline_by_name(schema, name, None).await?; + if pipeline.is_none() { + return Ok(()); + } + + // 2. 
do delete + let table_info = self.table.table_info(); + let table_name = TableReference::full( + table_info.catalog_name.clone(), + table_info.schema_name.clone(), + table_info.name.clone(), + ); + let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone())); + let table_source = Arc::new(DefaultTableSource::new(table_provider)); + + let df_schema = Arc::new( + table_info + .meta + .schema + .arrow_schema() + .clone() + .to_dfschema() + .context(BuildDfLogicalPlanSnafu)?, + ); + + // create scan plan + let logical_plan = LogicalPlanBuilder::scan(table_name.clone(), table_source, None) + .context(BuildDfLogicalPlanSnafu)? + .filter(and( + col("schema").eq(lit(schema)), + col("name").eq(lit(name)), + )) + .context(BuildDfLogicalPlanSnafu)? + .build() + .context(BuildDfLogicalPlanSnafu)?; + + // create dml stmt + let stmt = DmlStatement::new( + table_name, + df_schema, + datafusion_expr::WriteOp::Delete, + Arc::new(logical_plan), + ); + + let plan = LogicalPlan::DfPlan(DfLogicalPlan::Dml(stmt)); + + let output = self + .query_engine + .execute(plan, Self::query_ctx(&table_info)) + .await + .context(ExecuteInternalStatementSnafu)?; + + // cloud be deleting multiple rows + info!("delete pipeline: {:?}, output: {:?}", name, output); + // remove all caches + self.remove_pipeline_cache_with_schema_and_name(schema, name); + + Ok(()) + } + async fn find_pipeline_by_name( &self, schema: &str, name: &str, version: PipelineVersion, - ) -> Result<(String, TimestampNanosecond)> { + ) -> Result> { let table_info = self.table.table_info(); let table_name = TableReference::full( @@ -396,8 +474,11 @@ impl PipelineTable { .await .context(CollectRecordsSnafu)?; - ensure!(!records.is_empty(), PipelineNotFoundSnafu { name, version }); + if records.is_empty() { + return Ok(None); + } + // limit 1 ensure!( records.len() == 1 && records[0].num_columns() == 2, PipelineNotFoundSnafu { name, version } @@ -436,9 +517,9 @@ impl PipelineTable { ); // Safety: asserted above - Ok(( + Ok(Some(( pipeline_content.get_data(0).unwrap().to_string(), pipeline_created_at.get_data(0).unwrap(), - )) + ))) } } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 04b6fa196ca2..80c93444158f 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -156,12 +156,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Unsupported delete pipeline."))] - UnsupportedDeletePipeline { - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to execute script by name: {}", name))] ExecuteScript { name: String, @@ -635,7 +629,6 @@ impl ErrorExt for Error { | FileWatch { .. } => StatusCode::Internal, UnsupportedDataType { .. } => StatusCode::Unsupported, - UnsupportedDeletePipeline { .. } => StatusCode::Unsupported, #[cfg(not(windows))] UpdateJemallocMetrics { .. 
} => StatusCode::Internal, diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 3f7f71653f73..f0fd5a136759 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -36,6 +36,7 @@ use common_time::timestamp::TimeUnit; use common_time::Timestamp; use datatypes::data_type::DataType; use datatypes::schema::SchemaRef; +use event::{LogState, LogValidatorRef}; use futures::FutureExt; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -588,11 +589,15 @@ impl HttpServerBuilder { } } - pub fn with_log_ingest_handler(self, handler: LogHandlerRef) -> Self { + pub fn with_log_ingest_handler( + self, + handler: LogHandlerRef, + validator: Option, + ) -> Self { Self { router: self.router.nest( &format!("/{HTTP_API_VERSION}/events"), - HttpServer::route_log(handler), + HttpServer::route_log(handler, validator), ), ..self } @@ -710,19 +715,29 @@ impl HttpServer { .with_state(metrics_handler) } - fn route_log(log_handler: LogHandlerRef) -> Router { + fn route_log( + log_handler: LogHandlerRef, + log_validator: Option, + ) -> Router { Router::new() .route("/logs", routing::post(event::log_ingester)) .route( "/pipelines/:pipeline_name", routing::post(event::add_pipeline), ) + .route( + "/pipelines/:pipeline_name", + routing::delete(event::delete_pipeline), + ) .layer( ServiceBuilder::new() .layer(HandleErrorLayer::new(handle_error)) .layer(RequestDecompressionLayer::new()), ) - .with_state(log_handler) + .with_state(LogState { + log_handler, + log_validator, + }) } fn route_sql(api_state: ApiState) -> ApiRouter { diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index f9939b80572e..1d87a8c83ff7 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::result::Result as StdResult; +use std::sync::Arc; use api::v1::{RowInsertRequest, RowInsertRequests, Rows}; use axum::body::HttpBody; @@ -33,7 +34,7 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::{Deserializer, Value}; use session::context::QueryContextRef; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{ InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu, @@ -50,6 +51,7 @@ pub struct LogIngesterQueryParams { pub ignore_errors: Option, pub version: Option, + pub source: Option, } pub struct PipelineContent(String); @@ -99,11 +101,12 @@ where #[axum_macros::debug_handler] pub async fn add_pipeline( - State(handler): State, + State(state): State, Path(pipeline_name): Path, Extension(query_ctx): Extension, PipelineContent(payload): PipelineContent, ) -> Result { + let handler = state.log_handler; if pipeline_name.is_empty() { return Err(InvalidParameterSnafu { reason: "pipeline_name is required in path", @@ -129,6 +132,30 @@ pub async fn add_pipeline( }) } +#[axum_macros::debug_handler] +pub async fn delete_pipeline( + State(state): State, + Extension(query_ctx): Extension, + Path(pipeline_name): Path, +) -> Result { + let handler = state.log_handler; + ensure!( + !pipeline_name.is_empty(), + InvalidParameterSnafu { + reason: "pipeline_name is required", + } + ); + + handler + .delete_pipeline(&pipeline_name, query_ctx) + .await + .map(|_| "ok".to_string()) + .map_err(|e| { + error!(e; "failed to delete pipeline"); + e + }) +} + /// Transform NDJSON array into a single array fn transform_ndjson_array_factory( values: impl IntoIterator>, @@ -171,12 +198,20 @@ fn transform_ndjson_array_factory( #[axum_macros::debug_handler] pub async fn log_ingester( - State(handler): State, + State(log_state): State, Query(query_params): Query, Extension(query_ctx): Extension, TypedHeader(content_type): TypedHeader, payload: String, ) -> Result { + if let Some(log_validator) = log_state.log_validator { + if let Some(response) = log_validator.validate(query_params.source.clone(), &payload) { + return response; + } + } + + let handler = log_state.log_handler; + let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu { reason: "pipeline_name is required", })?; @@ -255,3 +290,17 @@ async fn ingest_logs_inner( .with_execution_time(start.elapsed().as_millis() as u64); Ok(response) } + +pub trait LogValidator { + /// validate payload by source before processing + fn validate(&self, source: Option, payload: &str) -> Option>; +} + +pub type LogValidatorRef = Arc; + +/// axum state struct to hold log handler and validator +#[derive(Clone)] +pub struct LogState { + pub log_handler: LogHandlerRef, + pub log_validator: Option, +} diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 2aa8b756d935..b36187f177fd 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -425,6 +425,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider( ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()), Some(instance.instance.clone()), ) + .with_log_ingest_handler(instance.instance.clone(), None) .with_greptime_config_options(instance.opts.to_toml().unwrap()); if let Some(user_provider) = user_provider { diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 8bace12d2b98..3bf12f5d76bb 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs 
@@ -76,6 +76,8 @@ macro_rules! http_tests { test_dashboard_path, test_prometheus_remote_write, test_vm_proto_remote_write, + + test_pipeline_api, ); )* }; @@ -996,3 +998,83 @@ pub async fn test_vm_proto_remote_write(store_type: StorageType) { guard.remove_all().await; } + +pub async fn test_pipeline_api(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_pipeline_api").await; + + // handshake + let client = TestClient::new(app); + + let body = r#" +processors: + - date: + field: time + formats: + - "%Y-%m-%d %H:%M:%S%.3f" + ignore_missing: true + +transform: + - fields: + - id1 + - id2 + type: int32 + - fields: + - type + - log + - logger + type: string + - field: time + type: time + index: timestamp +"#; + + // 1. create pipeline + let res = client + .post("/v1/events/pipelines/test") + .header("Content-Type", "application/x-yaml") + .body(body) + .send() + .await; + + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(res.text().await, "ok"); + + // 2. write data + let data_body = r#" +[ + { + "id1": "2436", + "id2": "2528", + "logger": "INTERACT.MANAGER", + "type": "I", + "time": "2024-05-25 20:16:37.217", + "log": "ClusterAdapter:enter sendTextDataToCluster\\n" + } +] + "#; + let res = client + .post("/v1/events/logs?db=public&table=logs1&pipeline_name=test") + .header("Content-Type", "application/json") + .body(data_body) + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + // 3. remove pipeline + let res = client.delete("/v1/events/pipelines/test").send().await; + + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(res.text().await, "ok"); + + // 4. write data failed + let res = client + .post("/v1/events/logs?db=public&table=logs1&pipeline_name=test") + .header("Content-Type", "application/json") + .body(data_body) + .send() + .await; + assert_ne!(res.status(), StatusCode::OK); + + guard.remove_all().await; +} From 8ab53baa498611492469efed0dada5bbab2f5f1b Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Thu, 27 Jun 2024 11:56:11 +0800 Subject: [PATCH 02/11] chore: remove unused code --- src/pipeline/src/manager/error.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/pipeline/src/manager/error.rs b/src/pipeline/src/manager/error.rs index 10c361a59901..ad5d8a96bebd 100644 --- a/src/pipeline/src/manager/error.rs +++ b/src/pipeline/src/manager/error.rs @@ -24,13 +24,6 @@ use snafu::{Location, Snafu}; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { - #[snafu(display("Failed to find column in pipeline table, name: {}", name))] - FindColumnInPipelineTable { - name: String, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Pipeline table not found"))] PipelineTableNotFound { #[snafu(implicit)] @@ -116,7 +109,7 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { use Error::*; match self { - FindColumnInPipelineTable { .. } | CastType { .. } => StatusCode::Unexpected, + CastType { .. } => StatusCode::Unexpected, PipelineTableNotFound { .. } => StatusCode::TableNotFound, InsertPipeline { source, .. } => source.status_code(), CollectRecords { source, .. 
} => source.status_code(), From 92e657e7c5e7c89fc3b520a14614868b49be7105 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Thu, 27 Jun 2024 20:26:11 +0800 Subject: [PATCH 03/11] refactor: delete pipeline --- Cargo.lock | 1 + src/frontend/src/instance/log_handler.rs | 12 +- src/pipeline/src/lib.rs | 5 +- src/pipeline/src/manager.rs | 23 ++++ src/pipeline/src/manager/error.rs | 14 +- src/pipeline/src/manager/pipeline_operator.rs | 46 +++---- src/pipeline/src/manager/table.rs | 129 +++++++----------- src/pipeline/src/manager/util.rs | 61 +++++++++ src/servers/src/http/event.rs | 39 ++---- src/servers/src/query_handler.rs | 10 +- tests-integration/Cargo.toml | 1 + tests-integration/tests/http.rs | 25 +++- 12 files changed, 215 insertions(+), 151 deletions(-) create mode 100644 src/pipeline/src/manager/util.rs diff --git a/Cargo.lock b/Cargo.lock index d1518dfc0f29..e92818ffafa6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11520,6 +11520,7 @@ dependencies = [ "tokio-stream", "tonic 0.11.0", "tower", + "url", "uuid", "zstd 0.13.1", ] diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index 9b12788ee7a0..cccce0163441 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -19,8 +19,7 @@ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use client::Output; use common_error::ext::BoxedError; -use pipeline::table::{PipelineInfo, PipelineVersion}; -use pipeline::{GreptimeTransformer, Pipeline}; +use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion}; use servers::error::{AuthSnafu, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult}; use servers::query_handler::LogHandler; use session::context::QueryContextRef; @@ -69,9 +68,14 @@ impl LogHandler for Instance { .context(PipelineSnafu) } - async fn delete_pipeline(&self, name: &str, ctx: QueryContextRef) -> ServerResult<()> { + async fn delete_pipeline( + &self, + name: &str, + version: PipelineVersion, + ctx: QueryContextRef, + ) -> ServerResult<()> { self.pipeline_operator - .delete_pipeline(name, ctx) + .delete_pipeline(name, version, ctx) .await .context(PipelineSnafu) } diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs index 86ed9c7ea79b..82ce43c2d6d6 100644 --- a/src/pipeline/src/lib.rs +++ b/src/pipeline/src/lib.rs @@ -18,4 +18,7 @@ mod manager; pub use etl::transform::GreptimeTransformer; pub use etl::value::Value; pub use etl::{parse, Content, Pipeline}; -pub use manager::{error, pipeline_operator, table}; +pub use manager::{ + error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef, + PipelineVersion, +}; diff --git a/src/pipeline/src/manager.rs b/src/pipeline/src/manager.rs index 95ffb5822ec3..a2b677751e23 100644 --- a/src/pipeline/src/manager.rs +++ b/src/pipeline/src/manager.rs @@ -12,6 +12,29 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + +use common_time::Timestamp; +use datatypes::timestamp::TimestampNanosecond; + +use crate::table::PipelineTable; +use crate::{GreptimeTransformer, Pipeline}; + pub mod error; pub mod pipeline_operator; pub mod table; +pub mod util; + +const PIPELINE_TABLE_NAME: &str = "pipelines"; + +/// Pipeline version. An optional timestamp with nanosecond precision. +/// If the version is None, it means the latest version of the pipeline. +/// User can specify the version by providing a timestamp string formatted as iso8601. 
+/// When it used in cache key, it will be converted to i64 meaning the number of nanoseconds since the epoch. +pub type PipelineVersion = Option; + +/// Pipeline info. A tuple of timestamp and pipeline reference. +pub type PipelineInfo = (Timestamp, PipelineRef); + +pub type PipelineTableRef = Arc; +pub type PipelineRef = Arc>; diff --git a/src/pipeline/src/manager/error.rs b/src/pipeline/src/manager/error.rs index ad5d8a96bebd..07332590f1f0 100644 --- a/src/pipeline/src/manager/error.rs +++ b/src/pipeline/src/manager/error.rs @@ -101,6 +101,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Invalid pipeline version format: {}", version))] + InvalidPipelineVersion { + version: String, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -113,9 +120,10 @@ impl ErrorExt for Error { PipelineTableNotFound { .. } => StatusCode::TableNotFound, InsertPipeline { source, .. } => source.status_code(), CollectRecords { source, .. } => source.status_code(), - PipelineNotFound { .. } | CompilePipeline { .. } | PipelineTransform { .. } => { - StatusCode::InvalidArguments - } + PipelineNotFound { .. } + | CompilePipeline { .. } + | PipelineTransform { .. } + | InvalidPipelineVersion { .. } => StatusCode::InvalidArguments, BuildDfLogicalPlan { .. } => StatusCode::Internal, ExecuteInternalStatement { source, .. } => source.status_code(), Catalog { source, .. } => source.status_code(), diff --git a/src/pipeline/src/manager/pipeline_operator.rs b/src/pipeline/src/manager/pipeline_operator.rs index 0eab44817cd8..22c1523b18b1 100644 --- a/src/pipeline/src/manager/pipeline_operator.rs +++ b/src/pipeline/src/manager/pipeline_operator.rs @@ -27,11 +27,10 @@ use snafu::{OptionExt, ResultExt}; use table::TableRef; use crate::error::{CatalogSnafu, CreateTableSnafu, PipelineTableNotFoundSnafu, Result}; -use crate::table::{PipelineInfo, PipelineTable, PipelineTableRef, PipelineVersion}; +use crate::manager::{PipelineInfo, PipelineTableRef, PipelineVersion, PIPELINE_TABLE_NAME}; +use crate::table::PipelineTable; use crate::{GreptimeTransformer, Pipeline}; -pub const PIPELINE_TABLE_NAME: &str = "pipelines"; - /// PipelineOperator is responsible for managing pipelines. /// It provides the ability to: /// - Create a pipeline table if it does not exist @@ -50,7 +49,7 @@ pub struct PipelineOperator { impl PipelineOperator { /// Create a table request for the pipeline table. - pub fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest { + fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest { let (time_index, primary_keys, column_defs) = PipelineTable::build_pipeline_schema(); let create_table_expr = CreateTableExpr { @@ -146,26 +145,6 @@ impl PipelineOperator { pub fn get_pipeline_table_from_cache(&self, catalog: &str) -> Option { self.tables.read().unwrap().get(catalog).cloned() } - - async fn insert_and_compile( - &self, - ctx: QueryContextRef, - name: &str, - content_type: &str, - pipeline: &str, - ) -> Result { - self.get_pipeline_table_from_cache(ctx.current_catalog()) - .context(PipelineTableNotFoundSnafu)? - .insert_and_compile(ctx.current_schema(), name, content_type, pipeline) - .await - } - - async fn delete_pipeline_by_name(&self, name: &str, ctx: QueryContextRef) -> Result<()> { - self.get_pipeline_table_from_cache(ctx.current_catalog()) - .context(PipelineTableNotFoundSnafu)? 
- .delete_pipeline_by_name(ctx.current_schema(), name) - .await - } } impl PipelineOperator { @@ -194,6 +173,7 @@ impl PipelineOperator { ) -> Result>> { self.create_pipeline_table_if_not_exists(query_ctx.clone()) .await?; + self.get_pipeline_table_from_cache(query_ctx.current_catalog()) .context(PipelineTableNotFoundSnafu)? .get_pipeline(query_ctx.current_schema(), name, version) @@ -211,16 +191,26 @@ impl PipelineOperator { self.create_pipeline_table_if_not_exists(query_ctx.clone()) .await?; - self.insert_and_compile(query_ctx, name, content_type, pipeline) + self.get_pipeline_table_from_cache(query_ctx.current_catalog()) + .context(PipelineTableNotFoundSnafu)? + .insert_and_compile(query_ctx.current_schema(), name, content_type, pipeline) .await } /// Delete a pipeline by name from pipeline table. - pub async fn delete_pipeline(&self, name: &str, query_ctx: QueryContextRef) -> Result<()> { - // trigger load table + pub async fn delete_pipeline( + &self, + name: &str, + version: PipelineVersion, + query_ctx: QueryContextRef, + ) -> Result<()> { + // trigger load pipeline table self.create_pipeline_table_if_not_exists(query_ctx.clone()) .await?; - self.delete_pipeline_by_name(name, query_ctx).await + self.get_pipeline_table_from_cache(query_ctx.current_catalog()) + .context(PipelineTableNotFoundSnafu)? + .delete_pipeline(query_ctx.current_schema(), name, version) + .await } } diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs index eab7fd4bdc6b..7f57f7d62e45 100644 --- a/src/pipeline/src/manager/table.rs +++ b/src/pipeline/src/manager/table.rs @@ -25,7 +25,7 @@ use common_recordbatch::util as record_util; use common_telemetry::{debug, info}; use common_time::timestamp::{TimeUnit, Timestamp}; use datafusion::datasource::DefaultTableSource; -use datafusion::logical_expr::{and, col, lit}; +use datafusion::logical_expr::col; use datafusion_common::{TableReference, ToDFSchema}; use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlanBuilder}; use datatypes::prelude::ScalarVector; @@ -44,36 +44,24 @@ use table::TableRef; use crate::error::{ BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, CompilePipelineSnafu, - ExecuteInternalStatementSnafu, InsertPipelineSnafu, PipelineNotFoundSnafu, Result, + ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu, + PipelineNotFoundSnafu, Result, }; use crate::etl::transform::GreptimeTransformer; use crate::etl::{parse, Content, Pipeline}; +use crate::manager::{PipelineInfo, PipelineVersion, PIPELINE_TABLE_NAME}; +use crate::util::{build_plan_filter, generate_pipeline_cache_key}; -/// Pipeline version. An optional timestamp with nanosecond precision. -/// If the version is None, it means the latest version of the pipeline. -/// User can specify the version by providing a timestamp string formatted as iso8601. -/// When it used in cache key, it will be converted to i64 meaning the number of nanoseconds since the epoch. -pub type PipelineVersion = Option; - -pub type PipelineTableRef = Arc; - -pub type PipelineRef = Arc>; - -/// Pipeline info. A tuple of timestamp and pipeline reference. 
-pub type PipelineInfo = (Timestamp, PipelineRef); - -pub const PIPELINE_TABLE_NAME: &str = "pipelines"; - -pub const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name"; -pub const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema"; -pub const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type"; -pub const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline"; -pub const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at"; +pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name"; +pub(crate) const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema"; +const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type"; +const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline"; +pub(crate) const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at"; /// Pipeline table cache size. -pub const PIPELINES_CACHE_SIZE: u64 = 10000; +const PIPELINES_CACHE_SIZE: u64 = 10000; /// Pipeline table cache time to live. -pub const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10); +const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10); /// PipelineTable is a table that stores the pipeline schema and content. /// Every catalog has its own pipeline table. @@ -216,36 +204,6 @@ impl PipelineTable { .map_err(|e| CompilePipelineSnafu { reason: e }.build()) } - fn generate_pipeline_cache_key(schema: &str, name: &str, version: PipelineVersion) -> String { - match version { - Some(version) => format!("{}/{}/{}", schema, name, i64::from(version)), - None => format!("{}/{}/latest", schema, name), - } - } - - fn remove_pipeline_cache_with_schema_and_name(&self, schema: &str, name: &str) { - let key_prefix = format!("{}/{}", schema, name); - let keys = self - .pipelines - .iter() - .filter(|en| en.0.starts_with(&key_prefix)) - .map(|en| (*en.0).clone()) - .collect::>(); - for key in keys.iter() { - self.pipelines.remove(key); - } - } - - fn get_compiled_pipeline_from_cache( - &self, - schema: &str, - name: &str, - version: PipelineVersion, - ) -> Option>> { - self.pipelines - .get(&Self::generate_pipeline_cache_key(schema, name, version)) - } - /// Insert a pipeline into the pipeline table. async fn insert_pipeline_to_pipeline_table( &self, @@ -307,18 +265,21 @@ impl PipelineTable { name: &str, version: PipelineVersion, ) -> Result>> { - if let Some(pipeline) = self.get_compiled_pipeline_from_cache(schema, name, version) { + if let Some(pipeline) = self + .pipelines + .get(&generate_pipeline_cache_key(schema, name, version)) + { return Ok(pipeline); } let pipeline = self - .find_pipeline_by_name(schema, name, version) + .find_pipeline(schema, name, version) .await? 
.context(PipelineNotFoundSnafu { name, version })?; let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?); self.pipelines.insert( - Self::generate_pipeline_cache_key(schema, name, version), + generate_pipeline_cache_key(schema, name, version), compiled_pipeline.clone(), ); Ok(compiled_pipeline) @@ -341,11 +302,11 @@ impl PipelineTable { { self.pipelines.insert( - Self::generate_pipeline_cache_key(schema, name, None), + generate_pipeline_cache_key(schema, name, None), compiled_pipeline.clone(), ); self.pipelines.insert( - Self::generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))), + generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))), compiled_pipeline.clone(), ); } @@ -353,9 +314,20 @@ impl PipelineTable { Ok((version, compiled_pipeline)) } - pub async fn delete_pipeline_by_name(&self, schema: &str, name: &str) -> Result<()> { + pub async fn delete_pipeline( + &self, + schema: &str, + name: &str, + version: PipelineVersion, + ) -> Result<()> { + // 0. version is ensured at the http api level not None + ensure!( + version.is_some(), + InvalidPipelineVersionSnafu { version: "None" } + ); + // 1. check pipeline exist in catalog - let pipeline = self.find_pipeline_by_name(schema, name, None).await?; + let pipeline = self.find_pipeline(schema, name, version).await?; if pipeline.is_none() { return Ok(()); } @@ -383,10 +355,7 @@ impl PipelineTable { // create scan plan let logical_plan = LogicalPlanBuilder::scan(table_name.clone(), table_source, None) .context(BuildDfLogicalPlanSnafu)? - .filter(and( - col("schema").eq(lit(schema)), - col("name").eq(lit(name)), - )) + .filter(build_plan_filter(schema, name, version)) .context(BuildDfLogicalPlanSnafu)? .build() .context(BuildDfLogicalPlanSnafu)?; @@ -407,15 +376,21 @@ impl PipelineTable { .await .context(ExecuteInternalStatementSnafu)?; - // cloud be deleting multiple rows - info!("delete pipeline: {:?}, output: {:?}", name, output); - // remove all caches - self.remove_pipeline_cache_with_schema_and_name(schema, name); + info!( + "delete pipeline: {:?}, version: {:?}, output: {:?}", + name, version, output + ); + + // remove cache with version and latest + self.pipelines + .remove(&generate_pipeline_cache_key(schema, name, version)); + self.pipelines + .remove(&generate_pipeline_cache_key(schema, name, None)); Ok(()) } - async fn find_pipeline_by_name( + async fn find_pipeline( &self, schema: &str, name: &str, @@ -431,22 +406,10 @@ impl PipelineTable { let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone())); let table_source = Arc::new(DefaultTableSource::new(table_provider)); - let schema_and_name_filter = and( - col(PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME).eq(lit(schema)), - col(PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME).eq(lit(name)), - ); - let filter = if let Some(v) = version { - and( - schema_and_name_filter, - col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).eq(lit(v.0.to_iso8601_string())), - ) - } else { - schema_and_name_filter - }; let plan = LogicalPlanBuilder::scan(table_name, table_source, None) .context(BuildDfLogicalPlanSnafu)? - .filter(filter) + .filter(build_plan_filter(schema, name, version)) .context(BuildDfLogicalPlanSnafu)? 
.project(vec![ col(PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME), diff --git a/src/pipeline/src/manager/util.rs b/src/pipeline/src/manager/util.rs new file mode 100644 index 000000000000..191434284c1e --- /dev/null +++ b/src/pipeline/src/manager/util.rs @@ -0,0 +1,61 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_time::Timestamp; +use datafusion_expr::{and, col, lit, Expr}; +use datatypes::timestamp::TimestampNanosecond; + +use crate::error::{InvalidPipelineVersionSnafu, Result}; +use crate::table::{ + PIPELINE_TABLE_CREATED_AT_COLUMN_NAME, PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME, + PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME, +}; +use crate::PipelineVersion; + +pub fn to_pipeline_version(version_str: Option) -> Result { + match version_str { + Some(version) => { + let ts = Timestamp::from_str_utc(&version) + .map_err(|_| InvalidPipelineVersionSnafu { version }.build())?; + Ok(Some(TimestampNanosecond(ts))) + } + None => Ok(None), + } +} + +pub(crate) fn build_plan_filter(schema: &str, name: &str, version: PipelineVersion) -> Expr { + let schema_and_name_filter = and( + col(PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME).eq(lit(schema)), + col(PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME).eq(lit(name)), + ); + if let Some(v) = version { + and( + schema_and_name_filter, + col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).eq(lit(v.0.to_iso8601_string())), + ) + } else { + schema_and_name_filter + } +} + +pub(crate) fn generate_pipeline_cache_key( + schema: &str, + name: &str, + version: PipelineVersion, +) -> String { + match version { + Some(version) => format!("{}/{}/{}", schema, name, i64::from(version)), + None => format!("{}/{}/latest", schema, name), + } +} diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index d2cf038c8c06..801674151b3c 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -24,13 +24,11 @@ use axum::http::{Request, StatusCode}; use axum::response::{IntoResponse, Response}; use axum::{async_trait, BoxError, Extension, TypedHeader}; use common_telemetry::{error, warn}; -use common_time::{Timestamp, Timezone}; -use datatypes::timestamp::TimestampNanosecond; use http::{HeaderMap, HeaderValue}; use mime_guess::mime; use pipeline::error::{CastTypeSnafu, PipelineTransformSnafu}; -use pipeline::table::PipelineVersion; -use pipeline::Value as PipelineValue; +use pipeline::util::to_pipeline_version; +use pipeline::{PipelineVersion, Value as PipelineValue}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::{json, Deserializer, Value}; @@ -133,15 +131,8 @@ pub async fn add_pipeline( HeaderValue::from_str(mime_guess::mime::APPLICATION_JSON.as_ref()).unwrap(); let mut headers = HeaderMap::new(); headers.append(CONTENT_TYPE, json_header); - // Safety check: unwrap is safe here because we have checked the format of the timestamp - let version = pipeline - .0 - .as_formatted_string( - "%Y-%m-%d %H:%M:%S%.fZ", - // Safety check: unwrap is safe here because we 
have checked the format of the timezone - Some(Timezone::from_tz_string("UTC").unwrap()).as_ref(), - ) - .unwrap(); + + let version = pipeline.0.to_timezone_aware_string(None); ( headers, json!({"version": version, "name": pipeline_name}).to_string(), @@ -157,6 +148,7 @@ pub async fn add_pipeline( pub async fn delete_pipeline( State(state): State, Extension(query_ctx): Extension, + Query(query_params): Query, Path(pipeline_name): Path, ) -> Result { let handler = state.log_handler; @@ -167,8 +159,14 @@ pub async fn delete_pipeline( } ); + let version = query_params.version.context(InvalidParameterSnafu { + reason: "version is required", + })?; + + let version = to_pipeline_version(Some(version)).context(PipelineSnafu)?; + handler - .delete_pipeline(&pipeline_name, query_ctx) + .delete_pipeline(&pipeline_name, version, query_ctx) .await .map(|_| "ok".to_string()) .map_err(|e| { @@ -240,18 +238,7 @@ pub async fn log_ingester( reason: "table is required", })?; - let version = match query_params.version { - Some(version) => { - let ts = Timestamp::from_str_utc(&version).map_err(|e| { - InvalidParameterSnafu { - reason: format!("invalid pipeline version: {} with error: {}", &version, e), - } - .build() - })?; - Some(TimestampNanosecond(ts)) - } - None => None, - }; + let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?; let ignore_errors = query_params.ignore_errors.unwrap_or(false); diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index cdd628b4bb87..edf6c6b755f3 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -35,8 +35,7 @@ use common_query::Output; use headers::HeaderValue; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; -use pipeline::table::{PipelineInfo, PipelineVersion}; -use pipeline::{GreptimeTransformer, Pipeline}; +use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion}; use serde_json::Value; use session::context::QueryContextRef; @@ -145,5 +144,10 @@ pub trait LogHandler { query_ctx: QueryContextRef, ) -> Result; - async fn delete_pipeline(&self, name: &str, query_ctx: QueryContextRef) -> Result<()>; + async fn delete_pipeline( + &self, + name: &str, + version: PipelineVersion, + query_ctx: QueryContextRef, + ) -> Result<()>; } diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 887f04a3b218..905b03bdd1b7 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -90,3 +90,4 @@ script.workspace = true session = { workspace = true, features = ["testing"] } store-api.workspace = true tokio-postgres = "0.7" +url = "2.3" diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index d12cac8346b8..96f5d5ffee96 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -19,7 +19,7 @@ use auth::user_provider_from_option; use axum::http::{HeaderName, StatusCode}; use common_error::status_code::StatusCode as ErrorCode; use prost::Message; -use serde_json::json; +use serde_json::{json, Value}; use servers::http::error_result::ErrorResponse; use servers::http::greptime_result_v1::GreptimedbV1Response; use servers::http::handler::HealthResponse; @@ -1039,7 +1039,21 @@ transform: .await; assert_eq!(res.status(), StatusCode::OK); - assert_eq!(res.text().await, "ok"); + + let content = res.text().await; + + let content = serde_json::from_str(&content); + 
assert!(content.is_ok()); + let content: Value = content.unwrap(); + let pipeline_name = content.get("name"); + assert!(pipeline_name.is_some()); + assert_eq!(pipeline_name.unwrap(), "test"); + + let version = content.get("version"); + assert!(version.is_some()); + let version = version.unwrap().as_str(); + assert!(version.is_some()); + let version = version.unwrap(); // 2. write data let data_body = r#" @@ -1062,8 +1076,13 @@ transform: .await; assert_eq!(res.status(), StatusCode::OK); + let encoded: String = url::form_urlencoded::byte_serialize(version.as_bytes()).collect(); + // 3. remove pipeline - let res = client.delete("/v1/events/pipelines/test").send().await; + let res = client + .delete(format!("/v1/events/pipelines/test?version={}", encoded).as_str()) + .send() + .await; assert_eq!(res.status(), StatusCode::OK); assert_eq!(res.text().await, "ok"); From 8ba0b173cf1be55fa64a88071c656ada43b278e1 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Wed, 3 Jul 2024 15:47:32 +0800 Subject: [PATCH 04/11] chore: add pipeline management api metrics --- src/pipeline/src/lib.rs | 1 + src/pipeline/src/manager/pipeline_operator.rs | 24 ++++++++++++ src/pipeline/src/metrics.rs | 37 +++++++++++++++++++ 3 files changed, 62 insertions(+) create mode 100644 src/pipeline/src/metrics.rs diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs index 82ce43c2d6d6..23c9d2c488e2 100644 --- a/src/pipeline/src/lib.rs +++ b/src/pipeline/src/lib.rs @@ -14,6 +14,7 @@ mod etl; mod manager; +mod metrics; pub use etl::transform::GreptimeTransformer; pub use etl::value::Value; diff --git a/src/pipeline/src/manager/pipeline_operator.rs b/src/pipeline/src/manager/pipeline_operator.rs index 4a49195d6ed3..b074ee1f0551 100644 --- a/src/pipeline/src/manager/pipeline_operator.rs +++ b/src/pipeline/src/manager/pipeline_operator.rs @@ -14,11 +14,13 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; +use std::time::Instant; use api::v1::CreateTableExpr; use catalog::{CatalogManagerRef, RegisterSystemTableRequest}; use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME}; use common_telemetry::info; +use futures::FutureExt; use operator::insert::InserterRef; use operator::statement::StatementExecutorRef; use query::QueryEngineRef; @@ -28,6 +30,10 @@ use table::TableRef; use crate::error::{CatalogSnafu, CreateTableSnafu, PipelineTableNotFoundSnafu, Result}; use crate::manager::{PipelineInfo, PipelineTableRef, PipelineVersion, PIPELINE_TABLE_NAME}; +use crate::metrics::{ + METRIC_PIPELINE_CREATE_HISTOGRAM, METRIC_PIPELINE_DELETE_HISTOGRAM, + METRIC_PIPELINE_RETRIEVE_HISTOGRAM, +}; use crate::table::PipelineTable; use crate::{GreptimeTransformer, Pipeline}; @@ -175,9 +181,15 @@ impl PipelineOperator { self.create_pipeline_table_if_not_exists(query_ctx.clone()) .await?; + let timer = Instant::now(); self.get_pipeline_table_from_cache(query_ctx.current_catalog()) .context(PipelineTableNotFoundSnafu)? .get_pipeline(&schema, name, version) + .inspect(|re| { + METRIC_PIPELINE_RETRIEVE_HISTOGRAM + .with_label_values(&[&re.is_ok().to_string()]) + .observe(timer.elapsed().as_secs_f64()) + }) .await } @@ -192,9 +204,15 @@ impl PipelineOperator { self.create_pipeline_table_if_not_exists(query_ctx.clone()) .await?; + let timer = Instant::now(); self.get_pipeline_table_from_cache(query_ctx.current_catalog()) .context(PipelineTableNotFoundSnafu)? 
.insert_and_compile(&query_ctx.current_schema(), name, content_type, pipeline) + .inspect(|re| { + METRIC_PIPELINE_CREATE_HISTOGRAM + .with_label_values(&[&re.is_ok().to_string()]) + .observe(timer.elapsed().as_secs_f64()) + }) .await } @@ -209,9 +227,15 @@ impl PipelineOperator { self.create_pipeline_table_if_not_exists(query_ctx.clone()) .await?; + let timer = Instant::now(); self.get_pipeline_table_from_cache(query_ctx.current_catalog()) .context(PipelineTableNotFoundSnafu)? .delete_pipeline(&query_ctx.current_schema(), name, version) + .inspect(|re| { + METRIC_PIPELINE_DELETE_HISTOGRAM + .with_label_values(&[&re.is_ok().to_string()]) + .observe(timer.elapsed().as_secs_f64()) + }) .await } } diff --git a/src/pipeline/src/metrics.rs b/src/pipeline/src/metrics.rs new file mode 100644 index 000000000000..280f5619d468 --- /dev/null +++ b/src/pipeline/src/metrics.rs @@ -0,0 +1,37 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use lazy_static::lazy_static; +use prometheus::{register_histogram_vec, HistogramVec}; + +lazy_static! { + pub static ref METRIC_PIPELINE_CREATE_HISTOGRAM: HistogramVec = register_histogram_vec!( + "greptime_pipeline_create_duration_seconds", + "Histogram of the pipeline creation duration", + &["success"] + ) + .unwrap(); + pub static ref METRIC_PIPELINE_DELETE_HISTOGRAM: HistogramVec = register_histogram_vec!( + "greptime_pipeline_delete_duration_seconds", + "Histogram of the pipeline deletion duration", + &["success"] + ) + .unwrap(); + pub static ref METRIC_PIPELINE_RETRIEVE_HISTOGRAM: HistogramVec = register_histogram_vec!( + "greptime_pipeline_retrieve_duration_seconds", + "Histogram of the pipeline retrieval duration", + &["success"] + ) + .unwrap(); +} From cae2f535fdd0fc983a976f836aa0d589d53f1612 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Wed, 3 Jul 2024 16:28:00 +0800 Subject: [PATCH 05/11] chore: minor cr issues --- src/pipeline/src/manager.rs | 2 -- src/pipeline/src/manager/pipeline_operator.rs | 4 ++-- src/pipeline/src/manager/table.rs | 3 ++- src/servers/src/http/event.rs | 1 + 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/pipeline/src/manager.rs b/src/pipeline/src/manager.rs index a2b677751e23..960197e083e8 100644 --- a/src/pipeline/src/manager.rs +++ b/src/pipeline/src/manager.rs @@ -25,8 +25,6 @@ pub mod pipeline_operator; pub mod table; pub mod util; -const PIPELINE_TABLE_NAME: &str = "pipelines"; - /// Pipeline version. An optional timestamp with nanosecond precision. /// If the version is None, it means the latest version of the pipeline. /// User can specify the version by providing a timestamp string formatted as iso8601. 
diff --git a/src/pipeline/src/manager/pipeline_operator.rs b/src/pipeline/src/manager/pipeline_operator.rs index b074ee1f0551..4ce9442da639 100644 --- a/src/pipeline/src/manager/pipeline_operator.rs +++ b/src/pipeline/src/manager/pipeline_operator.rs @@ -29,12 +29,12 @@ use snafu::{OptionExt, ResultExt}; use table::TableRef; use crate::error::{CatalogSnafu, CreateTableSnafu, PipelineTableNotFoundSnafu, Result}; -use crate::manager::{PipelineInfo, PipelineTableRef, PipelineVersion, PIPELINE_TABLE_NAME}; +use crate::manager::{PipelineInfo, PipelineTableRef, PipelineVersion}; use crate::metrics::{ METRIC_PIPELINE_CREATE_HISTOGRAM, METRIC_PIPELINE_DELETE_HISTOGRAM, METRIC_PIPELINE_RETRIEVE_HISTOGRAM, }; -use crate::table::PipelineTable; +use crate::table::{PipelineTable, PIPELINE_TABLE_NAME}; use crate::{GreptimeTransformer, Pipeline}; /// PipelineOperator is responsible for managing pipelines. diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs index 7f57f7d62e45..10f8e9df5e89 100644 --- a/src/pipeline/src/manager/table.rs +++ b/src/pipeline/src/manager/table.rs @@ -49,9 +49,10 @@ use crate::error::{ }; use crate::etl::transform::GreptimeTransformer; use crate::etl::{parse, Content, Pipeline}; -use crate::manager::{PipelineInfo, PipelineVersion, PIPELINE_TABLE_NAME}; +use crate::manager::{PipelineInfo, PipelineVersion}; use crate::util::{build_plan_filter, generate_pipeline_cache_key}; +pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines"; pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name"; pub(crate) const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema"; const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type"; diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 801674151b3c..26e540749594 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -301,6 +301,7 @@ async fn ingest_logs_inner( pub trait LogValidator { /// validate payload by source before processing + /// Return a `Some` result to indicate validation failure. 
fn validate(&self, source: Option, payload: &str) -> Option>; } From 7a2a61e08c883c9ee7d0136c00d9f13ea5798be0 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Wed, 3 Jul 2024 16:46:50 +0800 Subject: [PATCH 06/11] chore: add unit test --- src/pipeline/src/manager/util.rs | 37 ++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/pipeline/src/manager/util.rs b/src/pipeline/src/manager/util.rs index 191434284c1e..6133c64215d0 100644 --- a/src/pipeline/src/manager/util.rs +++ b/src/pipeline/src/manager/util.rs @@ -59,3 +59,40 @@ pub(crate) fn generate_pipeline_cache_key( None => format!("{}/{}/latest", schema, name), } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_to_pipeline_version() { + let none_result = to_pipeline_version(None); + assert!(none_result.is_ok()); + assert!(none_result.unwrap().is_none()); + + let some_result = to_pipeline_version(Some("2023-01-01 00:00:00Z".to_string())); + assert!(some_result.is_ok()); + assert_eq!( + some_result.unwrap(), + Some(TimestampNanosecond::new(1672531200000000000)) + ); + + let invalid = to_pipeline_version(Some("invalid".to_string())); + assert!(invalid.is_err()); + } + + #[test] + fn test_generate_pipeline_cache_key() { + let schema = "test_schema"; + let name = "test_name"; + let latest = generate_pipeline_cache_key(schema, name, None); + assert_eq!(latest, "test_schema/test_name/latest"); + + let versioned = generate_pipeline_cache_key( + schema, + name, + Some(TimestampNanosecond::new(1672531200000000000)), + ); + assert_eq!(versioned, "test_schema/test_name/1672531200000000000"); + } +} From 0ba09f90df02b20233c8b1670384389f2aa0d2f0 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Wed, 3 Jul 2024 19:19:27 +0800 Subject: [PATCH 07/11] chore: fix cr issue --- src/frontend/src/instance/log_handler.rs | 2 +- src/pipeline/src/manager/pipeline_operator.rs | 2 +- src/pipeline/src/manager/table.rs | 16 ++++---- src/servers/src/http/event.rs | 38 +++++++++++-------- src/servers/src/query_handler.rs | 2 +- 5 files changed, 34 insertions(+), 26 deletions(-) diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index cccce0163441..7edda5ccf130 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -73,7 +73,7 @@ impl LogHandler for Instance { name: &str, version: PipelineVersion, ctx: QueryContextRef, - ) -> ServerResult<()> { + ) -> ServerResult> { self.pipeline_operator .delete_pipeline(name, version, ctx) .await diff --git a/src/pipeline/src/manager/pipeline_operator.rs b/src/pipeline/src/manager/pipeline_operator.rs index 4ce9442da639..049cd80b452a 100644 --- a/src/pipeline/src/manager/pipeline_operator.rs +++ b/src/pipeline/src/manager/pipeline_operator.rs @@ -222,7 +222,7 @@ impl PipelineOperator { name: &str, version: PipelineVersion, query_ctx: QueryContextRef, - ) -> Result<()> { + ) -> Result> { // trigger load pipeline table self.create_pipeline_table_if_not_exists(query_ctx.clone()) .await?; diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs index 10f8e9df5e89..d3197123cce5 100644 --- a/src/pipeline/src/manager/table.rs +++ b/src/pipeline/src/manager/table.rs @@ -248,9 +248,8 @@ impl PipelineTable { .context(InsertPipelineSnafu)?; info!( - "Inserted pipeline: {} into {} table: {}, output: {:?}.", + "Insert pipeline success, name: {:?}, table: {:?}, output: {:?}", name, - PIPELINE_TABLE_NAME, table_info.full_table_name(), output ); @@ -320,7 +319,7 @@ impl PipelineTable { 
schema: &str, name: &str, version: PipelineVersion, - ) -> Result<()> { + ) -> Result> { // 0. version is ensured at the http api level not None ensure!( version.is_some(), @@ -330,7 +329,7 @@ impl PipelineTable { // 1. check pipeline exist in catalog let pipeline = self.find_pipeline(schema, name, version).await?; if pipeline.is_none() { - return Ok(()); + return Ok(None); } // 2. do delete @@ -378,8 +377,11 @@ impl PipelineTable { .context(ExecuteInternalStatementSnafu)?; info!( - "delete pipeline: {:?}, version: {:?}, output: {:?}", - name, version, output + "Delete pipeline success, name: {:?}, version: {:?}, table: {:?}, output: {:?}", + name, + version, + table_info.full_table_name(), + output ); // remove cache with version and latest @@ -388,7 +390,7 @@ impl PipelineTable { self.pipelines .remove(&generate_pipeline_cache_key(schema, name, None)); - Ok(()) + Ok(Some(())) } async fn find_pipeline( diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 26e540749594..66f16e97ef8e 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -126,18 +126,7 @@ pub async fn add_pipeline( .await; result - .map(|pipeline| { - let json_header = - HeaderValue::from_str(mime_guess::mime::APPLICATION_JSON.as_ref()).unwrap(); - let mut headers = HeaderMap::new(); - headers.append(CONTENT_TYPE, json_header); - - let version = pipeline.0.to_timezone_aware_string(None); - ( - headers, - json!({"version": version, "name": pipeline_name}).to_string(), - ) - }) + .map(|pipeline| to_json_result(pipeline_name, pipeline.0.to_timezone_aware_string(None))) .map_err(|e| { error!(e; "failed to insert pipeline"); e @@ -150,7 +139,7 @@ pub async fn delete_pipeline( Extension(query_ctx): Extension, Query(query_params): Query, Path(pipeline_name): Path, -) -> Result { +) -> Result { let handler = state.log_handler; ensure!( !pipeline_name.is_empty(), @@ -159,16 +148,22 @@ pub async fn delete_pipeline( } ); - let version = query_params.version.context(InvalidParameterSnafu { + let version_str = query_params.version.context(InvalidParameterSnafu { reason: "version is required", })?; - let version = to_pipeline_version(Some(version)).context(PipelineSnafu)?; + let version = to_pipeline_version(Some(version_str.clone())).context(PipelineSnafu)?; handler .delete_pipeline(&pipeline_name, version, query_ctx) .await - .map(|_| "ok".to_string()) + .map(|v| { + if v.is_some() { + to_json_result(pipeline_name, version_str).into_response() + } else { + (HeaderMap::new(), json!({}).to_string()).into_response() + } + }) .map_err(|e| { error!(e; "failed to delete pipeline"); e @@ -299,6 +294,17 @@ async fn ingest_logs_inner( Ok(response) } +fn to_json_result(pipeline_name: String, version: String) -> impl IntoResponse { + let json_header = HeaderValue::from_str(mime_guess::mime::APPLICATION_JSON.as_ref()).unwrap(); + let mut headers = HeaderMap::new(); + headers.append(CONTENT_TYPE, json_header); + + ( + headers, + json!({"version": version, "name": pipeline_name}).to_string(), + ) +} + pub trait LogValidator { /// validate payload by source before processing /// Return a `Some` result to indicate validation failure. 
diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index edf6c6b755f3..1fe64e652265 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -149,5 +149,5 @@ pub trait LogHandler { name: &str, version: PipelineVersion, query_ctx: QueryContextRef, - ) -> Result<()>; + ) -> Result>; } From 65770613379ad8d070b8992bc569f37374e74956 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Thu, 4 Jul 2024 14:43:21 +0800 Subject: [PATCH 08/11] fix: test --- src/pipeline/tests/gsub.rs | 2 +- src/pipeline/tests/pipeline.rs | 2 +- tests-integration/tests/http.rs | 13 ++++++++++--- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/pipeline/tests/gsub.rs b/src/pipeline/tests/gsub.rs index 5d25bf188b68..f1209a6f8830 100644 --- a/src/pipeline/tests/gsub.rs +++ b/src/pipeline/tests/gsub.rs @@ -29,7 +29,7 @@ fn test_gsub() { let pipeline_yaml = r#" --- -description: Pipeline for Akamai DataStream2 Log +description: Pipeline for Demo Log processors: - gsub: diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs index ff9cad1bdea1..08f2ad38116f 100644 --- a/src/pipeline/tests/pipeline.rs +++ b/src/pipeline/tests/pipeline.rs @@ -81,7 +81,7 @@ fn test_complex_data() { let pipeline_yaml = r#" --- -description: Pipeline for Akamai DataStream2 Log +description: Pipeline for Demo Log processors: - urlencoding: diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 96f5d5ffee96..e5c17899f774 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1053,7 +1053,7 @@ transform: assert!(version.is_some()); let version = version.unwrap().as_str(); assert!(version.is_some()); - let version = version.unwrap(); + let version_str = version.unwrap(); // 2. write data let data_body = r#" @@ -1076,7 +1076,7 @@ transform: .await; assert_eq!(res.status(), StatusCode::OK); - let encoded: String = url::form_urlencoded::byte_serialize(version.as_bytes()).collect(); + let encoded: String = url::form_urlencoded::byte_serialize(version_str.as_bytes()).collect(); // 3. remove pipeline let res = client @@ -1085,7 +1085,14 @@ transform: .await; assert_eq!(res.status(), StatusCode::OK); - assert_eq!(res.text().await, "ok"); + + let content = res.text().await; + + let content = serde_json::from_str(&content); + assert!(content.is_ok()); + let content: Value = content.unwrap(); + + assert_eq!(content, json!({"name": "test", "version": version_str})); // 4. 
write data failed let res = client From fe1707c577f5000d57bf70b3ea0c489425a590df Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Thu, 4 Jul 2024 17:38:25 +0800 Subject: [PATCH 09/11] chore: add `GreptimedbManageResponse` --- src/servers/src/http.rs | 1 + src/servers/src/http/event.rs | 38 +++--- src/servers/src/http/greptime_manage_resp.rs | 136 +++++++++++++++++++ tests-integration/tests/http.rs | 38 ++++-- 4 files changed, 181 insertions(+), 32 deletions(-) create mode 100644 src/servers/src/http/greptime_manage_resp.rs diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index ba5f2ed2f618..6204908c03a7 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -92,6 +92,7 @@ pub mod csv_result; #[cfg(feature = "dashboard")] mod dashboard; pub mod error_result; +pub mod greptime_manage_resp; pub mod greptime_result_v1; pub mod influxdb_result_v1; pub mod table_result; diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 66f16e97ef8e..ea436009b004 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -14,6 +14,7 @@ use std::result::Result as StdResult; use std::sync::Arc; +use std::time::Instant; use api::v1::{RowInsertRequest, RowInsertRequests, Rows}; use axum::body::HttpBody; @@ -24,20 +25,20 @@ use axum::http::{Request, StatusCode}; use axum::response::{IntoResponse, Response}; use axum::{async_trait, BoxError, Extension, TypedHeader}; use common_telemetry::{error, warn}; -use http::{HeaderMap, HeaderValue}; use mime_guess::mime; use pipeline::error::{CastTypeSnafu, PipelineTransformSnafu}; use pipeline::util::to_pipeline_version; use pipeline::{PipelineVersion, Value as PipelineValue}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use serde_json::{json, Deserializer, Value}; +use serde_json::{Deserializer, Value}; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{ InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu, }; +use crate::http::greptime_manage_resp::GreptimedbManageResponse; use crate::http::greptime_result_v1::GreptimedbV1Response; use crate::http::HttpResponse; use crate::query_handler::LogHandlerRef; @@ -104,7 +105,8 @@ pub async fn add_pipeline( Path(pipeline_name): Path, Extension(query_ctx): Extension, PipelineContent(payload): PipelineContent, -) -> Result { +) -> Result { + let start = Instant::now(); let handler = state.log_handler; if pipeline_name.is_empty() { return Err(InvalidParameterSnafu { @@ -126,7 +128,13 @@ pub async fn add_pipeline( .await; result - .map(|pipeline| to_json_result(pipeline_name, pipeline.0.to_timezone_aware_string(None))) + .map(|pipeline| { + GreptimedbManageResponse::from_pipeline( + pipeline_name, + pipeline.0.to_timezone_aware_string(None), + start.elapsed().as_millis() as u64, + ) + }) .map_err(|e| { error!(e; "failed to insert pipeline"); e @@ -139,7 +147,8 @@ pub async fn delete_pipeline( Extension(query_ctx): Extension, Query(query_params): Query, Path(pipeline_name): Path, -) -> Result { +) -> Result { + let start = Instant::now(); let handler = state.log_handler; ensure!( !pipeline_name.is_empty(), @@ -159,9 +168,13 @@ pub async fn delete_pipeline( .await .map(|v| { if v.is_some() { - to_json_result(pipeline_name, version_str).into_response() + GreptimedbManageResponse::from_pipeline( + pipeline_name, + version_str, + start.elapsed().as_millis() as u64, + ) } else { - (HeaderMap::new(), json!({}).to_string()).into_response() + 
GreptimedbManageResponse::from_pipelines(vec![], start.elapsed().as_millis() as u64) } }) .map_err(|e| { @@ -294,17 +307,6 @@ async fn ingest_logs_inner( Ok(response) } -fn to_json_result(pipeline_name: String, version: String) -> impl IntoResponse { - let json_header = HeaderValue::from_str(mime_guess::mime::APPLICATION_JSON.as_ref()).unwrap(); - let mut headers = HeaderMap::new(); - headers.append(CONTENT_TYPE, json_header); - - ( - headers, - json!({"version": version, "name": pipeline_name}).to_string(), - ) -} - pub trait LogValidator { /// validate payload by source before processing /// Return a `Some` result to indicate validation failure. diff --git a/src/servers/src/http/greptime_manage_resp.rs b/src/servers/src/http/greptime_manage_resp.rs new file mode 100644 index 000000000000..3b5c160000ae --- /dev/null +++ b/src/servers/src/http/greptime_manage_resp.rs @@ -0,0 +1,136 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use axum::response::IntoResponse; +use axum::Json; +use http::header::CONTENT_TYPE; +use http::HeaderValue; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; + +/// Greptimedb Manage Api Response struct +/// Currently we have `Pipelines` and `Scripts` as control panel api +#[derive(Serialize, Deserialize, Debug, JsonSchema)] +pub struct GreptimedbManageResponse { + #[serde(flatten)] + pub(crate) manage_result: ManageResult, + pub(crate) execution_time_ms: u64, +} + +impl GreptimedbManageResponse { + pub fn from_pipeline(name: String, version: String, execution_time_ms: u64) -> Self { + GreptimedbManageResponse { + manage_result: ManageResult::Pipelines { + pipelines: vec![PipelineOutput { name, version }], + }, + execution_time_ms, + } + } + + pub fn from_pipelines(pipelines: Vec, execution_time_ms: u64) -> Self { + GreptimedbManageResponse { + manage_result: ManageResult::Pipelines { pipelines }, + execution_time_ms, + } + } + + pub fn with_execution_time(mut self, execution_time: u64) -> Self { + self.execution_time_ms = execution_time; + self + } + + pub fn execution_time_ms(&self) -> u64 { + self.execution_time_ms + } +} + +#[derive(Serialize, Deserialize, Debug, JsonSchema)] +#[serde(untagged)] +pub enum ManageResult { + Pipelines { pipelines: Vec }, + // todo(shuiyisong): refactor scripts api + Scripts(), +} + +#[derive(Serialize, Deserialize, Debug, JsonSchema)] +pub struct PipelineOutput { + name: String, + version: String, +} + +impl IntoResponse for GreptimedbManageResponse { + fn into_response(self) -> axum::response::Response { + let execution_time = self.execution_time_ms; + + let mut resp = Json(self).into_response(); + + // We delibrately don't add this format into [`crate::http::ResponseFormat`] + // because this is a format for manage api other than the data query api + resp.headers_mut().insert( + &GREPTIME_DB_HEADER_FORMAT, + HeaderValue::from_static("greptimedb_manage"), + ); + 
resp.headers_mut().insert( + &GREPTIME_DB_HEADER_EXECUTION_TIME, + HeaderValue::from(execution_time), + ); + resp.headers_mut().insert( + CONTENT_TYPE, + HeaderValue::from_str(mime_guess::mime::APPLICATION_JSON.as_ref()).unwrap(), + ); + + resp + } +} + +#[cfg(test)] +mod tests { + + use arrow::datatypes::ToByteSlice; + use http_body::Body; + use hyper::body::to_bytes; + + use super::*; + + #[tokio::test] + async fn test_into_response() { + let resp = GreptimedbManageResponse { + manage_result: ManageResult::Pipelines { + pipelines: vec![PipelineOutput { + name: "test_name".to_string(), + version: "test_version".to_string(), + }], + }, + execution_time_ms: 42, + }; + + let mut re = resp.into_response(); + let data = re.data(); + + let data_str = format!("{:?}", data); + assert_eq!( + data_str, + r#"Data(Response { status: 200, version: HTTP/1.1, headers: {"content-type": "application/json", "x-greptime-format": "greptimedb_manage", "x-greptime-execution-time": "42"}, body: UnsyncBoxBody })"# + ); + + let body_bytes = to_bytes(re.into_body()).await.unwrap(); + let body_str = String::from_utf8_lossy(body_bytes.to_byte_slice()); + assert_eq!( + body_str, + r#"{"pipelines":[{"name":"test_name","version":"test_version"}],"execution_time_ms":42}"# + ); + } +} diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index e5c17899f774..6d434d58b844 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1044,16 +1044,23 @@ transform: let content = serde_json::from_str(&content); assert!(content.is_ok()); + // {"execution_time_ms":13,"pipelines":[{"name":"test","version":"2024-07-04 08:31:00.987136"}]} let content: Value = content.unwrap(); - let pipeline_name = content.get("name"); - assert!(pipeline_name.is_some()); - assert_eq!(pipeline_name.unwrap(), "test"); - let version = content.get("version"); - assert!(version.is_some()); - let version = version.unwrap().as_str(); - assert!(version.is_some()); - let version_str = version.unwrap(); + let execution_time = content.get("execution_time_ms"); + assert!(execution_time.unwrap().is_number()); + let pipelines = content.get("pipelines"); + let pipelines = pipelines.unwrap().as_array().unwrap(); + assert_eq!(pipelines.len(), 1); + let pipeline = pipelines.first().unwrap(); + assert_eq!(pipeline.get("name").unwrap(), "test"); + + let version_str = pipeline + .get("version") + .unwrap() + .as_str() + .unwrap() + .to_string(); // 2. write data let data_body = r#" @@ -1067,7 +1074,7 @@ transform: "log": "ClusterAdapter:enter sendTextDataToCluster\\n" } ] - "#; +"#; let res = client .post("/v1/events/logs?db=public&table=logs1&pipeline_name=test") .header("Content-Type", "application/json") @@ -1086,13 +1093,15 @@ transform: assert_eq!(res.status(), StatusCode::OK); + // {"pipelines":[{"name":"test","version":"2024-07-04 08:55:29.038347"}],"execution_time_ms":22} let content = res.text().await; + let content: Value = serde_json::from_str(&content).unwrap(); + assert!(content.get("execution_time_ms").unwrap().is_number()); - let content = serde_json::from_str(&content); - assert!(content.is_ok()); - let content: Value = content.unwrap(); - - assert_eq!(content, json!({"name": "test", "version": version_str})); + assert_eq!( + content.get("pipelines").unwrap().to_string(), + format!(r#"[{{"name":"test","version":"{}"}}]"#, version_str).as_str() + ); // 4. 
write data failed let res = client @@ -1101,6 +1110,7 @@ transform: .body(data_body) .send() .await; + // todo(shuiyisong): refactor http error handling assert_ne!(res.status(), StatusCode::OK); guard.remove_all().await; From a1d7ab7b7a6e031c77448c5cd60b40e3e80476ba Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Thu, 4 Jul 2024 17:47:54 +0800 Subject: [PATCH 10/11] fix: typo --- src/servers/src/http/greptime_manage_resp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/servers/src/http/greptime_manage_resp.rs b/src/servers/src/http/greptime_manage_resp.rs index 3b5c160000ae..d2f61715b5e3 100644 --- a/src/servers/src/http/greptime_manage_resp.rs +++ b/src/servers/src/http/greptime_manage_resp.rs @@ -77,7 +77,7 @@ impl IntoResponse for GreptimedbManageResponse { let mut resp = Json(self).into_response(); - // We delibrately don't add this format into [`crate::http::ResponseFormat`] + // We deliberately don't add this format into [`crate::http::ResponseFormat`] // because this is a format for manage api other than the data query api resp.headers_mut().insert( &GREPTIME_DB_HEADER_FORMAT, From a8913c0f508b847de3a0f521e053decba9cde8e9 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Fri, 5 Jul 2024 10:34:03 +0800 Subject: [PATCH 11/11] fix: typo --- src/script/src/python/rspython/builtins/test.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/script/src/python/rspython/builtins/test.rs b/src/script/src/python/rspython/builtins/test.rs index f7b9c851e069..a108663ad7d7 100644 --- a/src/script/src/python/rspython/builtins/test.rs +++ b/src/script/src/python/rspython/builtins/test.rs @@ -39,7 +39,7 @@ use crate::python::utils::format_py_error; #[test] fn convert_scalar_to_py_obj_and_back() { rustpython_vm::Interpreter::with_init(Default::default(), |vm| { - // this can be in `.enter()` closure, but for clearity, put it in the `with_init()` + // this can be in `.enter()` closure, but for clarity, put it in the `with_init()` let _ = PyVector::make_class(&vm.ctx); }) .enter(|vm| { @@ -422,7 +422,7 @@ fn test_vm() { rustpython_vm::Interpreter::with_init(Default::default(), |vm| { vm.add_native_module("udf_builtins", Box::new(greptime_builtin::make_module)); - // this can be in `.enter()` closure, but for clearity, put it in the `with_init()` + // this can be in `.enter()` closure, but for clarity, put it in the `with_init()` let _ = PyVector::make_class(&vm.ctx); }) .enter(|vm| {
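
One behavior the unit test for `GreptimedbManageResponse` does not cover: when `delete_pipeline` finds nothing to delete, the handler now replies with `from_pipelines(vec![], ...)`, which serializes to an empty `pipelines` array rather than the empty JSON object used earlier in the series. A quick sketch of that branch, assuming the types from `greptime_manage_resp.rs` above are in scope:

#[test]
fn delete_not_found_serializes_to_empty_pipeline_list() {
    // Mirrors the not-found branch of the delete handler: no pipelines, only timing.
    let resp = GreptimedbManageResponse::from_pipelines(vec![], 7);
    assert_eq!(
        serde_json::to_string(&resp).unwrap(),
        r#"{"pipelines":[],"execution_time_ms":7}"#
    );
}
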