Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/partition-store/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub enum KeyKind {
VQueueMeta,
// Resources' canonical key(s)
VQueueEntryState,
// Items stored in vqueues (e.g. state mutations, invocations, etc.)
VQueueItems,
}

impl KeyKind {
Expand Down Expand Up @@ -108,6 +110,7 @@ impl KeyKind {
KeyKind::VQueueMeta => b"qm",
// Queue Entry State (canonical state of vqueue entries)
KeyKind::VQueueEntryState => b"qe",
KeyKind::VQueueItems => b"qI",
}
}

Expand Down Expand Up @@ -142,6 +145,7 @@ impl KeyKind {
b"qi" => Some(KeyKind::VQueueInbox),
b"qm" => Some(KeyKind::VQueueMeta),
b"qe" => Some(KeyKind::VQueueEntryState),
b"qI" => Some(KeyKind::VQueueItems),
_ => None,
}
}
Expand Down
47 changes: 47 additions & 0 deletions crates/partition-store/src/vqueue_table/items.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_storage_api::vqueue_table::{EntryId, EntryKind};
use restate_types::clock::UniqueTimestamp;
use restate_types::identifiers::PartitionKey;
use restate_types::vqueue::{VQueueInstance, VQueueParent};

use crate::TableKind::VQueue;
use crate::keys::{KeyKind, define_table_key};

// Vqueue items are stored under the qid they belong to and their creation timestamp to maintain
// the order in which they were inserted into the vqueue.
// 'qI' | PKEY | VQUEUE_PARENT | VQUEUE_INSTANCE | CREATED_AT | ENTRY_KIND | ENTRY_ID
define_table_key!(
VQueue,
KeyKind::VQueueItems,
ItemsKey (
partition_key: PartitionKey,
parent: VQueueParent,
instance: VQueueInstance,
created_at: UniqueTimestamp,
kind: EntryKind,
id: EntryId,
)
);

static_assertions::const_assert_eq!(ItemsKey::serialized_length_fixed(), 43);

impl ItemsKey {
pub const fn serialized_length_fixed() -> usize {
KeyKind::SERIALIZED_LENGTH
+ size_of::<PartitionKey>()
+ size_of::<VQueueParent>()
+ size_of::<VQueueInstance>()
+ size_of::<UniqueTimestamp>()
+ size_of::<EntryKind>()
+ size_of::<EntryId>()
}
}
91 changes: 91 additions & 0 deletions crates/partition-store/src/vqueue_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

mod entry;
mod inbox;
mod items;
mod metadata;
mod reader;
mod running_reader;
Expand All @@ -29,12 +30,14 @@ use restate_storage_api::vqueue_table::{
AsEntryState, AsEntryStateHeader, EntryCard, EntryId, EntryKind, EntryStateKind,
ReadVQueueTable, ScanVQueueTable, Stage, VisibleAt, WriteVQueueTable,
};
use restate_types::clock::UniqueTimestamp;
use restate_types::identifiers::PartitionKey;
use restate_types::vqueue::{EffectivePriority, VQueueId, VQueueInstance, VQueueParent};

use self::entry::{EntryStateHeader, EntryStateKey, OwnedEntryState, OwnedHeader};
use self::inbox::{ActiveKey, InboxKey};
use crate::keys::{KeyCodec, KeyKind, TableKey};
use crate::vqueue_table::items::ItemsKey;
use crate::{PartitionDb, PartitionStoreTransaction, Result, StorageAccess};

impl KeyCodec for VQueueParent {
Expand Down Expand Up @@ -356,6 +359,63 @@ impl WriteVQueueTable for PartitionStoreTransaction<'_> {
self.raw_put_cf(KeyKind::VQueueEntryState, key_buffer, value_buf);
}

