Skip to content

Commit

Permalink
refactor(events): populate object identifiers in outgoing webhooks an…
Browse files Browse the repository at this point in the history
…alytics events during retries (#5067)
  • Loading branch information
SanchithHegde committed Jun 24, 2024
1 parent 9caabef commit b878405
Show file tree
Hide file tree
Showing 13 changed files with 177 additions and 104 deletions.
40 changes: 40 additions & 0 deletions crates/common_utils/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,43 @@ macro_rules! collect_missing_value_keys {
}
};
}

#[macro_export]
macro_rules! impl_to_sql_from_sql_json {
($type:ty, $diesel_type:ty) => {
#[allow(unused_qualifications)]
impl diesel::serialize::ToSql<$diesel_type, diesel::pg::Pg> for $type {
fn to_sql<'b>(
&'b self,
out: &mut diesel::serialize::Output<'b, '_, diesel::pg::Pg>,
) -> diesel::serialize::Result {
let value = serde_json::to_value(self)?;

// the function `reborrow` only works in case of `Pg` backend. But, in case of other backends
// please refer to the diesel migration blog:
// https://github.com/Diesel-rs/Diesel/blob/master/guide_drafts/migration_guide.md#changed-tosql-implementations
<serde_json::Value as diesel::serialize::ToSql<
$diesel_type,
diesel::pg::Pg,
>>::to_sql(&value, &mut out.reborrow())
}
}

#[allow(unused_qualifications)]
impl diesel::deserialize::FromSql<$diesel_type, diesel::pg::Pg> for $type {
fn from_sql(
bytes: <diesel::pg::Pg as diesel::backend::Backend>::RawValue<'_>,
) -> diesel::deserialize::Result<Self> {
let value = <serde_json::Value as diesel::deserialize::FromSql<
$diesel_type,
diesel::pg::Pg,
>>::from_sql(bytes)?;
Ok(serde_json::from_value(value)?)
}
}
};
($type: ty) => {
$crate::impl_to_sql_from_sql_json!($type, diesel::sql_types::Json);
$crate::impl_to_sql_from_sql_json!($type, diesel::sql_types::Jsonb);
};
}
48 changes: 2 additions & 46 deletions crates/common_utils/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,29 +207,7 @@ impl FromStr for SemanticVersion {
}
}

impl<DB: Backend> FromSql<Jsonb, DB> for SemanticVersion
where
serde_json::Value: FromSql<Jsonb, DB>,
{
fn from_sql(bytes: DB::RawValue<'_>) -> deserialize::Result<Self> {
let value = <serde_json::Value as FromSql<Jsonb, DB>>::from_sql(bytes)?;
Ok(serde_json::from_value(value)?)
}
}

impl ToSql<Jsonb, diesel::pg::Pg> for SemanticVersion
where
serde_json::Value: ToSql<Jsonb, diesel::pg::Pg>,
{
fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, diesel::pg::Pg>) -> diesel::serialize::Result {
let value = serde_json::to_value(self)?;

// the function `reborrow` only works in case of `Pg` backend. But, in case of other backends
// please refer to the diesel migration blog:
// https://github.com/Diesel-rs/Diesel/blob/master/guide_drafts/migration_guide.md#changed-tosql-implementations
<serde_json::Value as ToSql<Jsonb, diesel::pg::Pg>>::to_sql(&value, &mut out.reborrow())
}
}
crate::impl_to_sql_from_sql_json!(SemanticVersion);

/// Amount convertor trait for connector
pub trait AmountConvertor: Send {
Expand Down Expand Up @@ -692,26 +670,4 @@ pub struct ChargeRefunds {
pub revert_transfer: Option<bool>,
}

impl<DB: Backend> FromSql<Jsonb, DB> for ChargeRefunds
where
serde_json::Value: FromSql<Jsonb, DB>,
{
fn from_sql(bytes: DB::RawValue<'_>) -> deserialize::Result<Self> {
let value = <serde_json::Value as FromSql<Jsonb, DB>>::from_sql(bytes)?;
Ok(serde_json::from_value(value)?)
}
}

impl ToSql<Jsonb, diesel::pg::Pg> for ChargeRefunds
where
serde_json::Value: ToSql<Jsonb, diesel::pg::Pg>,
{
fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, diesel::pg::Pg>) -> diesel::serialize::Result {
let value = serde_json::to_value(self)?;

// the function `reborrow` only works in case of `Pg` backend. But, in case of other backends
// please refer to the diesel migration blog:
// https://github.com/Diesel-rs/Diesel/blob/master/guide_drafts/migration_guide.md#changed-tosql-implementations
<serde_json::Value as ToSql<Jsonb, diesel::pg::Pg>>::to_sql(&value, &mut out.reborrow())
}
}
crate::impl_to_sql_from_sql_json!(ChargeRefunds);
54 changes: 3 additions & 51 deletions crates/diesel_models/src/enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub mod diesel_exports {
}
pub use common_enums::*;
use common_utils::pii;
use diesel::serialize::{Output, ToSql};
use diesel::{deserialize::FromSqlRow, expression::AsExpression, sql_types::Jsonb};
use router_derive::diesel_enum;
use time::PrimitiveDateTime;

