From 44b84d4d1325e4d8ba5f228eb7ae483faab4fd81 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 26 Nov 2025 22:37:09 +0100 Subject: [PATCH] Add vqueue items table The vqueue items table is intended to store the payload of a vqueue item (e.g. the invocation or the state mutation). The item is stored when it is being enqueued into the vqueue and removed once the vqueue item ends. The payload can be accessed by the tuple (qid, created_at, kind, entry_id). The creation timestamp is part of the key in order to establish the insertion order into the given vqueue. --- crates/partition-store/src/keys.rs | 4 + .../partition-store/src/vqueue_table/items.rs | 47 ++++++++++ .../partition-store/src/vqueue_table/mod.rs | 91 +++++++++++++++++++ crates/storage-api/src/vqueue_table/tables.rs | 32 +++++++ 4 files changed, 174 insertions(+) create mode 100644 crates/partition-store/src/vqueue_table/items.rs diff --git a/crates/partition-store/src/keys.rs b/crates/partition-store/src/keys.rs index b8dc587fdd..1bcb082044 100644 --- a/crates/partition-store/src/keys.rs +++ b/crates/partition-store/src/keys.rs @@ -57,6 +57,8 @@ pub enum KeyKind { VQueueMeta, // Resources' canonical key(s) VQueueEntryState, + // Items stored in vqueues (e.g. state mutations, invocations, etc.) 
+ VQueueItems, } impl KeyKind { @@ -108,6 +110,7 @@ impl KeyKind { KeyKind::VQueueMeta => b"qm", // Queue Entry State (canonical state of vqueue entries) KeyKind::VQueueEntryState => b"qe", + KeyKind::VQueueItems => b"qI", } } @@ -142,6 +145,7 @@ impl KeyKind { b"qi" => Some(KeyKind::VQueueInbox), b"qm" => Some(KeyKind::VQueueMeta), b"qe" => Some(KeyKind::VQueueEntryState), + b"qI" => Some(KeyKind::VQueueItems), _ => None, } } diff --git a/crates/partition-store/src/vqueue_table/items.rs b/crates/partition-store/src/vqueue_table/items.rs new file mode 100644 index 0000000000..b7089e99dd --- /dev/null +++ b/crates/partition-store/src/vqueue_table/items.rs @@ -0,0 +1,47 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use restate_storage_api::vqueue_table::{EntryId, EntryKind}; +use restate_types::clock::UniqueTimestamp; +use restate_types::identifiers::PartitionKey; +use restate_types::vqueue::{VQueueInstance, VQueueParent}; + +use crate::TableKind::VQueue; +use crate::keys::{KeyKind, define_table_key}; + +// Vqueue items are stored under the qid they belong to and their creation timestamp to maintain +// the order in which they were inserted into the vqueue. 
+// 'qI' | PKEY | VQUEUE_PARENT | VQUEUE_INSTANCE | CREATED_AT | ENTRY_KIND | ENTRY_ID
+define_table_key!(
+    VQueue,
+    KeyKind::VQueueItems,
+    ItemsKey (
+        partition_key: PartitionKey,
+        parent: VQueueParent,
+        instance: VQueueInstance,
+        created_at: UniqueTimestamp,
+        kind: EntryKind,
+        id: EntryId,
+    )
+);
+
+static_assertions::const_assert_eq!(ItemsKey::serialized_length_fixed(), 43);
+
+impl ItemsKey {
+    pub const fn serialized_length_fixed() -> usize {
+        KeyKind::SERIALIZED_LENGTH
+            + size_of::<PartitionKey>()
+            + size_of::<VQueueParent>()
+            + size_of::<VQueueInstance>()
+            + size_of::<UniqueTimestamp>()
+            + size_of::<EntryKind>()
+            + size_of::<EntryId>()
+    }
+}
diff --git a/crates/partition-store/src/vqueue_table/mod.rs b/crates/partition-store/src/vqueue_table/mod.rs
index cde01f753f..7e4d7cb58f 100644
--- a/crates/partition-store/src/vqueue_table/mod.rs
+++ b/crates/partition-store/src/vqueue_table/mod.rs
@@ -10,6 +10,7 @@
 
 mod entry;
 mod inbox;
+mod items;
 mod metadata;
 mod reader;
 mod running_reader;
@@ -29,12 +30,14 @@ use restate_storage_api::vqueue_table::{
     AsEntryState, AsEntryStateHeader, EntryCard, EntryId, EntryKind, EntryStateKind,
     ReadVQueueTable, ScanVQueueTable, Stage, VisibleAt, WriteVQueueTable,
 };
+use restate_types::clock::UniqueTimestamp;
 use restate_types::identifiers::PartitionKey;
 use restate_types::vqueue::{EffectivePriority, VQueueId, VQueueInstance, VQueueParent};
 
 use self::entry::{EntryStateHeader, EntryStateKey, OwnedEntryState, OwnedHeader};
 use self::inbox::{ActiveKey, InboxKey};
 use crate::keys::{KeyCodec, KeyKind, TableKey};
+use crate::vqueue_table::items::ItemsKey;
 use crate::{PartitionDb, PartitionStoreTransaction, Result, StorageAccess};
 
 impl KeyCodec for VQueueParent {
@@ -356,6 +359,63 @@ impl WriteVQueueTable for PartitionStoreTransaction<'_> {
         self.raw_put_cf(KeyKind::VQueueEntryState, key_buffer, value_buf);
     }
 
+    fn put_item<E>(
+        &mut self,
+        qid: &VQueueId,
+        created_at: UniqueTimestamp,
+        kind: EntryKind,
+        id: &EntryId,
+        item: E,
+    ) where
+        E: Message,
+    {
+        let key_buffer = self.cleared_key_buffer_mut(ItemsKey::serialized_length_fixed());
+
+        ItemsKey {
+            partition_key: qid.partition_key,
+            parent: qid.parent,
+            instance: qid.instance,
+            created_at,
+            kind,
+            id: *id,
+        }
+        .serialize_to(key_buffer);
+
+        let key = key_buffer.split();
+
+        let value_buffer = self.cleared_value_buffer_mut(item.encoded_len());
+
+        item.encode(value_buffer)
+            .expect("enough space to encode item");
+        let value = value_buffer.split();
+
+        self.raw_put_cf(KeyKind::VQueueItems, key, value);
+    }
+
+    fn delete_item(
+        &mut self,
+        qid: &VQueueId,
+        created_at: UniqueTimestamp,
+        kind: EntryKind,
+        id: &EntryId,
+    ) {
+        let key_buffer = self.cleared_key_buffer_mut(ItemsKey::serialized_length_fixed());
+
+        ItemsKey {
+            partition_key: qid.partition_key,
+            parent: qid.parent,
+            instance: qid.instance,
+            created_at,
+            kind,
+            id: *id,
+        }
+        .serialize_to(key_buffer);
+
+        let key = key_buffer.split();
+
+        self.raw_delete_cf(KeyKind::VQueueItems, key);
+    }
+
     // fn update_vqueue_entry_state(
     //     &mut self,
     //     at: UniqueTimestamp,
@@ -479,6 +539,37 @@ impl ReadVQueueTable for PartitionStoreTransaction<'_> {
         Ok(Some(OwnedEntryState { header, state }))
     }
 
+    async fn get_item<E>(
+        &mut self,
+        qid: &VQueueId,
+        created_at: UniqueTimestamp,
+        kind: EntryKind,
+        id: &EntryId,
+    ) -> Result<Option<E>>
+    where
+        E: OwnedMessage,
+    {
+        let key_buffer = self.cleared_key_buffer_mut(ItemsKey::serialized_length_fixed());
+
+        ItemsKey {
+            partition_key: qid.partition_key,
+            parent: qid.parent,
+            instance: qid.instance,
+            created_at,
+            kind,
+            id: *id,
+        }
+        .serialize_to(key_buffer);
+
+        let key = key_buffer.split();
+
+        let Some(raw_value) = self.get(ItemsKey::TABLE, key)? else {
+            return Ok(None);
+        };
+
+        Ok(Some(E::decode(&mut raw_value.as_ref())?))
+    }
+
     // async fn with_entry_state<'a, E, F, O>(
     //     &mut self,
     //     partition_key: PartitionKey,
diff --git a/crates/storage-api/src/vqueue_table/tables.rs b/crates/storage-api/src/vqueue_table/tables.rs
index 831421a9ad..18725026a3 100644
--- a/crates/storage-api/src/vqueue_table/tables.rs
+++ b/crates/storage-api/src/vqueue_table/tables.rs
@@ -8,6 +8,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use restate_types::clock::UniqueTimestamp;
 use restate_types::identifiers::PartitionKey;
 use restate_types::vqueue::VQueueId;
 
@@ -70,6 +71,26 @@ pub trait WriteVQueueTable {
     ) where
         E: EntryStateKind + bilrost::Message + bilrost::encoding::RawMessage,
         (): bilrost::encoding::EmptyState<(), E>;
+
+    /// Stores `item` under the key `(qid, created_at, kind, id)` for later use.
+    fn put_item<E>(
+        &mut self,
+        qid: &VQueueId,
+        created_at: UniqueTimestamp,
+        kind: EntryKind,
+        id: &EntryId,
+        item: E,
+    ) where
+        E: bilrost::Message;
+
+    /// Deletes the vqueue item stored under `(qid, created_at, kind, id)`.
+    fn delete_item(
+        &mut self,
+        qid: &VQueueId,
+        created_at: UniqueTimestamp,
+        kind: EntryKind,
+        id: &EntryId,
+    );
 }
 
 pub trait ReadVQueueTable {
@@ -102,6 +123,17 @@ pub trait ReadVQueueTable {
             + 'static,
         (): bilrost::encoding::EmptyState<(), E>;
 
+    /// Gets a vqueue item identified by its qid, the entry id, its kind and the creation timestamp.
+    fn get_item<E>(
+        &mut self,
+        qid: &VQueueId,
+        created_at: UniqueTimestamp,
+        kind: EntryKind,
+        id: &EntryId,
+    ) -> impl Future<Output = Result<Option<E>>>
+    where
+        E: bilrost::OwnedMessage;
+
     // Commented for future reference
     // fn with_entry_state<'a, E, F, O>(
     //     &mut self,