fn put_item<E>(
&mut self,
qid: &VQueueId,
created_at: UniqueTimestamp,
kind: EntryKind,
id: &EntryId,
item: E,
) where
E: Message,
{
let key_buffer = self.cleared_key_buffer_mut(ItemsKey::serialized_length_fixed());

ItemsKey {
partition_key: qid.partition_key,
parent: qid.parent,
instance: qid.instance,
created_at,
kind,
id: *id,
}
.serialize_to(key_buffer);

let key = key_buffer.split();

let value_buffer = self.cleared_value_buffer_mut(item.encoded_len());

item.encode(value_buffer)
.expect("enough space to encode item");
let value = value_buffer.split();

self.raw_put_cf(KeyKind::VQueueItems, key, value);
}

fn delete_item(
&mut self,
qid: &VQueueId,
created_at: UniqueTimestamp,
kind: EntryKind,
id: &EntryId,
) {
let key_buffer = self.cleared_key_buffer_mut(ItemsKey::serialized_length_fixed());

ItemsKey {
partition_key: qid.partition_key,
parent: qid.parent,
instance: qid.instance,
created_at,
kind,
id: *id,
}
.serialize_to(key_buffer);

let key = key_buffer.split();

self.raw_delete_cf(KeyKind::VQueueItems, key);
}

// fn update_vqueue_entry_state(
// &mut self,
// at: UniqueTimestamp,
Expand Down Expand Up @@ -479,6 +539,37 @@ impl ReadVQueueTable for PartitionStoreTransaction<'_> {
Ok(Some(OwnedEntryState { header, state }))
}

async fn get_item<E>(
&mut self,
qid: &VQueueId,
created_at: UniqueTimestamp,
kind: EntryKind,
id: &EntryId,
) -> Result<Option<E>>
where
E: OwnedMessage,
{
let key_buffer = self.cleared_value_buffer_mut(ItemsKey::serialized_length_fixed());

ItemsKey {
partition_key: qid.partition_key,
parent: qid.parent,
instance: qid.instance,
created_at,
kind,
id: *id,
}
.serialize_to(key_buffer);

let key = key_buffer.split();

let Some(raw_value) = self.get(ItemsKey::TABLE, key)? else {
return Ok(None);
};

Ok(Some(E::decode(&mut raw_value.as_ref())?))
}

// async fn with_entry_state<'a, E, F, O>(
// &mut self,
// partition_key: PartitionKey,
Expand Down
32 changes: 32 additions & 0 deletions crates/storage-api/src/vqueue_table/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_types::clock::UniqueTimestamp;
use restate_types::identifiers::PartitionKey;
use restate_types::vqueue::VQueueId;

Expand Down Expand Up @@ -70,6 +71,26 @@ pub trait WriteVQueueTable {
) where
E: EntryStateKind + bilrost::Message + bilrost::encoding::RawMessage,
(): bilrost::encoding::EmptyState<(), E>;

/// Stores a vqueue item for later use.
fn put_item<E>(
&mut self,
qid: &VQueueId,
created_at: UniqueTimestamp,
kind: EntryKind,
id: &EntryId,
item: E,
) where
E: bilrost::Message;

/// Deletes a vqueue item.
fn delete_item(
&mut self,
qid: &VQueueId,
created_at: UniqueTimestamp,
kind: EntryKind,
id: &EntryId,
);
}

pub trait ReadVQueueTable {
Expand Down Expand Up @@ -102,6 +123,17 @@ pub trait ReadVQueueTable {
+ 'static,
(): bilrost::encoding::EmptyState<(), E>;

/// Gets a vqueue item identified by its qid, the entry id, its kind and the creation timestamp.
fn get_item<E>(
&mut self,
qid: &VQueueId,
created_at: UniqueTimestamp,
kind: EntryKind,
id: &EntryId,
) -> impl Future<Output = Result<Option<E>>>
where
E: bilrost::OwnedMessage;

// Commented for future reference
// fn with_entry_state<'a, E, F, O>(
// &mut self,
Expand Down
Loading