Skip to content

Commit e62afce

Browse files
AhmedSolimanclaude
andcommitted
Introduce restate-util-sharding crate
Part of the effort to decompose the restate-types monolith into focused, composable utility crates. Add a new `restate-util-sharding` crate (in `util/sharding/`) following the `util/string` pattern. It defines the foundational sharding types: - `KeyRange`: a compact `Copy` newtype over `std::range::RangeInclusive<u64>`. 16 bytes (vs 24 for std::ops::RangeInclusive), wire-format compatible serde and bilrost encoding. Provides iter(), is_overlapping(), and split() helpers. - `PartitionKey`: type alias for u64 - `WithPartitionKey`: trait for types that carry a partition key The crate is re-exported from `restate-types::sharding` for ergonomic access. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 66162ac commit e62afce

11 files changed

Lines changed: 532 additions & 19 deletions

File tree

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ restate-service-protocol = { path = "crates/service-protocol" }
8383
restate-service-protocol-v4 = { path = "crates/service-protocol-v4" }
8484
restate-storage-api = { path = "crates/storage-api" }
8585
restate-storage-query-datafusion = { path = "crates/storage-query-datafusion" }
86+
restate-util-sharding = { path = "util/sharding" }
8687
restate-util-string = { path = "util/string" }
8788
restate-test-util = { path = "crates/test-util" }
8889
restate-time-util = { path = "crates/time-util" }

crates/encoding/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,5 @@ bytestring = { workspace = true }
1818

1919
[dev-dependencies]
2020
rand = { workspace = true }
21+
restate-util-sharding = { workspace = true, features = ["bilrost"] }
2122
static_assertions = { workspace = true }

crates/types/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ restate-memory = { workspace = true }
3030
restate-serde-util = { workspace = true }
3131
restate-test-util = { workspace = true, optional = true }
3232
restate-time-util = { workspace = true, features = ["serde", "serde_with"] }
33+
restate-util-sharding = { workspace = true, features = ["serde", "bilrost"] }
3334
restate-util-string = { workspace = true, features = ["serde", "bilrost"] }
3435
restate-utoipa = { workspace = true }
3536

crates/types/src/identifiers.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
mod partitioned;
1414

1515
pub use partitioned::PartitionedResourceId;
16+
pub use restate_util_sharding::{PartitionKey, WithPartitionKey};
1617

1718
use std::cell::RefCell;
1819
use std::fmt::{self, Display, Formatter};
@@ -155,10 +156,6 @@ pub type PartitionLeaderEpoch = (PartitionId, LeaderEpoch);
155156
// Just an alias
156157
pub type EntryIndex = u32;
157158