Expand Down Expand Up @@ -142,12 +142,6 @@ pub enum MandateType {
#[default]
MultiUse,
}
use diesel::{
backend::Backend,
deserialize::{FromSql, FromSqlRow},
expression::AsExpression,
sql_types::Jsonb,
};

#[derive(
serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, FromSqlRow, AsExpression,
Expand All @@ -157,29 +151,9 @@ use diesel::{
pub struct MandateDetails {
pub update_mandate_id: Option<String>,
}
impl<DB: Backend> FromSql<Jsonb, DB> for MandateDetails
where
serde_json::Value: FromSql<Jsonb, DB>,
{
fn from_sql(bytes: DB::RawValue<'_>) -> diesel::deserialize::Result<Self> {
let value = <serde_json::Value as FromSql<Jsonb, DB>>::from_sql(bytes)?;
Ok(serde_json::from_value(value)?)
}
}

impl ToSql<Jsonb, diesel::pg::Pg> for MandateDetails
where
serde_json::Value: ToSql<Jsonb, diesel::pg::Pg>,
{
fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, diesel::pg::Pg>) -> diesel::serialize::Result {
let value = serde_json::to_value(self)?;
common_utils::impl_to_sql_from_sql_json!(MandateDetails);

// the function `reborrow` only works in case of `Pg` backend. But, in case of other backends
// please refer to the diesel migration blog:
// https://github.com/Diesel-rs/Diesel/blob/master/guide_drafts/migration_guide.md#changed-tosql-implementations
<serde_json::Value as ToSql<Jsonb, diesel::pg::Pg>>::to_sql(&value, &mut out.reborrow())
}
}
#[derive(
serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, FromSqlRow, AsExpression,
)]
Expand All @@ -190,29 +164,7 @@ pub enum MandateDataType {
MultiUse(Option<MandateAmountData>),
}

impl<DB: Backend> FromSql<Jsonb, DB> for MandateDataType
where
serde_json::Value: FromSql<Jsonb, DB>,
{
fn from_sql(bytes: DB::RawValue<'_>) -> diesel::deserialize::Result<Self> {
let value = <serde_json::Value as FromSql<Jsonb, DB>>::from_sql(bytes)?;
Ok(serde_json::from_value(value)?)
}
}

impl ToSql<Jsonb, diesel::pg::Pg> for MandateDataType
where
serde_json::Value: ToSql<Jsonb, diesel::pg::Pg>,
{
fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, diesel::pg::Pg>) -> diesel::serialize::Result {
let value = serde_json::to_value(self)?;

// the function `reborrow` only works in case of `Pg` backend. But, in case of other backends
// please refer to the diesel migration blog:
// https://github.com/Diesel-rs/Diesel/blob/master/guide_drafts/migration_guide.md#changed-tosql-implementations
<serde_json::Value as ToSql<Jsonb, diesel::pg::Pg>>::to_sql(&value, &mut out.reborrow())
}
}
common_utils::impl_to_sql_from_sql_json!(MandateDataType);

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct MandateAmountData {
Expand Down
33 changes: 32 additions & 1 deletion crates/diesel_models/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use common_utils::custom_serde;
use diesel::{AsChangeset, Identifiable, Insertable, Queryable};
use diesel::{
deserialize::FromSqlRow, expression::AsExpression, AsChangeset, Identifiable, Insertable,
Queryable,
};
use serde::{Deserialize, Serialize};
use time::PrimitiveDateTime;

Expand All @@ -23,6 +26,7 @@ pub struct EventNew {
pub request: Option<Encryption>,
pub response: Option<Encryption>,
pub delivery_attempt: Option<storage_enums::WebhookDeliveryAttempt>,
pub metadata: Option<EventMetadata>,
}

#[derive(Clone, Debug, Default, AsChangeset, router_derive::DebugAsDisplay)]
Expand Down Expand Up @@ -53,4 +57,31 @@ pub struct Event {
pub request: Option<Encryption>,
pub response: Option<Encryption>,
pub delivery_attempt: Option<storage_enums::WebhookDeliveryAttempt>,
pub metadata: Option<EventMetadata>,
}

#[derive(Clone, Debug, Deserialize, Serialize, AsExpression, FromSqlRow)]
#[diesel(sql_type = diesel::sql_types::Jsonb)]
pub enum EventMetadata {
Payment {
payment_id: String,
},
Payout {
payout_id: String,
},
Refund {
payment_id: String,
refund_id: String,
},
Dispute {
payment_id: String,
attempt_id: String,
dispute_id: String,
},
Mandate {
payment_method_id: String,
mandate_id: String,
},
}

common_utils::impl_to_sql_from_sql_json!(EventMetadata);
1 change: 1 addition & 0 deletions crates/diesel_models/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ diesel::table! {
request -> Nullable<Bytea>,
response -> Nullable<Bytea>,
delivery_attempt -> Nullable<WebhookDeliveryAttempt>,
metadata -> Nullable<Jsonb>,
}
}

