Skip to content

Commit 08f0afb

Browse files
authored
feat(scheduler): add task segregation based on application source (#10505)
1 parent 38b9d68 commit 08f0afb

File tree

36 files changed

+191
-30
lines changed

36 files changed

+191
-30
lines changed

config/config.example.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22
# all the available configuration options, and is intended to be used
33
# solely as a reference. Please copy this file to create a config.
44

5+
# This is used to identify the running application instance
6+
# Check common_enums::ApplicationSource for available options
7+
# For more details - https://github.com/juspay/hyperswitch/issues/10489
8+
application_source = "main"
9+
510
# Server configuration
611
[server]
712
port = 8080
@@ -416,6 +421,7 @@ cards = [
416421
# It defines the streams/queues name and configuration as well as event selection variables
417422
[scheduler]
418423
stream = "SCHEDULER_STREAM"
424+
cug_stream = "CUG_SCHEDULER_STREAM"
419425
graceful_shutdown_interval = 60000 # Specifies how much time to wait while re-attempting shutdown for a service (in milliseconds)
420426
loop_interval = 5000 # Specifies how much time to wait before starting the defined behaviour of producer or consumer (in milliseconds)
421427

config/development.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
application_source = "main"
2+
13
[log.file]
24
enabled = false
35

@@ -382,6 +384,7 @@ zsl.base_url = "https://api.sitoffalb.net/"
382384

383385
[scheduler]
384386
stream = "SCHEDULER_STREAM"
387+
cug_stream = "CUG_SCHEDULER_STREAM"
385388

386389
[scheduler.consumer]
387390
disabled = false
@@ -1444,4 +1447,4 @@ token = "123456" # Superposition token
14441447
org_id = "localorg" # Organization ID in Superposition
14451448
workspace_id = "dev" # Workspace ID in Superposition
14461449
polling_interval = 15 # Polling interval in seconds for configuration updates
1447-
# request_timeout = # Request timeout in seconds for Superposition API calls (optional, default: none)
1450+
# request_timeout = # Request timeout in seconds for Superposition API calls (optional, default: none)

crates/common_enums/src/enums.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9966,6 +9966,28 @@ pub enum ProcessTrackerRunner {
99669966
InvoiceSyncflow,
99679967
}
99689968

9969+
#[derive(
9970+
Clone,
9971+
Copy,
9972+
Debug,
9973+
Default,
9974+
Eq,
9975+
PartialEq,
9976+
serde::Deserialize,
9977+
serde::Serialize,
9978+
strum::Display,
9979+
strum::EnumString,
9980+
ToSchema,
9981+
)]
9982+
#[router_derive::diesel_enum(storage_type = "text")]
9983+
#[serde(rename_all = "snake_case")]
9984+
#[strum(serialize_all = "snake_case")]
9985+
pub enum ApplicationSource {
9986+
#[default]
9987+
Main,
9988+
Cug,
9989+
}
9990+
99699991
#[derive(Debug)]
99709992
pub enum CryptoPadding {
99719993
PKCS7,

crates/diesel_models/src/process_tracker.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ use error_stack::ResultExt;
55
use serde::{Deserialize, Serialize};
66
use time::PrimitiveDateTime;
77

8-
use crate::{enums as storage_enums, errors, schema::process_tracker, StorageResult};
8+
use crate::{
9+
enums as storage_enums, enums::ApplicationSource, errors, schema::process_tracker,
10+
StorageResult,
11+
};
912

1013
#[derive(
1114
Clone,
@@ -40,6 +43,7 @@ pub struct ProcessTracker {
4043
#[serde(with = "common_utils::custom_serde::iso8601")]
4144
pub updated_at: PrimitiveDateTime,
4245
pub version: ApiVersion,
46+
pub application_source: Option<ApplicationSource>,
4347
}
4448

4549
impl ProcessTracker {
@@ -66,6 +70,7 @@ pub struct ProcessTrackerNew {
6670
pub created_at: PrimitiveDateTime,
6771
pub updated_at: PrimitiveDateTime,
6872
pub version: ApiVersion,
73+
pub application_source: Option<ApplicationSource>,
6974
}
7075

7176
impl ProcessTrackerNew {
@@ -79,6 +84,7 @@ impl ProcessTrackerNew {
7984
retry_count: Option<i32>,
8085
schedule_time: PrimitiveDateTime,
8186
api_version: ApiVersion,
87+
application_source: ApplicationSource,
8288
) -> StorageResult<Self>
8389
where
8490
T: Serialize + std::fmt::Debug,
@@ -102,6 +108,7 @@ impl ProcessTrackerNew {
102108
created_at: current_time,
103109
updated_at: current_time,
104110
version: api_version,
111+
application_source: Some(application_source),
105112
})
106113
}
107114
}