158-
/// Identifying to which partition a key belongs. This is unlike the [`PartitionId`]
159-
/// which identifies a consecutive range of partition keys.
160-
pub type PartitionKey = u64;
161-
162159
/// Returns the partition key computed from either the service_key, or idempotency_key, if possible
163160
fn deterministic_partition_key(
164161
service_key: Option<&str>,
@@ -169,12 +166,6 @@ fn deterministic_partition_key(
169166
.or_else(|| idempotency_key.map(partitioner::HashPartitioner::compute_partition_key))
170167
}
171168

172-
/// Trait for data structures that have a partition key
173-
pub trait WithPartitionKey {
174-
/// Returns the partition key
175-
fn partition_key(&self) -> PartitionKey;
176-
}
177-
178169
/// A family of resource identifiers that tracks the timestamp of its creation.
179170
pub trait TimestampAwareId {
180171
/// The timestamp when this ID was created.
@@ -607,12 +598,6 @@ impl WithPartitionKey for InvocationId {
607598
}
608599
}
609600

610-
impl<T: WithInvocationId> WithPartitionKey for T {
611-
fn partition_key(&self) -> PartitionKey {
612-
self.invocation_id().partition_key
613-
}
614-
}
615-
616601
impl Display for InvocationId {
617602
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
618603
// encode the id such that it is possible to do a string prefix search for a
@@ -767,6 +752,12 @@ impl From<(InvocationId, EntryIndex)> for JournalEntryId {
767752
}
768753
}
769754

755+
impl WithPartitionKey for JournalEntryId {
756+
fn partition_key(&self) -> PartitionKey {
757+
self.invocation_id.partition_key()
758+
}
759+
}
760+
770761
impl WithInvocationId for JournalEntryId {
771762
fn invocation_id(&self) -> InvocationId {
772763
self.invocation_id

crates/types/src/invocation/mod.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,12 @@ impl InvocationRequest {
417417
}
418418
}
419419

420+
impl WithPartitionKey for InvocationRequest {
421+
fn partition_key(&self) -> PartitionKey {
422+
self.header.invocation_id().partition_key()
423+
}
424+
}
425+
420426
impl WithInvocationId for InvocationRequest {
421427
fn invocation_id(&self) -> InvocationId {
422428
self.header.invocation_id()
@@ -580,6 +586,12 @@ impl JournalCompletionTarget {
580586
}
581587
}
582588

589+
impl WithPartitionKey for JournalCompletionTarget {
590+
fn partition_key(&self) -> PartitionKey {
591+
self.caller_id.partition_key()
592+
}
593+
}
594+
583595
impl WithInvocationId for JournalCompletionTarget {
584596
fn invocation_id(&self) -> InvocationId {
585597
self.caller_id
@@ -597,6 +609,12 @@ pub struct InvocationResponse {
597609
pub result: ResponseResult,
598610
}
599611

612+
impl WithPartitionKey for InvocationResponse {
613+
fn partition_key(&self) -> PartitionKey {
614+
self.target.invocation_id().partition_key()
615+
}
616+
}
617+
600618
impl WithInvocationId for InvocationResponse {
601619
fn invocation_id(&self) -> InvocationId {
602620
self.target.invocation_id()
@@ -1351,6 +1369,12 @@ pub struct NotifySignalRequest {
13511369
pub signal: Signal,
13521370
}
13531371

1372+
impl WithPartitionKey for NotifySignalRequest {
1373+
fn partition_key(&self) -> PartitionKey {
1374+
self.invocation_id.partition_key()
1375+
}
1376+
}
1377+
13541378
impl WithInvocationId for NotifySignalRequest {
13551379
fn invocation_id(&self) -> InvocationId {
13561380
self.invocation_id

crates/types/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ pub mod memory {
8181
pub use restate_memory::*;
8282
}
8383

84+
// Re-export restate-util-sharding crate for key range utilities.
85+
pub mod sharding {
86+
pub use restate_util_sharding::*;
87+
}
88+
8489
// Re-export metrics' SharedString (Space-efficient Cow + RefCounted variant)
8590
pub type SharedString = metrics::SharedString;
8691

crates/wal-protocol/src/lib.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -242,15 +242,17 @@ impl HasRecordKeys for Envelope {
242242
Command::TruncateOutbox(_) => Keys::Single(self.partition_key()),
243243
Command::ProxyThrough(_) => Keys::Single(self.partition_key()),
244244
Command::AttachInvocation(_) => Keys::Single(self.partition_key()),
245-
Command::ResumeInvocation(req) => Keys::Single(req.partition_key()),
246-
Command::RestartAsNewInvocation(req) => Keys::Single(req.partition_key()),
245+
Command::ResumeInvocation(req) => Keys::Single(req.invocation_id.partition_key()),
246+
Command::RestartAsNewInvocation(req) => Keys::Single(req.invocation_id.partition_key()),
247247
// todo: Handle journal entries that request cross-partition invocations
248248
Command::InvokerEffect(effect) => Keys::Single(effect.invocation_id.partition_key()),
249249
Command::Timer(timer) => Keys::Single(timer.invocation_id().partition_key()),
250250
Command::ScheduleTimer(timer) => Keys::Single(timer.invocation_id().partition_key()),
251251
Command::InvocationResponse(response) => Keys::Single(response.partition_key()),
252252
Command::NotifySignal(sig) => Keys::Single(sig.partition_key()),
253-
Command::NotifyGetInvocationOutputResponse(res) => Keys::Single(res.partition_key()),
253+
Command::NotifyGetInvocationOutputResponse(res) => {
254+
Keys::Single(res.target.partition_key())
255+
}
254256
Command::UpsertSchema(schema) => schema.partition_key_range.clone(),
255257
Command::VQWaitingToRunning(_) => Keys::Single(self.partition_key()),
256258
Command::VQYieldRunning(_) => Keys::Single(self.partition_key()),

util/sharding/Cargo.toml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
[package]
2+
name = "restate-util-sharding"
3+
version.workspace = true
4+
authors.workspace = true
5+
edition.workspace = true
6+
rust-version.workspace = true
7+
license.workspace = true
8+
publish = false
9+
10+
[features]
11+
default = []
12+
serde = ["dep:serde"]
13+
bilrost = ["dep:bilrost"]
14+
15+
[dependencies]
16+
restate-workspace-hack = { workspace = true }
17+
18+
bilrost = { workspace = true, optional = true }
19+
serde = { workspace = true, optional = true }
20+
21+
[dev-dependencies]
22+
restate-util-sharding = { path = ".", default-features = false, features = ["serde", "bilrost"] }

0 commit comments

Comments
 (0)