Skip to content

Commit 16a84ca

Browse files
committed
[VQueues] Major redesign of vqueues internal design
1 parent cc809fb commit 16a84ca

21 files changed

Lines changed: 404 additions & 397 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/partition-store/src/tests/vqueue_table_test/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ fn entry_value(id: u8, original_run_at: u64, num_attempts: u32) -> EntryValue {
6565
stats.latest_attempt_at = Some(created_at);
6666
}
6767
let status = if num_attempts > 0 {
68-
Status::Running
68+
Status::Started
6969
} else if stats.first_runnable_at > created_at.to_unix_millis() {
7070
Status::Scheduled
7171
} else {

crates/partition-store/src/vqueue_table/entry.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ define_table_key!(
3434
)
3535
);
3636

37-
static_assertions::const_assert_eq!(EntryKind::serialized_length_fixed(), 1);
38-
3937
impl EntryStatusKey {
4038
pub const fn serialized_length_fixed() -> usize {
4139
KeyKind::SERIALIZED_LENGTH
@@ -61,8 +59,6 @@ impl From<&InvocationId> for EntryStatusKey {
6159
pub struct StatusHeaderRawRef<'a> {
6260
#[bilrost(tag(1))]
6361
pub(super) qid: VQueueIdRef<'a>,
64-
/// Unknown is an invalid state, this will be set to None when the invocation
65-
/// leaves the queue.
6662
#[bilrost(tag(2))]
6763
pub(super) stage: Stage,
6864
#[bilrost(tag(3))]

crates/partition-store/src/vqueue_table/inbox.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use super::key_codec::HasLock;
2525
// 'qP' | QID | HAS_LOCK(1B) | RUN_AT(8B) | SEQ(8B) | ENTRY_ID(17B)
2626
// 'qF' | QID | HAS_LOCK(1B) | RUN_AT(8B) | SEQ(8B) | ENTRY_ID(17B)
2727

28-
// Inbox, Running, Suspended, Paused, and have the same key design.
28+
// Inbox, Running, Suspended, Paused, and Finished all have the same key design.
2929
macro_rules! define_stage_keys {
3030
( $(Stage::$stage: ident => $key_name: ident),+ $(,)? ) => {
3131
paste::paste! {

crates/partition-store/src/vqueue_table/items.rs renamed to crates/partition-store/src/vqueue_table/input.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ use crate::keys::{KeyKind, define_table_key};
1515

1616
// Input payloads stored for vqueue items.
1717
// Vqueue items are stored under the qid they belong to and their creation order.
18+
// The sequence number acts as an epoch to distinguish between different incarnations
19+
// of the same EntryId (e.g. deterministic invocation re-invoked after expiration).
20+
//
1821
// 'qi' | QID | SEQ | ENTRY_ID
1922
define_table_key!(
2023
VQueue,

crates/partition-store/src/vqueue_table/key_codec.rs

Lines changed: 64 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11-
use anyhow::Context;
1211
use bytes::{Buf, BufMut};
1312

1413
use restate_clock::RoughTimestamp;
@@ -130,11 +129,14 @@ impl KeyDecode for EntryId {
130129
}
131130
}
132131

133-
// RoughTimestamp is encoded as a u64 in big-endian order to enable forward compatibility
134-
// with potential future higher-precision restate-epoch based timestamp.
132+
// The on-disk value is milliseconds-since-restate-epoch, quantized to whole
133+
// seconds by the current seconds-precision `RoughTimestamp`. Storing in the
134+
// millisecond domain keeps the format forward-compatible with a future
135+
// higher-precision ms timestamp using the same u64 slot — no migration, and
136+
// sort order is preserved across the boundary.
135137
impl KeyEncode for RoughTimestamp {
136138
fn encode<B: BufMut>(&self, target: &mut B) {
137-
target.put_u64(self.as_u32() as u64);
139+
target.put_u64(self.as_u32() as u64 * 1_000);
138140
}
139141

140142
fn serialized_length(&self) -> usize {
@@ -144,11 +146,12 @@ impl KeyEncode for RoughTimestamp {
144146

145147
impl KeyDecode for RoughTimestamp {
146148
fn decode<B: Buf>(source: &mut B) -> crate::Result<Self> {
147-
let raw = source.get_u64();
148-
Ok(Self::new(
149-
raw.try_into()
150-
.context("RoughTimestamp needs to fit into u32")?,
151-
))
149+
let raw_ms = source.get_u64();
150+
// Floor to seconds. Clamp so that forward-written ms values beyond
151+
// `RoughTimestamp`'s range saturate at MAX instead of truncating
152+
// on the `as u32` cast.
153+
let secs = (raw_ms / 1_000).min(u32::MAX as u64) as u32;
154+
Ok(Self::new(secs))
152155
}
153156
}
154157

@@ -190,3 +193,55 @@ impl KeyEncode for EntryKey {
190193
Self::serialized_length_fixed()
191194
}
192195
}
196+
197+
#[cfg(test)]
198+
mod tests {
199+
use super::*;
200+
201+
#[test]
202+
fn rough_timestamp_ms_codec() {
203+
// Round-trip preserves seconds across the representable range, and
204+
// encoded bytes are always an exact multiple of 1_000 (i.e. the
205+
// encoder only ever emits whole-second ms values, so the `*1_000` /
206+
// `/1_000` pair is lossless for every `RoughTimestamp`).
207+
for &secs in &[0u32, 1, 100, 1_000, u32::MAX / 2, u32::MAX - 1] {
208+
let mut buf = Vec::new();
209+
RoughTimestamp::new(secs).encode(&mut buf);
210+
211+
let raw = u64::from_be_bytes(buf.as_slice().try_into().unwrap());
212+
assert_eq!(
213+
raw % 1_000,
214+
0,
215+
"encoded value must be a multiple of 1_000 (got {raw} for {secs}s)"
216+
);
217+
assert_eq!(
218+
raw,
219+
secs as u64 * 1_000,
220+
"encoded ms must equal seconds * 1_000"
221+
);
222+
223+
let mut slice = buf.as_slice();
224+
let decoded = RoughTimestamp::decode(&mut slice).expect("decode ok");
225+
assert_eq!(decoded.as_u32(), secs, "round-trip failed for {secs}s");
226+
}
227+
228+
// Forward-compat: sub-second ms values floor to the correct second.
229+
let mut sub_second = Vec::new();
230+
sub_second.put_u64(5_999);
231+
let mut slice = sub_second.as_slice();
232+
assert_eq!(
233+
RoughTimestamp::decode(&mut slice).unwrap().as_u32(),
234+
5,
235+
"ms value should floor to whole seconds on decode"
236+
);
237+
238+
// Forward-compat: ms values beyond RoughTimestamp::MAX clamp to MAX.
239+
let mut over_max = Vec::new();
240+
over_max.put_u64(u64::MAX);
241+
let mut slice = over_max.as_slice();
242+
assert_eq!(
243+
RoughTimestamp::decode(&mut slice).unwrap(),
244+
RoughTimestamp::MAX,
245+
);
246+
}
247+
}

crates/partition-store/src/vqueue_table/mod.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
mod entry;
1212
mod inbox;
13-
mod items;
13+
mod input;
1414
mod key_codec;
1515
mod metadata;
1616
mod reader;
@@ -21,7 +21,7 @@ use std::io::Cursor;
2121

2222
pub use entry::{EntryStatusKey, StatusHeaderRaw};
2323
pub use inbox::InboxKey;
24-
pub use items::InputPayloadKey;
24+
pub use input::InputPayloadKey;
2525
pub use metadata::*;
2626

2727
use anyhow::Context;
@@ -298,21 +298,21 @@ impl WriteVQueueTable for PartitionStoreTransaction<'_> {
298298
) where
299299
E: Message,
300300
{
301-
let key_buf = {
302-
let seq = seq.into();
303-
let key = InputPayloadKey::builder_ref().qid(qid).seq(&seq).id(id);
304-
let key_buf = self.cleared_key_buffer_mut(key.serialized_length());
305-
key.serialize_to(key_buf);
306-
key_buf.split()
307-
};
301+
let seq = seq.into();
302+
let mut key_buffer = [0u8; InputPayloadKey::serialized_length_fixed()];
303+
InputPayloadKey::builder_ref()
304+
.qid(qid)
305+
.seq(&seq)
306+
.id(id)
307+
.serialize_to(&mut key_buffer.as_mut());
308308

309309
let value_buffer = self.cleared_value_buffer_mut(item.encoded_len());
310310

311311
item.encode(value_buffer)
312312
.expect("enough space to encode item");
313313
let value = value_buffer.split();
314314

315-
self.raw_put_cf(KeyKind::VQueueInput, key_buf, value);
315+
self.raw_put_cf(KeyKind::VQueueInput, key_buffer, value);
316316
}
317317

318318
fn delete_vqueue_input_payload(&mut self, qid: &VQueueId, seq: impl Into<Seq>, id: &EntryId) {

crates/storage-api/src/vqueue_table/entry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ impl EntryKey {
157157

158158
#[derive(Debug, Clone, bilrost::Message)]
159159
pub struct EntryValue {
160-
/// Status is copied over from the entry's state when the last transition
160+
/// Status is copied over from the entry status table when the last transition
161161
/// happened.
162162
#[bilrost(tag(1))]
163163
pub status: Status,

crates/storage-api/src/vqueue_table/entry_status.rs

Lines changed: 20 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -23,61 +23,32 @@ pub enum Status {
2323
New,
2424
#[bilrost(2)]
2525
Scheduled,
26-
// -- Statuses for an invocation that has already started (attempted at least once)
26+
/// Invocation has started running with at least one attempt.
2727
#[bilrost(3)]
28-
Running,
29-
#[bilrost(4)]
30-
Suspended,
28+
Started,
3129
/// Invocation has previously started but has been placed back on the waiting inbox
3230
/// due to an attempt error.
33-
#[bilrost(5)]
31+
#[bilrost(4)]
3432
BackingOff,
3533
/// Invocation has previously started but has been placed back on the waiting inbox.
3634
/// This does not mean that the invocation attempt has failed, it just means that
3735
/// it has been evicted from the run queue and will be resumed later.
38-
#[bilrost(6)]
36+
#[bilrost(5)]
3937
Yielded,
40-
/// Inovocation that was suspended and is now waiting for its turn
41-
/// to run.
42-
#[bilrost(7)]
43-
WakingUp,
4438
///
4539
/// -- Terminal states, invocation cannot transition back to any of the previous
4640
/// statuses
4741
///
48-
#[bilrost(8)]
42+
#[bilrost(6)]
4943
Killed,
50-
#[bilrost(9)]
44+
#[bilrost(7)]
5145
Cancelled,
52-
#[bilrost(10)]
46+
#[bilrost(8)]
5347
Failed,
54-
#[bilrost(11)]
48+
#[bilrost(9)]
5549
Succeeded,
5650
}
5751

58-
impl Status {
59-
#[inline]
60-
pub fn is_terminal(&self) -> bool {
61-
matches!(
62-
self,
63-
Self::Unknown | Self::Killed | Self::Cancelled | Self::Failed | Self::Succeeded
64-
)
65-
}
66-
67-
#[inline]
68-
pub fn can_move_to_run(&self) -> bool {
69-
matches!(
70-
self,
71-
Self::New
72-
| Self::Scheduled
73-
| Self::WakingUp
74-
| Self::Yielded
75-
| Self::BackingOff
76-
| Self::Running
77-
)
78-
}
79-
}
80-
8152
pub trait EntryStatusHeader: std::fmt::Debug {
8253
fn vqueue_id(&self) -> &VQueueId;
8354
fn status(&self) -> Status;
@@ -91,6 +62,18 @@ pub trait EntryStatusHeader: std::fmt::Debug {
9162
fn seq(&self) -> Seq;
9263
fn stats(&self) -> &EntryStatistics;
9364
fn display_entry_id(&self) -> impl std::fmt::Display + '_;
65+
/// Returns true if this entry has started, i.e. it has had at least one attempt.
66+
fn has_started(&self) -> bool {
67+
self.stats().num_attempts > 0
68+
}
69+
/// Returns true if this entry is in the terminal state and cannot transition
70+
/// out of it.
71+
fn is_terminal(&self) -> bool {
72+
if matches!(self.stage(), Stage::Finished) {
73+
return true;
74+
}
75+
false
76+
}
9477
}
9578

9679
/// For future support for extra state storage for entries.

0 commit comments

Comments
 (0)