Skip to content

Commit 2c15007

Browse files
committed
grpc service metrics
Signed-off-by: Eguzki Astiz Lezaun <[email protected]>
1 parent f5caadd commit 2c15007

File tree

5 files changed

+191
-13
lines changed

5 files changed

+191
-13
lines changed

src/filter.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ extern "C" fn start() {
3333
Box::new(FilterRoot {
3434
context_id,
3535
config: Default::default(),
36+
rate_limit_ok_metric_id: 0,
37+
rate_limit_error_metric_id: 0,
38+
rate_limit_over_limit_metric_id: 0,
39+
rate_limit_failure_mode_allowed_metric_id: 0,
40+
auth_ok_metric_id: 0,
41+
auth_error_metric_id: 0,
42+
auth_denied_metric_id: 0,
43+
auth_failure_mode_allowed_metric_id: 0,
3644
})
3745
});
3846
}

src/filter/http_context.rs

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::configuration::action_set::ActionSet;
22
use crate::configuration::{FailureMode, FilterConfig};
3+
use crate::envoy::StatusCode;
34
use crate::operation_dispatcher::OperationDispatcher;
45
use crate::service::GrpcService;
56
use log::{debug, warn};
@@ -58,9 +59,21 @@ impl Filter {
5859
}
5960
Err(e) => {
6061
warn!("gRPC call failed! {e:?}");
61-
if let FailureMode::Deny = operation.get_failure_mode() {
62-
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
63-
}
62+
match operation.get_failure_mode() {
63+
FailureMode::Deny => {
64+
operation
65+
.get_service_handler()
66+
.service_metrics
67+
.report_error();
68+
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
69+
}
70+
FailureMode::Allow => {
71+
operation
72+
.get_service_handler()
73+
.service_metrics
74+
.report_allowed_on_failure();
75+
}
76+
};
6477
Action::Continue
6578
}
6679
}
@@ -113,13 +126,37 @@ impl Context for Filter {
113126
let some_op = self.operation_dispatcher.borrow().get_operation(token_id);
114127

115128
if let Some(operation) = some_op {
116-
if GrpcService::process_grpc_response(operation, resp_size).is_ok() {
117-
self.operation_dispatcher.borrow_mut().next();
118-
if let Some(_op) = self.operation_dispatcher.borrow_mut().next() {
119-
} else {
120-
self.resume_http_request()
129+
match GrpcService::process_grpc_response(Rc::clone(&operation), resp_size) {
130+
Ok(_) => {
131+
operation.get_service_handler().service_metrics.report_ok();
132+
self.operation_dispatcher.borrow_mut().next();
133+
if self.operation_dispatcher.borrow_mut().next().is_none() {
134+
self.resume_http_request()
135+
}
121136
}
122-
}
137+
Err(
138+
StatusCode::TooManyRequests | StatusCode::Unauthorized | StatusCode::Forbidden,
139+
) => {
140+
operation
141+
.get_service_handler()
142+
.service_metrics
143+
.report_rejected();
144+
}
145+
Err(_) => match operation.get_failure_mode() {
146+
FailureMode::Deny => {
147+
operation
148+
.get_service_handler()
149+
.service_metrics
150+
.report_error();
151+
}
152+
FailureMode::Allow => {
153+
operation
154+
.get_service_handler()
155+
.service_metrics
156+
.report_allowed_on_failure();
157+
}
158+
},
159+
};
123160
} else {
124161
warn!("No Operation found with token_id: {token_id}");
125162
GrpcService::handle_error_on_grpc_response(&FailureMode::Deny); // TODO(didierofrivia): Decide on what's the default failure mode

src/filter/root_context.rs

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
use crate::configuration::{FilterConfig, PluginConfiguration};
1+
use crate::configuration::{FilterConfig, PluginConfiguration, ServiceType};
22
use crate::filter::http_context::Filter;
33
use crate::operation_dispatcher::OperationDispatcher;
4-
use crate::service::{GrpcServiceHandler, HeaderResolver};
4+
use crate::service::{GrpcServiceHandler, HeaderResolver, ServiceMetrics};
55
use const_format::formatcp;
66
use log::{debug, error, info};
7+
use proxy_wasm::hostcalls;
78
use proxy_wasm::traits::{Context, HttpContext, RootContext};
8-
use proxy_wasm::types::ContextType;
9+
use proxy_wasm::types::{ContextType, MetricType};
910
use std::collections::HashMap;
1011
use std::rc::Rc;
1112

@@ -18,6 +19,14 @@ const WASM_SHIM_HEADER: &str = "Kuadrant wasm module";
1819
pub struct FilterRoot {
1920
pub context_id: u32,
2021
pub config: Rc<FilterConfig>,
22+
pub rate_limit_ok_metric_id: u32,
23+
pub rate_limit_error_metric_id: u32,
24+
pub rate_limit_over_limit_metric_id: u32,
25+
pub rate_limit_failure_mode_allowed_metric_id: u32,
26+
pub auth_ok_metric_id: u32,
27+
pub auth_error_metric_id: u32,
28+
pub auth_denied_metric_id: u32,
29+
pub auth_failure_mode_allowed_metric_id: u32,
2130
}
2231

2332
impl RootContext for FilterRoot {
@@ -30,6 +39,51 @@ impl RootContext for FilterRoot {
3039
"#{} {} {}: VM started",
3140
self.context_id, WASM_SHIM_HEADER, full_version
3241
);
42+
43+
self.rate_limit_ok_metric_id =
44+
match hostcalls::define_metric(MetricType::Counter, "kuadrant.rate_limit.ok") {
45+
Ok(metric_id) => metric_id,
46+
Err(e) => panic!("Error: {:?}", e),
47+
};
48+
self.rate_limit_error_metric_id =
49+
match hostcalls::define_metric(MetricType::Counter, "kuadrant.rate_limit.error") {
50+
Ok(metric_id) => metric_id,
51+
Err(e) => panic!("Error: {:?}", e),
52+
};
53+
self.rate_limit_over_limit_metric_id =
54+
match hostcalls::define_metric(MetricType::Counter, "kuadrant.rate_limit.over_limit") {
55+
Ok(metric_id) => metric_id,
56+
Err(e) => panic!("Error: {:?}", e),
57+
};
58+
self.rate_limit_failure_mode_allowed_metric_id = match hostcalls::define_metric(
59+
MetricType::Counter,
60+
"kuadrant.rate_limit.failure_mode_allowed",
61+
) {
62+
Ok(metric_id) => metric_id,
63+
Err(e) => panic!("Error: {:?}", e),
64+
};
65+
self.auth_ok_metric_id =
66+
match hostcalls::define_metric(MetricType::Counter, "kuadrant.auth.ok") {
67+
Ok(metric_id) => metric_id,
68+
Err(e) => panic!("Error: {:?}", e),
69+
};
70+
self.auth_error_metric_id =
71+
match hostcalls::define_metric(MetricType::Counter, "kuadrant.auth.error") {
72+
Ok(metric_id) => metric_id,
73+
Err(e) => panic!("Error: {:?}", e),
74+
};
75+
self.auth_denied_metric_id =
76+
match hostcalls::define_metric(MetricType::Counter, "kuadrant.auth.denied") {
77+
Ok(metric_id) => metric_id,
78+
Err(e) => panic!("Error: {:?}", e),
79+
};
80+
self.auth_failure_mode_allowed_metric_id = match hostcalls::define_metric(
81+
MetricType::Counter,
82+
"kuadrant.auth.failure_mode_allowed",
83+
) {
84+
Ok(metric_id) => metric_id,
85+
Err(e) => panic!("Error: {:?}", e),
86+
};
3387
true
3488
}
3589

@@ -46,6 +100,7 @@ impl RootContext for FilterRoot {
46100
.or_insert(Rc::from(GrpcServiceHandler::new(
47101
Rc::clone(grpc_service),
48102
Rc::clone(&header_resolver),
103+
Rc::new(self.service_metrics(grpc_service.get_service_type())),
49104
)));
50105
});
51106
Some(Box::new(Filter {
@@ -89,3 +144,22 @@ impl RootContext for FilterRoot {
89144
}
90145

91146
impl Context for FilterRoot {}
147+
148+
impl FilterRoot {
149+
fn service_metrics(&self, service_type: &ServiceType) -> ServiceMetrics {
150+
match service_type {
151+
ServiceType::Auth => ServiceMetrics::new(
152+
self.auth_ok_metric_id,
153+
self.auth_error_metric_id,
154+
self.auth_denied_metric_id,
155+
self.auth_failure_mode_allowed_metric_id,
156+
),
157+
ServiceType::RateLimit => ServiceMetrics::new(
158+
self.rate_limit_ok_metric_id,
159+
self.rate_limit_error_metric_id,
160+
self.rate_limit_over_limit_metric_id,
161+
self.rate_limit_failure_mode_allowed_metric_id,
162+
),
163+
}
164+
}
165+
}

src/operation_dispatcher.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ impl Operation {
107107
pub fn get_failure_mode(&self) -> &FailureMode {
108108
&self.service.failure_mode
109109
}
110+
111+
/// Borrows the `service_handler` held by this operation, giving callers
/// access to its public `service_metrics`.
pub fn get_service_handler(&self) -> &GrpcServiceHandler {
    &self.service_handler
}
110114
}
111115

112116
pub struct OperationDispatcher {

src/service.rs

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ impl GrpcService {
9090
FailureMode::Allow => hostcalls::resume_http_request().unwrap(),
9191
}
9292
}
93+
94+
/// Borrows the `service_type` of the wrapped service configuration.
pub fn get_service_type(&self) -> &ServiceType {
    &self.service.service_type
}
9397
}
9498

9599
pub type GrpcCallFn = fn(
@@ -109,13 +113,19 @@ pub type GrpcMessageBuildFn =
109113
pub struct GrpcServiceHandler {
110114
grpc_service: Rc<GrpcService>,
111115
header_resolver: Rc<HeaderResolver>,
116+
pub service_metrics: Rc<ServiceMetrics>,
112117
}
113118

114119
impl GrpcServiceHandler {
115-
pub fn new(grpc_service: Rc<GrpcService>, header_resolver: Rc<HeaderResolver>) -> Self {
120+
pub fn new(
121+
grpc_service: Rc<GrpcService>,
122+
header_resolver: Rc<HeaderResolver>,
123+
service_metrics: Rc<ServiceMetrics>,
124+
) -> Self {
116125
Self {
117126
grpc_service,
118127
header_resolver,
128+
service_metrics,
119129
}
120130
}
121131

@@ -201,3 +211,48 @@ impl TracingHeader {
201211
}
202212
}
203213
}
214+
215+
pub struct ServiceMetrics {
216+
ok_metric_id: u32,
217+
error_metric_id: u32,
218+
rejected_metric_id: u32,
219+
failure_mode_allowed_metric_id: u32,
220+
}
221+
222+
impl ServiceMetrics {
223+
pub fn new(
224+
ok_metric_id: u32,
225+
error_metric_id: u32,
226+
rejected_metric_id: u32,
227+
failure_mode_allowed_metric_id: u32,
228+
) -> Self {
229+
Self {
230+
ok_metric_id,
231+
error_metric_id,
232+
rejected_metric_id,
233+
failure_mode_allowed_metric_id,
234+
}
235+
}
236+
237+
fn report(metric_id: u32, offset: i64) {
238+
if let Err(e) = hostcalls::increment_metric(metric_id, offset) {
239+
warn!("report metric {metric_id}, error: {e:?}");
240+
}
241+
}
242+
243+
pub fn report_error(&self) {
244+
Self::report(self.error_metric_id, 1);
245+
}
246+
247+
pub fn report_allowed_on_failure(&self) {
248+
Self::report(self.failure_mode_allowed_metric_id, 1);
249+
}
250+
251+
pub fn report_ok(&self) {
252+
Self::report(self.ok_metric_id, 1);
253+
}
254+
255+
pub fn report_rejected(&self) {
256+
Self::report(self.rejected_metric_id, 1);
257+
}
258+
}

0 commit comments

Comments
 (0)