crates/diesel_models/src/schema.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1426,6 +1426,8 @@ diesel::table! {
14261426
created_at -> Timestamp,
14271427
updated_at -> Timestamp,
14281428
version -> ApiVersion,
1429+
#[max_length = 64]
1430+
application_source -> Nullable<Varchar>,
14291431
}
14301432
}
14311433

crates/diesel_models/src/schema_v2.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1371,6 +1371,8 @@ diesel::table! {
13711371
created_at -> Timestamp,
13721372
updated_at -> Timestamp,
13731373
version -> ApiVersion,
1374+
#[max_length = 64]
1375+
application_source -> Nullable<Varchar>,
13741376
}
13751377
}
13761378

crates/router/src/configs/secrets_transformers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,7 @@ pub(crate) async fn fetch_raw_secrets(
489489

490490
Settings {
491491
server: conf.server,
492+
application_source: conf.application_source,
492493
chat,
493494
master_database,
494495
redis: conf.redis,

crates/router/src/configs/settings.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
#[cfg(feature = "olap")]
88
use analytics::{opensearch::OpenSearchConfig, ReportConfig};
99
use api_models::enums;
10+
use common_enums;
1011
use common_utils::{ext_traits::ConfigExt, id_type, types::user::EmailThemeConfig};
1112
use config::{Environment, File};
1213
use error_stack::ResultExt;
@@ -73,6 +74,7 @@ pub struct CmdLineConf {
7374
#[serde(default)]
7475
pub struct Settings<S: SecretState> {
7576
pub server: Server,
77+
pub application_source: common_enums::ApplicationSource,
7678
pub proxy: Proxy,
7779
pub env: Env,
7880
pub chat: SecretStateContainer<ChatSettings, S>,

crates/router/src/core/api_keys.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,15 @@ pub async fn create_api_key(
171171
if api_key.expires_at.is_some() {
172172
let expiry_reminder_days = state.conf.api_keys.get_inner().expiry_reminder_days.clone();
173173

174-
add_api_key_expiry_task(store, &api_key, expiry_reminder_days)
175-
.await
176-
.change_context(errors::ApiErrorResponse::InternalServerError)
177-
.attach_printable("Failed to insert API key expiry reminder to process tracker")?;
174+
add_api_key_expiry_task(
175+
store,
176+
&api_key,
177+
expiry_reminder_days,
178+
state.conf.application_source,
179+
)
180+
.await
181+
.change_context(errors::ApiErrorResponse::InternalServerError)
182+
.attach_printable("Failed to insert API key expiry reminder to process tracker")?;
178183
}
179184
}
180185

@@ -193,6 +198,7 @@ pub async fn add_api_key_expiry_task(
193198
store: &dyn crate::db::StorageInterface,
194199
api_key: &ApiKey,
195200
expiry_reminder_days: Vec<u8>,
201+
application_source: common_enums::ApplicationSource,
196202
) -> Result<(), errors::ProcessTrackerError> {
197203
let current_time = date_time::now();
198204

@@ -231,6 +237,7 @@ pub async fn add_api_key_expiry_task(
231237
None,
232238
schedule_time,
233239
common_types::consts::API_VERSION,
240+
application_source,
234241
)
235242
.change_context(errors::ApiErrorResponse::InternalServerError)
236243
.attach_printable("Failed to construct API key expiry process tracker task")?;
@@ -346,12 +353,15 @@ pub async fn update_api_key(
346353
else if api_key.expires_at.is_some() {
347354
// Process doesn't exist in process_tracker table, so create new entry with
348355
// schedule_time based on new expiry set.
349-
add_api_key_expiry_task(store, &api_key, expiry_reminder_days)
350-
.await
351-
.change_context(errors::ApiErrorResponse::InternalServerError)
352-
.attach_printable(
353-
"Failed to insert API key expiry reminder task to process tracker",
354-
)?;
356+
add_api_key_expiry_task(
357+
store,
358+
&api_key,
359+
expiry_reminder_days,
360+
state.conf.application_source,
361+
)
362+
.await
363+
.change_context(errors::ApiErrorResponse::InternalServerError)
364+
.attach_printable("Failed to insert API key expiry reminder task to process tracker")?;
355365
}
356366
}
357367

crates/router/src/core/disputes.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use router_env::{
1212
use strum::IntoEnumIterator;
1313
pub mod transformers;
1414

15+
use common_enums;
16+
1517
use super::{
1618
errors::{self, ConnectorErrorExt, RouterResponse, StorageErrorExt},
1719
metrics,
@@ -843,6 +845,7 @@ pub async fn fetch_disputes_from_connector(
843845
dispute,
844846
merchant_id.clone(),
845847
schedule_time,
848+
state.conf.application_source,
846849
)
847850
.await;
848851

@@ -917,6 +920,7 @@ pub async fn add_process_dispute_task_to_pt(
917920
dispute_payload: &DisputeSyncResponse,
918921
merchant_id: common_utils::id_type::MerchantId,
919922
schedule_time: Option<time::PrimitiveDateTime>,
923+
application_source: common_enums::ApplicationSource,
920924
) -> common_utils::errors::CustomResult<(), errors::StorageError> {
921925
match schedule_time {
922926
Some(time) => {
@@ -947,6 +951,7 @@ pub async fn add_process_dispute_task_to_pt(
947951
None,
948952
time,
949953
common_types::consts::API_VERSION,
954+
application_source,
950955
)
951956
.map_err(errors::StorageError::from)?;
952957
db.insert_process(process_tracker_entry).await?;
@@ -964,6 +969,7 @@ pub async fn add_dispute_list_task_to_pt(
964969
merchant_connector_id: common_utils::id_type::MerchantConnectorAccountId,
965970
profile_id: common_utils::id_type::ProfileId,
966971
fetch_request: FetchDisputesRequestData,
972+
application_source: common_enums::ApplicationSource,
967973
) -> common_utils::errors::CustomResult<(), errors::StorageError> {
968974
TASKS_ADDED_COUNT.add(1, router_env::metric_attributes!(("flow", "dispute_list")));
969975
let tracking_data = disputes::DisputeListPTData {
@@ -992,6 +998,7 @@ pub async fn add_dispute_list_task_to_pt(
992998
None,
993999
fetch_request.created_from,
9941000
common_types::consts::API_VERSION,
1001+
application_source,
9951002
)
9961003
.map_err(errors::StorageError::from)?;
9971004
db.insert_process(process_tracker_entry).await?;
@@ -1028,6 +1035,7 @@ pub async fn schedule_dispute_sync_task(
10281035
let merchant_id = mca.merchant_id.clone();
10291036
let merchant_connector_id = mca.merchant_connector_id.clone();
10301037
let business_profile_id = business_profile.get_id().clone();
1038+
let application_source = state.conf.application_source;
10311039

10321040
tokio::spawn(
10331041
async move {
@@ -1041,6 +1049,7 @@ pub async fn schedule_dispute_sync_task(
10411049
created_from,
10421050
created_till,
10431051
},
1052+
application_source,
10441053
)
10451054
.await
10461055
.map_err(|error| {

0 commit comments

Comments
 (0)