Expand Down
85 changes: 83 additions & 2 deletions crates/router/src/core/webhooks/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ use crate::{
metrics,
},
db::StorageInterface,
events::outgoing_webhook_logs::{OutgoingWebhookEvent, OutgoingWebhookEventMetric},
events::outgoing_webhook_logs::{
OutgoingWebhookEvent, OutgoingWebhookEventContent, OutgoingWebhookEventMetric,
},
logger,
routes::{app::SessionStateInfo, SessionState},
services,
types::{
api,
domain::{self, types as domain_types},
storage::{self, enums},
transformers::ForeignFrom,
},
utils::{OptionExt, ValueExt},
workflows::outgoing_webhook_retry,
Expand Down Expand Up @@ -88,6 +91,8 @@ pub(crate) async fn create_event_and_trigger_outgoing_webhook(
.change_context(errors::ApiErrorResponse::WebhookProcessingFailure)
.attach_printable("Failed to construct outgoing webhook request content")?;

let event_metadata = storage::EventMetadata::foreign_from((&content, &primary_object_id));

let new_event = domain::Event {
event_id: event_id.clone(),
event_type,
Expand Down Expand Up @@ -116,6 +121,7 @@ pub(crate) async fn create_event_and_trigger_outgoing_webhook(
),
response: None,
delivery_attempt: Some(delivery_attempt),
metadata: Some(event_metadata),
};

let event_insert_result = state
Expand Down Expand Up @@ -440,7 +446,9 @@ fn raise_webhooks_analytics_event(

let outgoing_webhook_event_content = content
.as_ref()
.and_then(api::OutgoingWebhookContent::get_outgoing_webhook_event_content);
.and_then(api::OutgoingWebhookContent::get_outgoing_webhook_event_content)
.or_else(|| get_outgoing_webhook_event_content_from_event_metadata(event.metadata));

let webhook_event = OutgoingWebhookEvent::new(
merchant_id,
event.event_id,
Expand Down Expand Up @@ -810,3 +818,76 @@ async fn error_response_handler(

Err(error)
}

impl ForeignFrom<(&api::OutgoingWebhookContent, &str)> for storage::EventMetadata {
fn foreign_from((content, primary_object_id): (&api::OutgoingWebhookContent, &str)) -> Self {
match content {
webhooks::OutgoingWebhookContent::PaymentDetails(payments_response) => Self::Payment {
payment_id: payments_response
.payment_id
.clone()
.unwrap_or_else(|| primary_object_id.to_owned()),
},
webhooks::OutgoingWebhookContent::RefundDetails(refund_response) => Self::Refund {
payment_id: refund_response.payment_id.clone(),
refund_id: refund_response.refund_id.clone(),
},
webhooks::OutgoingWebhookContent::DisputeDetails(dispute_response) => Self::Dispute {
payment_id: dispute_response.payment_id.clone(),
attempt_id: dispute_response.attempt_id.clone(),
dispute_id: dispute_response.dispute_id.clone(),
},
webhooks::OutgoingWebhookContent::MandateDetails(mandate_response) => Self::Mandate {
payment_method_id: mandate_response.payment_method_id.clone(),
mandate_id: mandate_response.mandate_id.clone(),
},
#[cfg(feature = "payouts")]
webhooks::OutgoingWebhookContent::PayoutDetails(payout_response) => Self::Payout {
payout_id: payout_response.payout_id.clone(),
},
}
}
}

fn get_outgoing_webhook_event_content_from_event_metadata(
event_metadata: Option<storage::EventMetadata>,
) -> Option<OutgoingWebhookEventContent> {
event_metadata.map(|metadata| match metadata {
diesel_models::EventMetadata::Payment { payment_id } => {
OutgoingWebhookEventContent::Payment {
payment_id: Some(payment_id),
content: serde_json::Value::Null,
}
}
diesel_models::EventMetadata::Payout { payout_id } => OutgoingWebhookEventContent::Payout {
payout_id,
content: serde_json::Value::Null,
},
diesel_models::EventMetadata::Refund {
payment_id,
refund_id,
} => OutgoingWebhookEventContent::Refund {
payment_id,
refund_id,
content: serde_json::Value::Null,
},
diesel_models::EventMetadata::Dispute {
payment_id,
attempt_id,
dispute_id,
} => OutgoingWebhookEventContent::Dispute {
payment_id,
attempt_id,
dispute_id,
content: serde_json::Value::Null,
},
diesel_models::EventMetadata::Mandate {
payment_method_id,
mandate_id,
} => OutgoingWebhookEventContent::Mandate {
payment_method_id,
mandate_id,
content: serde_json::Value::Null,
},
})
}
1 change: 1 addition & 0 deletions crates/router/src/core/webhooks/webhook_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ pub async fn retry_delivery_attempt(
request: event_to_retry.request,
response: None,
delivery_attempt: Some(delivery_attempt),
metadata: event_to_retry.metadata,
};

let event = store
Expand Down
Loading

0 comments on commit b878405

Please sign in to comment.