Skip to content

Commit afac8b1

Browse files
feat(framework): Implemented Custom HTTP Integration Layer (#329)
1 parent 03af1b4 commit afac8b1

File tree

15 files changed

+751
-12
lines changed

15 files changed

+751
-12
lines changed

backend/grpc-server/src/app.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,7 @@ use common_utils::consts;
33
use external_services::shared_metrics as metrics;
44
use grpc_api_types::{
55
health_check::health_server,
6-
payments::{
7-
dispute_service_handler, dispute_service_server, payment_service_handler,
8-
payment_service_server, refund_service_handler, refund_service_server,
9-
},
6+
payments::{dispute_service_server, payment_service_server, refund_service_server},
107
};
118
use std::{future::Future, net, sync::Arc};
129
use tokio::{
@@ -17,7 +14,8 @@ use tonic::transport::Server;
1714
use tower_http::{request_id::MakeRequestUuid, trace as tower_trace};
1815

1916
use crate::{
20-
config_overrides::RequestExtensionsLayer, configs, error::ConfigurationError, logger, utils,
17+
config_overrides::RequestExtensionsLayer, configs, error::ConfigurationError,
18+
http::config_middleware::HttpRequestExtensionsLayer, logger, utils,
2119
};
2220

2321
/// # Panics
@@ -84,7 +82,7 @@ pub async fn server_builder(config: configs::Config) -> Result<(), Configuration
8482
configs::ServiceType::Http => {
8583
service
8684
.await
87-
.http_server(socket_addr, shutdown_signal)
85+
.http_server(base_config, socket_addr, shutdown_signal)
8886
.await?
8987
}
9088
}
@@ -125,6 +123,7 @@ impl Service {
125123

126124
pub async fn http_server(
127125
self,
126+
base_config: Arc<configs::Config>,
128127
socket: net::SocketAddr,
129128
shutdown_signal: impl Future<Output = ()> + Send + 'static,
130129
) -> Result<(), ConfigurationError> {
@@ -151,14 +150,17 @@ impl Service {
151150
http::HeaderName::from_static(consts::X_REQUEST_ID),
152151
);
153152

154-
let router = axum::Router::new()
155-
.route("/health", axum::routing::get(|| async { "health is good" }))
156-
.merge(payment_service_handler(self.payments_service))
157-
.merge(refund_service_handler(self.refunds_service))
158-
.merge(dispute_service_handler(self.disputes_service))
153+
let config_override_layer = HttpRequestExtensionsLayer::new(base_config.clone());
154+
let app_state = crate::http::AppState::new(
155+
self.payments_service,
156+
self.refunds_service,
157+
self.disputes_service,
158+
);
159+
let router = crate::http::create_router(app_state)
159160
.layer(logging_layer)
160161
.layer(request_id_layer)
161-
.layer(propagate_request_id_layer);
162+
.layer(propagate_request_id_layer)
163+
.layer(config_override_layer);
162164

163165
let listener = tokio::net::TcpListener::bind(socket).await?;
164166

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use crate::{configs::Config, utils::merge_config_with_override};
2+
use axum::{body::Body, extract::Request, http::StatusCode, response::Response};
3+
use std::{
4+
future::Future,
5+
pin::Pin,
6+
sync::Arc,
7+
task::{Context, Poll},
8+
};
9+
use tower::{Layer, Service};
10+
11+
fn create_error_response(message: &str) -> Response<Body> {
12+
Response::builder()
13+
.status(StatusCode::INTERNAL_SERVER_ERROR)
14+
.body(Body::from(message.to_string()))
15+
.unwrap_or_else(|_| {
16+
// Single fallback - no nested unwrap needed
17+
let mut response = Response::new(Body::from("Internal server error"));
18+
*response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
19+
response
20+
})
21+
}
22+
23+
// HTTP middleware layer for adding config to request extensions
24+
#[derive(Clone)]
25+
pub struct HttpRequestExtensionsLayer {
26+
base_config: Arc<Config>,
27+
}
28+
29+
#[allow(clippy::new_without_default)]
30+
impl HttpRequestExtensionsLayer {
31+
pub fn new(base_config: Arc<Config>) -> Self {
32+
Self { base_config }
33+
}
34+
}
35+
36+
impl<S> Layer<S> for HttpRequestExtensionsLayer {
37+
type Service = HttpRequestExtensionsMiddleware<S>;
38+
39+
fn layer(&self, inner: S) -> Self::Service {
40+
HttpRequestExtensionsMiddleware {
41+
inner,
42+
base_config: self.base_config.clone(),
43+
}
44+
}
45+
}
46+
47+
#[derive(Clone)]
48+
pub struct HttpRequestExtensionsMiddleware<S> {
49+
inner: S,
50+
base_config: Arc<Config>,
51+
}
52+
53+
impl<S> Service<Request<Body>> for HttpRequestExtensionsMiddleware<S>
54+
where
55+
S: Service<Request<Body>, Response = Response, Error = std::convert::Infallible>
56+
+ Send
57+
+ 'static,
58+
S::Future: Send + 'static,
59+
{
60+
type Response = S::Response;
61+
type Error = S::Error;
62+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
63+
64+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
65+
self.inner.poll_ready(cx)
66+
}
67+
68+
fn call(&mut self, mut req: Request<Body>) -> Self::Future {
69+
// Extract x-config-override header first
70+
let config_override = req
71+
.headers()
72+
.get("x-config-override")
73+
.and_then(|h| h.to_str().map(|s| s.to_owned()).ok());
74+
75+
// Only process config if override header is present
76+
match config_override {
77+
Some(override_str) => {
78+
// Merge override with default
79+
let new_config = match merge_config_with_override(
80+
Some(override_str),
81+
(*self.base_config).clone(),
82+
) {
83+
Ok(cfg) => cfg,
84+
Err(e) => {
85+
let error_response = create_error_response(&format!(
86+
"Failed to merge config with override config: {e:?}"
87+
));
88+
let fut = async move { Ok(error_response) };
89+
return Box::pin(fut);
90+
}
91+
};
92+
93+
// Insert merged config into extensions
94+
req.extensions_mut().insert(new_config);
95+
}
96+
None => {
97+
// No override header - insert base config
98+
req.extensions_mut().insert(Arc::clone(&self.base_config));
99+
}
100+
}
101+
102+
let future = self.inner.call(req);
103+
Box::pin(async move {
104+
let response = future.await?;
105+
Ok(response)
106+
})
107+
}
108+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use axum::{
2+
http::StatusCode,
3+
response::{IntoResponse, Response},
4+
Json,
5+
};
6+
use serde::Serialize;
7+
8+
#[derive(Debug)]
9+
pub struct HttpError {
10+
pub status: StatusCode,
11+
pub message: String,
12+
}
13+
14+
#[derive(Serialize)]
15+
struct ErrorResponse {
16+
error: ErrorDetail,
17+
}
18+
19+
#[derive(Serialize)]
20+
struct ErrorDetail {
21+
message: String,
22+
code: String,
23+
}
24+
25+
impl IntoResponse for HttpError {
26+
fn into_response(self) -> Response {
27+
let body = Json(ErrorResponse {
28+
error: ErrorDetail {
29+
message: self.message.clone(),
30+
code: format!("{}", self.status.as_u16()),
31+
},
32+
});
33+
(self.status, body).into_response()
34+
}
35+
}
36+
37+
// Convert tonic::Status to HTTP error
38+
impl From<tonic::Status> for HttpError {
39+
fn from(status: tonic::Status) -> Self {
40+
let http_status = match status.code() {
41+
tonic::Code::Ok => StatusCode::OK,
42+
tonic::Code::Cancelled => StatusCode::REQUEST_TIMEOUT,
43+
tonic::Code::Unknown => StatusCode::INTERNAL_SERVER_ERROR,
44+
tonic::Code::InvalidArgument => StatusCode::BAD_REQUEST,
45+
tonic::Code::DeadlineExceeded => StatusCode::GATEWAY_TIMEOUT,
46+
tonic::Code::NotFound => StatusCode::NOT_FOUND,
47+
tonic::Code::AlreadyExists => StatusCode::CONFLICT,
48+
tonic::Code::PermissionDenied => StatusCode::FORBIDDEN,
49+
tonic::Code::ResourceExhausted => StatusCode::TOO_MANY_REQUESTS,
50+
tonic::Code::FailedPrecondition => StatusCode::PRECONDITION_FAILED,
51+
tonic::Code::Aborted => StatusCode::CONFLICT,
52+
tonic::Code::OutOfRange => StatusCode::RANGE_NOT_SATISFIABLE,
53+
tonic::Code::Unimplemented => StatusCode::NOT_IMPLEMENTED,
54+
tonic::Code::Internal => StatusCode::INTERNAL_SERVER_ERROR,
55+
tonic::Code::Unavailable => StatusCode::SERVICE_UNAVAILABLE,
56+
tonic::Code::DataLoss => StatusCode::INTERNAL_SERVER_ERROR,
57+
tonic::Code::Unauthenticated => StatusCode::UNAUTHORIZED,
58+
};
59+
60+
Self {
61+
status: http_status,
62+
message: status.message().to_string(),
63+
}
64+
}
65+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use axum::{
2+
extract::{Extension, State},
3+
http::{HeaderMap, StatusCode},
4+
Json,
5+
};
6+
use grpc_api_types::payments::{
7+
dispute_service_server::DisputeService, AcceptDisputeRequest, AcceptDisputeResponse,
8+
DisputeDefendRequest, DisputeDefendResponse, DisputeResponse, DisputeServiceGetRequest,
9+
DisputeServiceSubmitEvidenceRequest, DisputeServiceSubmitEvidenceResponse,
10+
DisputeServiceTransformRequest, DisputeServiceTransformResponse,
11+
};
12+
use std::sync::Arc;
13+
14+
use crate::configs::Config;
15+
use crate::http::handlers::macros::http_handler;
16+
use crate::http::{
17+
error::HttpError, http_headers_to_grpc_metadata, state::AppState,
18+
transfer_config_to_grpc_request, utils::ValidatedJson,
19+
};
20+
21+
http_handler!(
22+
submit_evidence,
23+
DisputeServiceSubmitEvidenceRequest,
24+
DisputeServiceSubmitEvidenceResponse,
25+
submit_evidence,
26+
disputes_service
27+
);
28+
29+
http_handler!(
30+
get_dispute,
31+
DisputeServiceGetRequest,
32+
DisputeResponse,
33+
get,
34+
disputes_service
35+
);
36+
37+
http_handler!(
38+
defend_dispute,
39+
DisputeDefendRequest,
40+
DisputeDefendResponse,
41+
defend,
42+
disputes_service
43+
);
44+
45+
http_handler!(
46+
accept_dispute,
47+
AcceptDisputeRequest,
48+
AcceptDisputeResponse,
49+
accept,
50+
disputes_service
51+
);
52+
53+
http_handler!(
54+
transform_dispute,
55+
DisputeServiceTransformRequest,
56+
DisputeServiceTransformResponse,
57+
transform,
58+
disputes_service
59+
);
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
use axum::{extract::State, http::StatusCode, Json};
2+
use serde_json::json;
3+
4+
use crate::http::state::AppState;
5+
6+
pub async fn health(State(_state): State<AppState>) -> Result<Json<serde_json::Value>, StatusCode> {
7+
Ok(Json(json!({
8+
"status": "healthy",
9+
"service": "connector-service"
10+
})))
11+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// All imports are used within the macro expansion
2+
#[allow(unused_imports)]
3+
use axum::{
4+
extract::{Extension, State},
5+
http::{HeaderMap, StatusCode},
6+
Json,
7+
};
8+
#[allow(unused_imports)]
9+
use tonic;
10+
11+
#[allow(unused_imports)]
12+
use crate::configs::Config;
13+
#[allow(unused_imports)]
14+
use crate::http::{
15+
error::HttpError, http_headers_to_grpc_metadata, state::AppState,
16+
transfer_config_to_grpc_request, utils::ValidatedJson,
17+
};
18+
#[allow(unused_imports)]
19+
use std::sync::Arc;
20+
21+
macro_rules! http_handler {
22+
($fn_name:ident, $req_type:ty, $resp_type:ty, $service_method:ident, $service_field:ident) => {
23+
pub async fn $fn_name(
24+
Extension(config): Extension<Arc<Config>>,
25+
State(state): State<AppState>,
26+
headers: HeaderMap,
27+
ValidatedJson(payload): ValidatedJson<$req_type>,
28+
) -> Result<Json<$resp_type>, HttpError> {
29+
let mut grpc_request = tonic::Request::new(payload);
30+
transfer_config_to_grpc_request(&config, &mut grpc_request);
31+
let grpc_metadata =
32+
http_headers_to_grpc_metadata(&headers).map_err(|status| HttpError {
33+
status: StatusCode::BAD_REQUEST,
34+
message: status.message().to_string(),
35+
})?;
36+
*grpc_request.metadata_mut() = grpc_metadata;
37+
let grpc_response = state.$service_field.$service_method(grpc_request).await?;
38+
Ok(Json(grpc_response.into_inner()))
39+
}
40+
};
41+
}
42+
43+
pub(crate) use http_handler;
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
pub mod disputes;
2+
pub mod health;
3+
pub mod macros;
4+
pub mod payments;
5+
pub mod refunds;
6+
7+
// Re-export handler modules for easier imports
8+
pub use disputes::*;
9+
pub use health::*;
10+
pub use payments::*;
11+
pub use refunds::*;

0 commit comments

Comments
 (0)