@@ -20,7 +20,8 @@ use restate_types::vqueues::VQueueId;
2020
2121use crate :: metric_definitions:: {
2222 VQUEUE_GLOBAL_THROTTLE_WAIT_MS , VQUEUE_INVOKER_CONCURRENCY_WAIT_MS ,
23- VQUEUE_INVOKER_MEMORY_WAIT_MS , VQUEUE_LOCAL_THROTTLE_WAIT_MS ,
23+ VQUEUE_INVOKER_MEMORY_WAIT_MS , VQUEUE_LOCAL_THROTTLE_WAIT_MS , VQUEUE_LOCK_WAIT_MS ,
24+ VQUEUE_USER_LIMIT_WAIT_MS ,
2425} ;
2526use crate :: scheduler:: queue:: QueueItem ;
2627
@@ -92,6 +93,10 @@ struct Stats {
9293 blocked_on_invoker_memory_micros : u32 ,
9394 local_start_throttling_micros : u32 ,
9495 global_throttling_micros : u32 ,
96+ last_blocked_on_user_limit : Option < tokio:: time:: Instant > ,
97+ blocked_on_user_limit_micros : u32 ,
98+ last_blocked_on_lock : Option < tokio:: time:: Instant > ,
99+ blocked_on_lock_micros : u32 ,
95100}
96101impl Stats {
97102 fn reset ( & mut self ) {
@@ -101,19 +106,26 @@ impl Stats {
101106 self . blocked_on_invoker_memory_micros = 0 ;
102107 self . local_start_throttling_micros = 0 ;
103108 self . global_throttling_micros = 0 ;
109+ self . last_blocked_on_user_limit = None ;
110+ self . blocked_on_user_limit_micros = 0 ;
111+ self . last_blocked_on_lock = None ;
112+ self . blocked_on_lock_micros = 0 ;
104113 }
105114
106115 fn finalize ( & mut self ) -> WaitStats {
107- // ensures that the last capacity/memory- blocked segment is accounted for
116+ // ensures that the last blocked segments are accounted for
108117 self . record_invoker_memory_delay ( false ) ;
109- // ensures that the last capacity-blocked segment is accounted for
110118 self . record_invoker_concurrency_delay ( false ) ;
119+ self . record_user_limit_delay ( false ) ;
120+ self . record_lock_delay ( false ) ;
111121
112122 let stats = WaitStats {
113123 blocked_on_global_capacity_ms : self . blocked_on_invoker_concurrency_micros / 1000 ,
114124 vqueue_start_throttling_ms : self . local_start_throttling_micros / 1000 ,
115125 blocked_on_invoker_memory_ms : self . blocked_on_invoker_memory_micros / 1000 ,
116126 global_invoker_throttling_ms : self . global_throttling_micros / 1000 ,
127+ blocked_on_user_limit_ms : self . blocked_on_user_limit_micros / 1000 ,
128+ blocked_on_lock_ms : self . blocked_on_lock_micros / 1000 ,
117129 } ;
118130 self . reset ( ) ;
119131
@@ -139,11 +151,29 @@ impl Stats {
139151 self . blocked_on_invoker_memory_micros
140152 } ;
141153
154+ let blocked_on_user_limit_micros = if let Some ( last) = self . last_blocked_on_user_limit {
155+ let delay = last. elapsed ( ) ;
156+ self . blocked_on_user_limit_micros
157+ . saturating_add ( delay. as_micros ( ) . try_into ( ) . unwrap_or ( u32:: MAX ) )
158+ } else {
159+ self . blocked_on_user_limit_micros
160+ } ;
161+
162+ let blocked_on_lock_micros = if let Some ( last) = self . last_blocked_on_lock {
163+ let delay = last. elapsed ( ) ;
164+ self . blocked_on_lock_micros
165+ . saturating_add ( delay. as_micros ( ) . try_into ( ) . unwrap_or ( u32:: MAX ) )
166+ } else {
167+ self . blocked_on_lock_micros
168+ } ;
169+
142170 WaitStats {
143171 blocked_on_global_capacity_ms : blocked_on_global_capacity_micros / 1000 ,
144172 vqueue_start_throttling_ms : self . local_start_throttling_micros / 1000 ,
145173 global_invoker_throttling_ms : self . global_throttling_micros / 1000 ,
146174 blocked_on_invoker_memory_ms : blocked_on_invoker_memory_micros / 1000 ,
175+ blocked_on_user_limit_ms : blocked_on_user_limit_micros / 1000 ,
176+ blocked_on_lock_ms : blocked_on_lock_micros / 1000 ,
147177 }
148178 }
149179}
@@ -198,6 +228,38 @@ impl Stats {
198228 . saturating_add ( delay. as_micros ( ) . try_into ( ) . unwrap_or ( u32:: MAX ) )
199229 }
200230 }
231+
232+ fn record_user_limit_delay ( & mut self , is_now_blocked : bool ) {
233+ let last = self . last_blocked_on_user_limit . take ( ) ;
234+ self . last_blocked_on_user_limit = if is_now_blocked {
235+ Some ( Instant :: now ( ) )
236+ } else {
237+ None
238+ } ;
239+ if let Some ( last) = last {
240+ let delay = last. elapsed ( ) ;
241+ counter ! ( VQUEUE_USER_LIMIT_WAIT_MS ) . increment ( delay. as_millis ( ) as u64 ) ;
242+ self . blocked_on_user_limit_micros = self
243+ . blocked_on_user_limit_micros
244+ . saturating_add ( delay. as_micros ( ) . try_into ( ) . unwrap_or ( u32:: MAX ) )
245+ }
246+ }
247+
248+ fn record_lock_delay ( & mut self , is_now_blocked : bool ) {
249+ let last = self . last_blocked_on_lock . take ( ) ;
250+ self . last_blocked_on_lock = if is_now_blocked {
251+ Some ( Instant :: now ( ) )
252+ } else {
253+ None
254+ } ;
255+ if let Some ( last) = last {
256+ let delay = last. elapsed ( ) ;
257+ counter ! ( VQUEUE_LOCK_WAIT_MS ) . increment ( delay. as_millis ( ) as u64 ) ;
258+ self . blocked_on_lock_micros = self
259+ . blocked_on_lock_micros
260+ . saturating_add ( delay. as_micros ( ) . try_into ( ) . unwrap_or ( u32:: MAX ) )
261+ }
262+ }
201263}
202264
203265#[ derive( derive_more:: Debug ) ]
@@ -318,6 +380,12 @@ impl<S: VQueueStore> VQueueState<S> {
318380 ResourceKind :: InvokerThrottling => {
319381 // self.head_stats.record_start_throttling_delay(delay);
320382 }
383+ ResourceKind :: LimitKeyConcurrency { .. } => {
384+ self . head_stats . record_user_limit_delay ( true ) ;
385+ }
386+ ResourceKind :: Lock { .. } => {
387+ self . head_stats . record_lock_delay ( true ) ;
388+ }
321389 _ => { }
322390 }
323391 Ok ( Pop :: Blocked ( resource) )
0 commit comments