Skip to content

Commit

Permalink
enhancement(http sink): update HTTP request builder to return error (v…
Browse files Browse the repository at this point in the history
…ectordotdev#19886)

* mvp

* formatting

* test fix

* update

* take ownership of payload

* changelog

* remove Option wrap

* changelog

* review

* test fix

* bump rust version

* update build func and usages

* merge

* fix template rendering error metric

* decided against sneaking in a fix, will put in separate PR

* use statics
  • Loading branch information
sebastiantia authored Feb 16, 2024
1 parent 5d8160d commit a798f68
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 39 deletions.
13 changes: 8 additions & 5 deletions src/sinks/clickhouse/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
http::{HttpRequest, HttpResponse, HttpRetryLogic, HttpServiceRequestBuilder},
retries::RetryAction,
},
UriParseSnafu,
HTTPRequestBuilderSnafu, UriParseSnafu,
},
};
use bytes::Bytes;
Expand Down Expand Up @@ -69,7 +69,10 @@ pub(super) struct ClickhouseServiceRequestBuilder {
}

impl HttpServiceRequestBuilder<PartitionKey> for ClickhouseServiceRequestBuilder {
fn build(&self, mut request: HttpRequest<PartitionKey>) -> Request<Bytes> {
fn build(
&self,
mut request: HttpRequest<PartitionKey>,
) -> Result<Request<Bytes>, crate::Error> {
let metadata = request.get_additional_metadata();

let uri = set_uri_query(
Expand All @@ -79,8 +82,7 @@ impl HttpServiceRequestBuilder<PartitionKey> for ClickhouseServiceRequestBuilder
metadata.format,
self.skip_unknown_fields,
self.date_time_best_effort,
)
.expect("building uri failed unexpectedly");
)?;

let auth: Option<Auth> = self.auth.clone();

Expand All @@ -98,7 +100,8 @@ impl HttpServiceRequestBuilder<PartitionKey> for ClickhouseServiceRequestBuilder

builder
.body(payload)
.expect("building HTTP request failed unexpectedly")
.context(HTTPRequestBuilderSnafu)
.map_err(Into::into)
}
}

Expand Down
22 changes: 14 additions & 8 deletions src/sinks/gcp/stackdriver/logs/service.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
//! Service implementation for the `gcp_stackdriver_logs` sink.

use bytes::Bytes;
use http::{Request, Uri};
use http::{header::CONTENT_TYPE, Request, Uri};

use crate::{
gcp::GcpAuthenticator,
sinks::util::http::{HttpRequest, HttpServiceRequestBuilder},
sinks::{
util::http::{HttpRequest, HttpServiceRequestBuilder},
HTTPRequestBuilderSnafu,
},
};
use snafu::ResultExt;

#[derive(Debug, Clone)]
pub(super) struct StackdriverLogsServiceRequestBuilder {
Expand All @@ -15,14 +19,16 @@ pub(super) struct StackdriverLogsServiceRequestBuilder {
}

impl HttpServiceRequestBuilder<()> for StackdriverLogsServiceRequestBuilder {
fn build(&self, mut request: HttpRequest<()>) -> Request<Bytes> {
let mut builder = Request::post(self.uri.clone())
.header("Content-Type", "application/json")
fn build(&self, mut request: HttpRequest<()>) -> Result<Request<Bytes>, crate::Error> {
let builder = Request::post(self.uri.clone()).header(CONTENT_TYPE, "application/json");

let mut request = builder
.body(request.take_payload())
.unwrap();
.context(HTTPRequestBuilderSnafu)
.map_err(Into::<crate::Error>::into)?;

self.auth.apply(&mut builder);
self.auth.apply(&mut request);

builder
Ok(request)
}
}
5 changes: 4 additions & 1 deletion src/sinks/gcp/stackdriver/logs/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,10 @@ async fn correct_request() {
(),
);

let request = stackdriver_logs_service_request_builder.build(http_request);
let request = stackdriver_logs_service_request_builder
.build(http_request)
.unwrap();

let (parts, body) = request.into_parts();
let json: serde_json::Value = serde_json::from_slice(&body[..]).unwrap();

Expand Down
25 changes: 14 additions & 11 deletions src/sinks/gcp/stackdriver/metrics/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use bytes::Bytes;
use goauth::scopes::Scope;
use http::{Request, Uri};
use http::{header::CONTENT_TYPE, Request, Uri};

use super::{
request_builder::{StackdriverMetricsEncoder, StackdriverMetricsRequestBuilder},
sink::StackdriverMetricsSink,
};
use crate::{
gcp::{GcpAuthConfig, GcpAuthenticator},
http::HttpClient,
Expand All @@ -14,13 +18,10 @@ use crate::{
},
service::TowerRequestConfigDefaults,
},
HTTPRequestBuilderSnafu,
},
};

use super::{
request_builder::{StackdriverMetricsEncoder, StackdriverMetricsRequestBuilder},
sink::StackdriverMetricsSink,
};
use snafu::ResultExt;

#[derive(Clone, Copy, Debug)]
pub struct StackdriverMetricsTowerRequestConfigDefaults;
Expand Down Expand Up @@ -159,15 +160,17 @@ pub(super) struct StackdriverMetricsServiceRequestBuilder {
}

