Skip to content

Commit 8095323

Browse files
committed
[9/N][VQueues] Ensure the scheduler's internal clock wheel is synchronized with eligibility_at
This changeset makes sure that we have a reliable datum point that's shared between the delayed queue internal timer wheel and the scheduler. Additionally, it ensures that the scheduler can create `Instant` values that are synchronized with the datum point acquired from UniqueTimestamp. This fixes the issue introduced in changeset 6e65909 where we would become eligible slightly before the actual eligibility time, causing spurious rescheduling.
1 parent b8a5018 commit 8095323

File tree

7 files changed

+123
-77
lines changed

7 files changed

+123
-77
lines changed

crates/types/src/clock/unique_timestamp.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,22 @@ impl UniqueTimestamp {
9696
Ok(Self(nz))
9797
}
9898

99+
/// Compare the physical clock of both timestamps and ignore the logical clock.
100+
///
101+
/// Note that this has millisecond precision.
102+
pub fn cmp_physical(&self, other: &Self) -> std::cmp::Ordering {
103+
self.physical_raw().cmp(&other.physical_raw())
104+
}
105+
99106
pub fn as_u64(&self) -> u64 {
100107
self.0.get()
101108
}
102109

110+
/// Fails if the resulting timestamp is out of range
111+
pub fn add_millis(&self, millis: u64) -> Result<Self, Error> {
112+
Self::from_parts(self.physical_raw() + millis, self.logical_raw())
113+
}
114+
103115
#[inline(always)]
104116
fn physical_raw(&self) -> u64 {
105117
// extract the physical time

crates/vqueues/src/delay_queue.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -471,10 +471,10 @@ impl<T> DelayQueue<T> {
471471
///
472472
/// ```ignore
473473
/// # use restate_vqueues::delay_queue::DelayQueue;
474-
/// let delay_queue: DelayQueue<u32> = DelayQueue::new();
474+
/// let delay_queue: DelayQueue<u32> = DelayQueue::new(tokio::time::Instant::now());
475475
/// ```
476-
pub fn new() -> DelayQueue<T> {
477-
DelayQueue::with_capacity(0)
476+
pub fn new(start: Instant) -> DelayQueue<T> {
477+
DelayQueue::with_capacity(start, 0)
478478
}
479479

480480
/// Creates a new, empty, `DelayQueue` with the specified capacity.
@@ -491,7 +491,7 @@ impl<T> DelayQueue<T> {
491491
///
492492
/// # #[tokio::main(flavor = "current_thread")]
493493
/// # async fn main() {
494-
/// let mut delay_queue = DelayQueue::with_capacity(10);
494+
/// let mut delay_queue = DelayQueue::with_capacity(tokio::time::Instant::now(), 10);
495495
///
496496
/// // These insertions are done without further allocation
497497
/// for i in 0..10 {
@@ -502,14 +502,14 @@ impl<T> DelayQueue<T> {
502502
/// delay_queue.insert(11, Duration::from_secs(11));
503503
/// # }
504504
/// ```
505-
pub fn with_capacity(capacity: usize) -> DelayQueue<T> {
505+
pub fn with_capacity(start: Instant, capacity: usize) -> DelayQueue<T> {
506506
DelayQueue {
507507
wheel: Wheel::new(),
508508
slab: SlabStorage::with_capacity(capacity),
509509
expired: Stack::default(),
510510
delay: None,
511511
wheel_now: 0,
512-
start: Instant::now(),
512+
start,
513513
waker: None,
514514
}
515515
}
@@ -1215,12 +1215,6 @@ impl<T> DelayQueue<T> {
12151215
// We never put `T` in a `Pin`...
12161216
impl<T> Unpin for DelayQueue<T> {}
12171217

1218-
impl<T> Default for DelayQueue<T> {
1219-
fn default() -> DelayQueue<T> {
1220-
DelayQueue::new()
1221-
}
1222-
}
1223-
12241218
impl<T> Stream for DelayQueue<T> {
12251219
// DelayQueue seems much more specific, where a user may care that it
12261220
// has reached capacity, so return those errors instead of panicking.

crates/vqueues/src/scheduler.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,14 @@ use smallvec::SmallVec;
2020
use restate_futures_util::concurrency::{Concurrency, Permit};
2121
use restate_storage_api::StorageError;
2222
use restate_storage_api::vqueue_table::{ScanVQueueTable, VQueueStore};
23-
use restate_types::clock::UniqueTimestamp;
24-
use restate_types::time::MillisSinceEpoch;
2523
use restate_types::vqueue::VQueueId;
2624

2725
use crate::VQueueEvent;
2826
use crate::{VQueuesMeta, VQueuesMetaMut};
2927

3028
use self::drr::DRRScheduler;
3129

30+
mod clock;
3231
mod drr;
3332
mod vqueue_state;
3433

@@ -185,12 +184,11 @@ where
185184

186185
pub fn on_inbox_event(
187186
&mut self,
188-
now: UniqueTimestamp,
189187
vqueues: VQueuesMeta<'_>,
190188
event: &VQueueEvent<S::Item>,
191189
) -> Result<(), StorageError> {
192190
if let State::Active(ref mut drr_scheduler) = self.state {
193-
drr_scheduler.as_mut().on_inbox_event(now, vqueues, event)?;
191+
drr_scheduler.as_mut().on_inbox_event(vqueues, event)?;
194192
}
195193
Ok(())
196194
}
@@ -199,21 +197,19 @@ where
199197
&mut self,
200198
vqueues: VQueuesMeta<'_>,
201199
) -> Result<Decision<S::Item>, StorageError> {
202-
let now = UniqueTimestamp::from_unix_millis(MillisSinceEpoch::now()).unwrap();
203-
poll_fn(|cx| self.poll_schedule_next(now, cx, vqueues)).await
200+
poll_fn(|cx| self.poll_schedule_next(cx, vqueues)).await
204201
}
205202

206203
pub fn poll_schedule_next(
207204
&mut self,
208-
now: UniqueTimestamp,
209205
cx: &mut std::task::Context<'_>,
210206
vqueues: VQueuesMeta<'_>,
211207
) -> Poll<Result<Decision<S::Item>, StorageError>> {
212208
match self.state {
213209
// if scheduler is disabled, we always return pending.
214210
State::Disabled => Poll::Pending,
215211
State::Active(ref mut drr_scheduler) => {
216-
drr_scheduler.as_mut().poll_schedule_next(now, cx, vqueues)
212+
drr_scheduler.as_mut().poll_schedule_next(cx, vqueues)
217213
}
218214
}
219215
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use std::time::Duration;
12+
13+
use restate_types::clock::UniqueTimestamp;
14+
15+
/// A clock that tracks the physical clock of the scheduler that is synchronized
16+
/// with the UniqueTimestamp physical clock.
17+
pub struct SchedulerClock {
18+
tokio_origin: tokio::time::Instant,
19+
ts_origin: UniqueTimestamp,
20+
}
21+
22+
impl SchedulerClock {
23+
/// `now` is the current physical clock that corresponds to `Instant::now()` at the time
24+
/// of construction.
25+
pub fn new(now: UniqueTimestamp) -> Self {
26+
Self {
27+
tokio_origin: tokio::time::Instant::now(),
28+
ts_origin: now,
29+
}
30+
}
31+
32+
pub fn origin_instant(&self) -> tokio::time::Instant {
33+
self.tokio_origin
34+
}
35+
36+
pub fn now_ts(&self) -> UniqueTimestamp {
37+
let delta = self.tokio_origin.elapsed().as_millis() as u64;
38+
self.ts_origin
39+
.add_millis(delta)
40+
.expect("clock doesn't overflow")
41+
}
42+
43+
/// Calculates a future tokio Instant from the physical clock of `ts`.
44+
///
45+
/// Returns the clock datum/origin point if the input `ts` is in the past.
46+
pub fn ts_to_future_instant(&self, ts: UniqueTimestamp) -> tokio::time::Instant {
47+
let delta = ts.milliseconds_since(self.ts_origin);
48+
self.tokio_origin + Duration::from_millis(delta)
49+
}
50+
}

crates/vqueues/src/scheduler/drr.rs

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,20 @@ use std::num::NonZeroU16;
1313
use std::pin::Pin;
1414
use std::task::Poll;
1515
use std::task::Waker;
16+
use std::time::Duration;
1617

1718
use hashbrown::HashMap;
1819
use hashbrown::hash_map;
1920
use pin_project::pin_project;
21+
use tokio::time::Instant;
2022
use tracing::{info, trace};
2123

2224
use restate_futures_util::concurrency::Concurrency;
2325
use restate_futures_util::concurrency::Permit;
2426
use restate_storage_api::StorageError;
2527
use restate_storage_api::vqueue_table::VQueueStore;
2628
use restate_types::clock::UniqueTimestamp;
29+
use restate_types::time::MillisSinceEpoch;
2730
use restate_types::vqueue::VQueueId;
2831

2932
use crate::EventDetails;
@@ -34,6 +37,7 @@ use crate::scheduler::Assignments;
3437
use crate::scheduler::vqueue_state::Eligibility;
3538

3639
use super::Decision;
40+
use super::clock::SchedulerClock;
3741
use super::vqueue_state::VQueueState;
3842

3943
/// Capacity to maintain for N vqueues (N=100)
@@ -55,8 +59,9 @@ pub struct DRRScheduler<S: VQueueStore, Token> {
5559
remaining_in_round: usize,
5660
/// Waker to be notified when scheduler is potentially able to scheduler more work
5761
waker: Waker,
62+
datum: SchedulerClock,
5863
/// Time of the last memory reporting and memory compaction
59-
last_report: Option<UniqueTimestamp>,
64+
last_report: Instant,
6065
// SAFETY NOTE: **must** Keep this at the end since it needs to outlive all readers.
6166
storage: S,
6267
}
@@ -94,17 +99,27 @@ where
9499
q.len(),
95100
);
96101

102+
let datum = SchedulerClock::new(
103+
UniqueTimestamp::from_unix_millis(MillisSinceEpoch::now())
104+
.expect("clock does not overflow"),
105+
);
106+
// Makes sure we use the same clock datum for the internal timer wheel and for our
107+
// own eligibility checks.
108+
let start = datum.origin_instant();
109+
let delayed_eligibility = DelayQueue::new(start);
110+
97111
Self {
98112
limit_qid_per_poll,
99113
concurrency_limiter,
100114
q,
101115
eligible,
102116
global_sched_round: 0,
103117
remaining_in_round: 0,
104-
delayed_eligibility: DelayQueue::new(),
118+
delayed_eligibility,
105119
unconfirmed_capacity_permits: Permit::new_empty(),
106120
waker: Waker::noop().clone(),
107-
last_report: None,
121+
datum,
122+
last_report: start,
108123
storage,
109124
}
110125
}
@@ -122,7 +137,6 @@ where
122137

123138
pub fn poll_schedule_next(
124139
mut self: Pin<&mut Self>,
125-
now: UniqueTimestamp,
126140
cx: &mut std::task::Context<'_>,
127141
vqueues: VQueuesMeta<'_>,
128142
) -> Poll<Result<Decision<S::Item>, StorageError>> {
@@ -131,14 +145,11 @@ where
131145
Abort,
132146
}
133147

134-
if self
135-
.last_report
136-
.is_none_or(|t| now.milliseconds_since(t) >= 10000)
137-
{
148+
if self.last_report.elapsed() >= Duration::from_secs(10) {
138149
vqueues.report();
139150
self.report();
140151
// also report vqueues states
141-
self.last_report = Some(now);
152+
self.last_report = Instant::now();
142153
// compact memory
143154
self.q.shrink_to(MIN_VQUEUES_CAPACITY);
144155
self.delayed_eligibility.compact();
@@ -151,20 +162,6 @@ where
151162
if self.remaining_in_round == 0 {
152163
// Pop all eligible vqueues that were delayed since we are starting a new round
153164
// Once we hit pending, the waker will be registered.
154-
//
155-
// There is currently an issue due to using two different clock sources. The timer
156-
// wheel uses tokio's internal clock but our scheduler's design relies on explicit
157-
// monotonic timestamps. This will cause the timer to expire slightly before or after
158-
// the actual point at which the input (now) would satisfy the head item's
159-
// eligibility requirements. As a result, we will see one of three scenarios:
160-
// 1. Time aligns. We pop the timer and the head element will be immediately eligible.
161-
// 2. We are woken up before `now` satisfies the requirement, in this case we will
162-
// schedule a new timer to catch up on the difference.
163-
// 3. We are woken up after `now` by a few hundreds of millis, in this case the
164-
// head will be eligible but we will be a little late.
165-
//
166-
// This will be fixed in a later change after we reason about whether we need any
167-
// causal entanglement between the RSM's clock and the scheduler's internal clock.
168165
let previous_round = self.global_sched_round;
169166
while let Poll::Ready(Some(expired)) = self.delayed_eligibility.poll_expired(cx) {
170167
let Some(qstate) = self.q.get_mut(expired.get_ref()) else {
@@ -188,6 +185,7 @@ where
188185
}
189186
}
190187

188+
let now = self.datum.now_ts();
191189
while self.remaining_in_round > 0 {
192190
// bail if we exhausted coop budget.
193191
let coop = match tokio::task::coop::poll_proceed(cx) {
@@ -227,7 +225,10 @@ where
227225
}
228226
Eligibility::EligibleAt(wake_up_at) => {
229227
*this.remaining_in_round -= 1;
230-
qstate.maybe_schedule_wakeup(wake_up_at, this.delayed_eligibility, now);
228+
qstate.maybe_schedule_wakeup(
229+
this.datum.ts_to_future_instant(wake_up_at),
230+
this.delayed_eligibility,
231+
);
231232
this.eligible.pop_front();
232233
break 'single_vqueue Outcome::ContinueRound;
233234
}
@@ -306,7 +307,6 @@ where
306307
#[tracing::instrument(skip_all)]
307308
pub fn on_inbox_event(
308309
&mut self,
309-
now: UniqueTimestamp,
310310
vqueues: VQueuesMeta<'_>,
311311
event: &VQueueEvent<S::Item>,
312312
) -> Result<(), StorageError> {
@@ -326,7 +326,8 @@ where
326326
return Ok(());
327327
}
328328

329-
match qstate.check_eligibility(now, meta, config) {
329+
let now_ts = self.datum.now_ts();
330+
match qstate.check_eligibility(now_ts, meta, config) {
330331
Eligibility::Eligible if !self.eligible.contains(&qid) => {
331332
// Make eligible immediately.
332333
qstate.deficit.set_last_round(self.global_sched_round);
@@ -335,9 +336,8 @@ where
335336
}
336337
Eligibility::EligibleAt(eligiblility_ts) if !self.eligible.contains(&qid) => {
337338
qstate.maybe_schedule_wakeup(
338-
eligiblility_ts,
339+
self.datum.ts_to_future_instant(eligiblility_ts),
339340
&mut self.delayed_eligibility,
340-
now,
341341
);
342342
}
343343
_ => { /* do nothing */ }
@@ -354,7 +354,10 @@ where
354354
// drop the already acquired permit
355355
let _ = self.unconfirmed_capacity_permits.split(1);
356356

357-
if qstate.check_eligibility(now, meta, config).is_eligible() {
357+
if qstate
358+
.check_eligibility(self.datum.now_ts(), meta, config)
359+
.is_eligible()
360+
{
358361
if !self.eligible.contains(&qid) {
359362
self.eligible.push_back(qid);
360363
qstate.deficit.set_last_round(self.global_sched_round);
@@ -377,7 +380,7 @@ where
377380
// drop the already acquired permit
378381
let _ = self.unconfirmed_capacity_permits.split(1);
379382
} else if qstate.notify_removed(item_hash) {
380-
match qstate.check_eligibility(now, meta, config) {
383+
match qstate.check_eligibility(self.datum.now_ts(), meta, config) {
381384
Eligibility::Eligible if !self.eligible.contains(&qid) => {
382385
// Make eligible immediately.
383386
qstate.deficit.set_last_round(self.global_sched_round);
@@ -388,9 +391,8 @@ where
388391
if !self.eligible.contains(&qid) =>
389392
{
390393
qstate.maybe_schedule_wakeup(
391-
eligiblility_ts,
394+
self.datum.ts_to_future_instant(eligiblility_ts),
392395
&mut self.delayed_eligibility,
393-
now,
394396
);
395397
}
396398
_ => { /* do nothing */ }

0 commit comments

Comments
 (0)