Skip to content

Commit

Permalink
feat: added kafka events for authentication create and update (#4991)
Browse files Browse the repository at this point in the history
Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
  • Loading branch information
vsrivatsa-juspay and hyperswitch-bot[bot] committed Jun 24, 2024
1 parent fed7b69 commit 10e9121
Show file tree
Hide file tree
Showing 10 changed files with 491 additions and 26 deletions.
23 changes: 12 additions & 11 deletions config/config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -587,17 +587,18 @@ enabled = true # Switch to enable or disable PayPal onboard
source = "logs" # The event sink to push events to; supported values are "kafka" and "logs" (stdout)

[events.kafka]
brokers = [] # Kafka broker urls for bootstrapping the client
intent_analytics_topic = "topic" # Kafka topic to be used for PaymentIntent events
attempt_analytics_topic = "topic" # Kafka topic to be used for PaymentAttempt events
refund_analytics_topic = "topic" # Kafka topic to be used for Refund events
api_logs_topic = "topic" # Kafka topic to be used for incoming api events
connector_logs_topic = "topic" # Kafka topic to be used for connector api events
outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events
dispute_analytics_topic = "topic" # Kafka topic to be used for Dispute events
audit_events_topic = "topic" # Kafka topic to be used for Payment Audit events
payout_analytics_topic = "topic" # Kafka topic to be used for Payouts and PayoutAttempt events
consolidated_events_topic = "topic" # Kafka topic to be used for Consolidated events
brokers = [] # Kafka broker urls for bootstrapping the client
intent_analytics_topic = "topic" # Kafka topic to be used for PaymentIntent events
attempt_analytics_topic = "topic" # Kafka topic to be used for PaymentAttempt events
refund_analytics_topic = "topic" # Kafka topic to be used for Refund events
api_logs_topic = "topic" # Kafka topic to be used for incoming api events
connector_logs_topic = "topic" # Kafka topic to be used for connector api events
outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events
dispute_analytics_topic = "topic" # Kafka topic to be used for Dispute events
audit_events_topic = "topic" # Kafka topic to be used for Payment Audit events
payout_analytics_topic = "topic" # Kafka topic to be used for Payouts and PayoutAttempt events
consolidated_events_topic = "topic" # Kafka topic to be used for Consolidated events
authentication_analytics_topic = "topic" # Kafka topic to be used for Authentication events

# File storage configuration
[file_storage]
Expand Down
23 changes: 12 additions & 11 deletions config/deployments/env_specific.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,18 @@ sts_role_session_name = "" # An identifier for the assumed role session, used to
source = "logs" # The event sink to push events to; supported values are "kafka" and "logs" (stdout)

[events.kafka]
brokers = [] # Kafka broker urls for bootstrapping the client
intent_analytics_topic = "topic" # Kafka topic to be used for PaymentIntent events
attempt_analytics_topic = "topic" # Kafka topic to be used for PaymentAttempt events
refund_analytics_topic = "topic" # Kafka topic to be used for Refund events
api_logs_topic = "topic" # Kafka topic to be used for incoming api events
connector_logs_topic = "topic" # Kafka topic to be used for connector api events
outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events
dispute_analytics_topic = "topic" # Kafka topic to be used for Dispute events
audit_events_topic = "topic" # Kafka topic to be used for Payment Audit events
payout_analytics_topic = "topic" # Kafka topic to be used for Payouts and PayoutAttempt events
consolidated_events_topic = "topic" # Kafka topic to be used for Consolidated events
brokers = [] # Kafka broker urls for bootstrapping the client
intent_analytics_topic = "topic" # Kafka topic to be used for PaymentIntent events
attempt_analytics_topic = "topic" # Kafka topic to be used for PaymentAttempt events
refund_analytics_topic = "topic" # Kafka topic to be used for Refund events
api_logs_topic = "topic" # Kafka topic to be used for incoming api events
connector_logs_topic = "topic" # Kafka topic to be used for connector api events
outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events
dispute_analytics_topic = "topic" # Kafka topic to be used for Dispute events
audit_events_topic = "topic" # Kafka topic to be used for Payment Audit events
payout_analytics_topic = "topic" # Kafka topic to be used for Payouts and PayoutAttempt events
consolidated_events_topic = "topic" # Kafka topic to be used for Consolidated events
authentication_analytics_topic = "topic" # Kafka topic to be used for Authentication events

# File storage configuration
[file_storage]
Expand Down
1 change: 1 addition & 0 deletions config/development.toml
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ dispute_analytics_topic = "hyperswitch-dispute-events"
audit_events_topic = "hyperswitch-audit-events"
payout_analytics_topic = "hyperswitch-payout-events"
consolidated_events_topic = "hyperswitch-consolidated-events"
authentication_analytics_topic = "hyperswitch-authentication-events"

[analytics]
source = "sqlx"
Expand Down
1 change: 1 addition & 0 deletions config/docker_compose.toml
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ dispute_analytics_topic = "hyperswitch-dispute-events"
audit_events_topic = "hyperswitch-audit-events"
payout_analytics_topic = "hyperswitch-payout-events"
consolidated_events_topic = "hyperswitch-consolidated-events"
authentication_analytics_topic = "hyperswitch-authentication-events"

[analytics]
source = "sqlx"
Expand Down
194 changes: 194 additions & 0 deletions crates/analytics/docs/clickhouse/scripts/authentications.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
-- Kafka-backed staging table: consumes raw authentication events as
-- JSONEachRow messages from the 'hyperswitch-authentication-events' topic.
-- This table is not meant to be queried directly; the authentication_mv
-- materialized view drains it into the `authentications` storage table.
-- Column set mirrors the router's Authentication record as emitted by the
-- Kafka producer.
CREATE TABLE authentication_queue (
`authentication_id` String,
`merchant_id` String,
`authentication_connector` LowCardinality(String),
`connector_authentication_id` Nullable(String),
`authentication_data` Nullable(String),
`payment_method_id` Nullable(String),
`authentication_type` LowCardinality(Nullable(String)),
`authentication_status` LowCardinality(String),
`authentication_lifecycle_status` LowCardinality(String),
`created_at` DateTime64(3),
`modified_at` DateTime64(3),
`error_message` Nullable(String),
`error_code` Nullable(String),
`connector_metadata` Nullable(String),
`maximum_supported_version` LowCardinality(Nullable(String)),
`threeds_server_transaction_id` Nullable(String),
`cavv` Nullable(String),
`authentication_flow_type` Nullable(String),
`message_version` LowCardinality(Nullable(String)),
`eci` Nullable(String),
`trans_status` LowCardinality(Nullable(String)),
`acquirer_bin` Nullable(String),
`acquirer_merchant_id` Nullable(String),
`three_ds_method_data` Nullable(String),
`three_ds_method_url` Nullable(String),
`acs_url` Nullable(String),
`challenge_request` Nullable(String),
`acs_reference_number` Nullable(String),
`acs_trans_id` Nullable(String),
`acs_signed_content` Nullable(String),
`profile_id` String,
`payment_id` Nullable(String),
`merchant_connector_id` Nullable(String),
`ds_trans_id` Nullable(String),
`directory_server_id` Nullable(String),
-- +1 = insert, -1 = cancellation row; consumed by the downstream
-- CollapsingMergeTree table to collapse superseded versions of a row.
`acquirer_country_code` Nullable(String),
`sign_flag` Int8
-- 'stream' error mode exposes malformed messages via the _error /
-- _raw_message virtual columns instead of stalling consumption; the
-- authentication_parse_errors view captures those rows.
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-authentication-events',
kafka_group_name = 'hyper',
kafka_format = 'JSONEachRow',
kafka_handle_error_mode = 'stream';

-- Storage table for authentication analytics, populated exclusively by the
-- authentication_mv materialized view (never written to directly).
-- CollapsingMergeTree(sign_flag) keeps the latest version of each
-- authentication: an update is modeled as a -1 (cancel) row for the previous
-- state plus a +1 row for the new state, which merge away over time.
CREATE TABLE authentications (
`authentication_id` String,
`merchant_id` String,
`authentication_connector` LowCardinality(String),
`connector_authentication_id` Nullable(String),
`authentication_data` Nullable(String),
`payment_method_id` Nullable(String),
`authentication_type` LowCardinality(Nullable(String)),
`authentication_status` LowCardinality(String),
`authentication_lifecycle_status` LowCardinality(String),
-- Defaults guard against events that omit timestamps; inserted_at records
-- when the row landed in ClickHouse (set to now64() by the MV).
`created_at` DateTime64(3) DEFAULT now64(),
`inserted_at` DateTime64(3) DEFAULT now64(),
`modified_at` DateTime64(3) DEFAULT now64(),
`error_message` Nullable(String),
`error_code` Nullable(String),
`connector_metadata` Nullable(String),
`maximum_supported_version` LowCardinality(Nullable(String)),
`threeds_server_transaction_id` Nullable(String),
`cavv` Nullable(String),
`authentication_flow_type` Nullable(String),
`message_version` LowCardinality(Nullable(String)),
`eci` Nullable(String),
`trans_status` LowCardinality(Nullable(String)),
`acquirer_bin` Nullable(String),
`acquirer_merchant_id` Nullable(String),
`three_ds_method_data` Nullable(String),
`three_ds_method_url` Nullable(String),
`acs_url` Nullable(String),
`challenge_request` Nullable(String),
`acs_reference_number` Nullable(String),
`acs_trans_id` Nullable(String),
`acs_signed_content` Nullable(String),
`profile_id` String,
`payment_id` Nullable(String),
`merchant_connector_id` Nullable(String),
`ds_trans_id` Nullable(String),
`directory_server_id` Nullable(String),
`acquirer_country_code` Nullable(String),
`sign_flag` Int8,
-- Bloom-filter skip indexes on the low-cardinality columns analytics
-- dashboards are expected to filter on.
INDEX authenticationConnectorIndex authentication_connector TYPE bloom_filter GRANULARITY 1,
INDEX transStatusIndex trans_status TYPE bloom_filter GRANULARITY 1,
INDEX authenticationTypeIndex authentication_type TYPE bloom_filter GRANULARITY 1,
INDEX authenticationStatusIndex authentication_status TYPE bloom_filter GRANULARITY 1
-- Daily partitions; rows expire 18 months after their creation day.
) ENGINE = CollapsingMergeTree(sign_flag) PARTITION BY toStartOfDay(created_at)
ORDER BY
(created_at, merchant_id, authentication_id) TTL toStartOfDay(created_at) + toIntervalMonth(18) SETTINGS index_granularity = 8192;

-- Pipeline view: moves successfully-parsed events from the Kafka staging
-- table (authentication_queue) into the `authentications` storage table.
-- Rows that failed JSON parsing (non-empty _error virtual column) are
-- excluded here and are captured by authentication_parse_errors instead.
CREATE MATERIALIZED VIEW authentication_mv TO authentications (
`authentication_id` String,
`merchant_id` String,
`authentication_connector` LowCardinality(String),
`connector_authentication_id` Nullable(String),
`authentication_data` Nullable(String),
`payment_method_id` Nullable(String),
`authentication_type` LowCardinality(Nullable(String)),
`authentication_status` LowCardinality(String),
`authentication_lifecycle_status` LowCardinality(String),
`created_at` DateTime64(3) DEFAULT now64(),
`inserted_at` DateTime64(3) DEFAULT now64(),
`modified_at` DateTime64(3) DEFAULT now64(),
`error_message` Nullable(String),
`error_code` Nullable(String),
`connector_metadata` Nullable(String),
`maximum_supported_version` LowCardinality(Nullable(String)),
`threeds_server_transaction_id` Nullable(String),
`cavv` Nullable(String),
`authentication_flow_type` Nullable(String),
`message_version` LowCardinality(Nullable(String)),
`eci` Nullable(String),
`trans_status` LowCardinality(Nullable(String)),
`acquirer_bin` Nullable(String),
`acquirer_merchant_id` Nullable(String),
`three_ds_method_data` Nullable(String),
`three_ds_method_url` Nullable(String),
`acs_url` Nullable(String),
`challenge_request` Nullable(String),
`acs_reference_number` Nullable(String),
`acs_trans_id` Nullable(String),
`acs_signed_content` Nullable(String),
`profile_id` String,
`payment_id` Nullable(String),
`merchant_connector_id` Nullable(String),
`ds_trans_id` Nullable(String),
`directory_server_id` Nullable(String),
`acquirer_country_code` Nullable(String),
`sign_flag` Int8
) AS
SELECT
authentication_id,
merchant_id,
authentication_connector,
connector_authentication_id,
authentication_data,
payment_method_id,
authentication_type,
authentication_status,
authentication_lifecycle_status,
created_at,
-- Stamp ingestion time; all other columns pass through unchanged.
now64() as inserted_at,
modified_at,
error_message,
error_code,
connector_metadata,
maximum_supported_version,
threeds_server_transaction_id,
cavv,
authentication_flow_type,
message_version,
eci,
trans_status,
acquirer_bin,
acquirer_merchant_id,
three_ds_method_data,
three_ds_method_url,
acs_url,
challenge_request,
acs_reference_number,
acs_trans_id,
acs_signed_content,
profile_id,
payment_id,
merchant_connector_id,
ds_trans_id,
directory_server_id,
acquirer_country_code,
sign_flag
FROM
authentication_queue
WHERE
-- Only messages the Kafka engine parsed successfully.
length(_error) = 0;

-- Dead-letter view: persists messages the Kafka engine could not parse
-- (kafka_handle_error_mode = 'stream' on authentication_queue surfaces them
-- via the _error and _raw_message virtual columns). Stores the topic,
-- partition, offset, and raw payload so failed events can be inspected and
-- replayed.
CREATE MATERIALIZED VIEW authentication_parse_errors (
`topic` String,
`partition` Int64,
`offset` Int64,
`raw` String,
`error` String
) ENGINE = MergeTree
ORDER BY
(topic, partition, offset) SETTINGS index_granularity = 8192 AS
SELECT
_topic AS topic,
_partition AS partition,
_offset AS offset,
_raw_message AS raw,
_error AS error
FROM
authentication_queue
WHERE
-- Only messages that failed parsing.
length(_error) > 0;
28 changes: 25 additions & 3 deletions crates/router/src/db/kafka_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2751,9 +2751,20 @@ impl AuthenticationInterface for KafkaStore {
&self,
authentication: storage::AuthenticationNew,
) -> CustomResult<storage::Authentication, errors::StorageError> {
self.diesel_store
let auth = self
.diesel_store
.insert_authentication(authentication)
.await?;

if let Err(er) = self
.kafka_producer
.log_authentication(&auth, None, self.tenant_id.clone())
.await
{
logger::error!(message="Failed to log analytics event for authentication {auth:?}", error_message=?er)
}

Ok(auth)
}

async fn find_authentication_by_merchant_id_authentication_id(
Expand Down Expand Up @@ -2784,12 +2795,23 @@ impl AuthenticationInterface for KafkaStore {
previous_state: storage::Authentication,
authentication_update: storage::AuthenticationUpdate,
) -> CustomResult<storage::Authentication, errors::StorageError> {
self.diesel_store
let auth = self
.diesel_store
.update_authentication_by_merchant_id_authentication_id(
previous_state,
previous_state.clone(),
authentication_update,
)
.await?;

if let Err(er) = self
.kafka_producer
.log_authentication(&auth, Some(previous_state.clone()), self.tenant_id.clone())
.await
{
logger::error!(message="Failed to log analytics event for authentication {auth:?}", error_message=?er)
}

Ok(auth)
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/router/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub enum EventType {
#[cfg(feature = "payouts")]
Payout,
Consolidated,
Authentication,
}

#[derive(Debug, Default, Deserialize, Clone)]
Expand Down
Loading

0 comments on commit 10e9121

Please sign in to comment.