impl HttpServiceRequestBuilder<()> for StackdriverMetricsServiceRequestBuilder {
fn build(&self, mut request: HttpRequest<()>) -> Request<Bytes> {
let mut request = Request::post(self.uri.clone())
.header("Content-Type", "application/json")
fn build(&self, mut request: HttpRequest<()>) -> Result<Request<Bytes>, crate::Error> {
let builder = Request::post(self.uri.clone()).header(CONTENT_TYPE, "application/json");

let mut request = builder
.body(request.take_payload())
.unwrap();
.context(HTTPRequestBuilderSnafu)
.map_err(Into::<crate::Error>::into)?;

self.auth.apply(&mut request);

request
Ok(request)
}
}

Expand Down
11 changes: 8 additions & 3 deletions src/sinks/honeycomb/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use bytes::Bytes;
use http::{Request, Uri};
use vector_lib::sensitive_string::SensitiveString;

use crate::sinks::util::http::{HttpRequest, HttpServiceRequestBuilder};
use crate::sinks::{
util::http::{HttpRequest, HttpServiceRequestBuilder},
HTTPRequestBuilderSnafu,
};
use snafu::ResultExt;

use super::config::HTTP_HEADER_HONEYCOMB;

Expand All @@ -15,11 +19,12 @@ pub(super) struct HoneycombSvcRequestBuilder {
}

impl HttpServiceRequestBuilder<()> for HoneycombSvcRequestBuilder {
fn build(&self, mut request: HttpRequest<()>) -> Request<Bytes> {
fn build(&self, mut request: HttpRequest<()>) -> Result<Request<Bytes>, crate::Error> {
let builder = Request::post(&self.uri).header(HTTP_HEADER_HONEYCOMB, self.api_key.inner());

builder
.body(request.take_payload())
.expect("Failed to assign body to request- builder has errors")
.context(HTTPRequestBuilderSnafu)
.map_err(Into::into)
}
}
26 changes: 17 additions & 9 deletions src/sinks/http/service.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
//! Service implementation for the `http` sink.

use bytes::Bytes;
use http::{HeaderName, HeaderValue, Method, Request, Uri};
use http::{
header::{CONTENT_ENCODING, CONTENT_TYPE},
HeaderName, HeaderValue, Method, Request, Uri,
};
use indexmap::IndexMap;

use crate::{
http::Auth,
sinks::util::{
http::{HttpRequest, HttpServiceRequestBuilder},
UriSerde,
sinks::{
util::{
http::{HttpRequest, HttpServiceRequestBuilder},
UriSerde,
},
HTTPRequestBuilderSnafu,
},
};
use snafu::ResultExt;

use super::config::HttpMethod;

Expand Down Expand Up @@ -46,17 +53,17 @@ impl HttpSinkRequestBuilder {
}

impl HttpServiceRequestBuilder<()> for HttpSinkRequestBuilder {
fn build(&self, mut request: HttpRequest<()>) -> Request<Bytes> {
fn build(&self, mut request: HttpRequest<()>) -> Result<Request<Bytes>, crate::Error> {
let method: Method = self.method.into();
let uri: Uri = self.uri.uri.clone();
let mut builder = Request::builder().method(method).uri(uri);

if let Some(content_type) = &self.content_type {
builder = builder.header("Content-Type", content_type);
builder = builder.header(CONTENT_TYPE, content_type);
}

if let Some(content_encoding) = &self.content_encoding {
builder = builder.header("Content-Encoding", content_encoding);
builder = builder.header(CONTENT_ENCODING, content_encoding);
}

let headers = builder
Expand All @@ -71,12 +78,13 @@ impl HttpServiceRequestBuilder<()> for HttpSinkRequestBuilder {
// The request building should not have errors at this point
let mut request = builder
.body(request.take_payload())
.expect("Failed to assign body to request- builder has errors");
.context(HTTPRequestBuilderSnafu)
.map_err(Into::<crate::Error>::into)?;

if let Some(auth) = &self.auth {
auth.apply(&mut request);
}

request
Ok(request)
}
}
2 changes: 2 additions & 0 deletions src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ pub enum BuildError {
SocketAddressError { source: std::io::Error },
#[snafu(display("URI parse error: {}", source))]
UriParseError { source: ::http::uri::InvalidUri },
#[snafu(display("HTTP request build error: {}", source))]
HTTPRequestBuilderError { source: ::http::Error },
}

/// Common healthcheck errors
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/util/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ impl ItemBatchSize<Event> for HttpJsonBatchSizer {

/// HTTP request builder for HTTP stream sinks using the generic `HttpService`
pub trait HttpServiceRequestBuilder<T: Send> {
fn build(&self, request: HttpRequest<T>) -> Request<Bytes>;
fn build(&self, request: HttpRequest<T>) -> Result<Request<Bytes>, crate::Error>;
}

/// Generic 'Service' implementation for HTTP stream sinks.
Expand All @@ -730,7 +730,7 @@ where
let request_builder = Arc::clone(&http_request_builder);

let fut: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
Box::pin(async move { Ok(request_builder.build(req)) });
Box::pin(async move { request_builder.build(req) });

fut
});
Expand Down

0 comments on commit a798f68

Please sign in to comment.