Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/partition-store/src/vqueue_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ impl WriteVQueueTable for PartitionStoreTransaction<'_> {
self.raw_delete_cf(KeyKind::VQueueInbox, key_buffer)
}

fn mark_queue_as_active(&mut self, qid: &restate_types::vqueue::VQueueId) {
fn mark_vqueue_as_active(&mut self, qid: &restate_types::vqueue::VQueueId) {
let mut key_buffer = [0u8; ActiveKey::serialized_length_fixed()];
ActiveKey {
partition_key: qid.partition_key,
Expand All @@ -306,7 +306,7 @@ impl WriteVQueueTable for PartitionStoreTransaction<'_> {
self.raw_put_cf(KeyKind::VQueueActive, key_buffer, []);
}

fn mark_queue_as_empty(&mut self, qid: &restate_types::vqueue::VQueueId) {
fn mark_vqueue_as_dormant(&mut self, qid: &restate_types::vqueue::VQueueId) {
let mut key_buffer = [0u8; ActiveKey::serialized_length_fixed()];
ActiveKey {
partition_key: qid.partition_key,
Expand Down
27 changes: 25 additions & 2 deletions crates/storage-api/src/vqueue_table/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
// by the Apache License, Version 2.0.

use bytes::{Buf, BufMut};

use restate_types::clock::UniqueTimestamp;
use restate_types::identifiers::InvocationId;
use restate_types::logs::Lsn;
use restate_types::state_mut::ExternalStateMutation;
use restate_types::vqueue::{
EffectivePriority, NewEntryPriority, VQueueId, VQueueInstance, VQueueParent,
};
use std::fmt::{Debug, Formatter};

use crate::StorageError;

Expand All @@ -35,7 +37,7 @@ pub enum EntryKind {

// Using u128 would have added an extra unnecessary 8 bytes due to alignment
// requirements (u128 is 0x10 aligned and it forces the struct to be 0x10 aligned)
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EntryId([u8; 16]);

impl EntryId {
Expand All @@ -60,6 +62,15 @@ impl EntryId {
}
}

impl Debug for EntryId {
    /// Renders the id as `EntryId(<n>)`, where `<n>` is the 16 inner bytes read as a
    /// big-endian `u128`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut tuple = f.debug_tuple("EntryId");
        // A single integer is a bit easier to read than a 16-element byte array.
        let as_number = u128::from_be_bytes(self.0);
        tuple.field(&as_number);
        tuple.finish()
    }
}

impl From<&InvocationId> for EntryId {
#[inline]
fn from(id: &InvocationId) -> Self {
Expand All @@ -74,6 +85,14 @@ impl From<InvocationId> for EntryId {
}
}

impl From<Lsn> for EntryId {
    /// Builds an entry id from a log sequence number.
    #[inline]
    fn from(lsn: Lsn) -> Self {
        // Widen the LSN to 128 bits, then encode it big-endian: we want entries created
        // from higher LSNs to appear later.
        let widened = u128::from(lsn.as_u64());
        let encoded = widened.to_be_bytes();
        Self::from_bytes(encoded)
    }
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EntryCard {
pub priority: EffectivePriority,
Expand Down Expand Up @@ -185,3 +204,7 @@ pub trait EntryStateKind: Send {
// The unit type is tagged with `EntryKind::Unknown`.
impl EntryStateKind for () {
const KIND: EntryKind = EntryKind::Unknown;
}

// External state mutations are tagged with `EntryKind::StateMutation`.
impl EntryStateKind for ExternalStateMutation {
const KIND: EntryKind = EntryKind::StateMutation;
}
49 changes: 24 additions & 25 deletions crates/storage-api/src/vqueue_table/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,7 @@ use smallvec::SmallVec;
use restate_types::clock::UniqueTimestamp;
use restate_types::vqueue::EffectivePriority;

use super::VisibleAt;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VQueueStatus {
/// Enabled (not-paused) and has items to process
Active,
/// Regardless whether it's paused or not, it's empty (nothing to process)
Empty,
/// Paused indicates it's non-empty but paused (should not process its items)
Paused,
}
use super::{Stage, VisibleAt};

#[derive(Debug, Default, Clone, bilrost::Message)]
pub struct VQueueStatistics {
Expand Down Expand Up @@ -99,14 +89,16 @@ impl VQueueMeta {
self.num_waiting.iter().sum()
}

pub fn status(&self) -> VQueueStatus {
if self.is_empty() {
VQueueStatus::Empty
} else if self.is_paused() {
VQueueStatus::Paused
} else {
VQueueStatus::Active
}
/// Reports whether this vqueue is currently of interest to the scheduler.
///
/// A vqueue is active when it has running entries, or when it has waiting entries and is
/// not paused. A paused vqueue therefore stays active only for as long as it still has
/// running entries; once those are moved to waiting or completed, the vqueue is
/// considered dormant until it's unpaused.
pub fn is_active(&self) -> bool {
    let has_running = self.stats.num_running > 0;
    // Waiting entries only count while the vqueue is not paused.
    has_running || (self.total_waiting() > 0 && !self.is_paused())
}

pub fn num_waiting(&self, priority: EffectivePriority) -> u32 {
Expand Down Expand Up @@ -225,17 +217,24 @@ impl VQueueMeta {
self.add_to_waiting(priority);
}
Action::Complete {
was_waiting,
previous_stage,
priority,
} => {
debug_assert!(self.length > 0);
self.length -= 1;
self.stats.last_completion_at = Some(now);
if was_waiting {
self.remove_from_waiting(priority);
} else {
self.stats.decrement_running();

match previous_stage {
Stage::Unknown => {
panic!("Something is wrong. We shouldn't be in the unknown stage.")
}
Stage::Inbox => self.remove_from_waiting(priority),
Stage::Run => self.stats.decrement_running(),
Stage::Park => {
// nothing to do since we were neither waiting nor running
}
}

if priority.token_held() {
self.release_token();
}
Expand Down Expand Up @@ -317,7 +316,7 @@ pub enum Action {
Complete {
// Must be the latest priority assigned to the entry (effective priority)
priority: EffectivePriority,
was_waiting: bool,
previous_stage: Stage,
},
}

Expand Down
21 changes: 19 additions & 2 deletions crates/storage-api/src/vqueue_table/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,27 @@ pub trait WriteVQueueTable {
fn delete_inbox_entry(&mut self, qid: &VQueueId, stage: Stage, card: &EntryCard);

/// Adds a vqueue to the list of active vqueues
fn mark_queue_as_active(&mut self, qid: &VQueueId);
///
/// A vqueue is considered active when it's of interest to the scheduler.
///
/// The scheduler cares about vqueues that have entries that are already running or that are waiting
/// to run, with some special rules to consider when the queue is paused. When the vqueue is
/// paused, the scheduler will only be interested in its "running" entries and not in its
/// waiting entries. Therefore, it will remain "active" as long as it has running
/// entries. Once running entries are moved to waiting or completed, the vqueue is
/// considered dormant until it's unpaused.
///
/// A vqueue that is "not" active does not mean it's "empty". It could be paused or
/// only contains entries in parked or completed states. As such, it's not considered
/// by the scheduler and it's considered "dormant".
fn mark_vqueue_as_active(&mut self, qid: &VQueueId);

/// Removes the vqueue from the list of active vqueues
fn mark_queue_as_empty(&mut self, qid: &VQueueId);
///
/// A dormant vqueue is not necessarily `empty`. It's a vqueue that _might_ have items (or not)
/// in parked or completed states, or it might have waiting items in its inbox but the
/// vqueue is paused and would not be visible to the scheduler.
fn mark_vqueue_as_dormant(&mut self, qid: &VQueueId);

/// Updates a vqueue's entry's state
fn put_vqueue_entry_state<E>(
Expand Down
14 changes: 13 additions & 1 deletion crates/types/src/clock/unique_timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::fmt::{Debug, Formatter};
use std::num::NonZeroU64;
use std::time::SystemTime;

Expand Down Expand Up @@ -50,7 +51,7 @@ pub enum Error {
/// The timestamp is represented as a 64-bit unsigned integer. The upper 42 bits
/// represent the physical time in milliseconds since [`RESTATE_EPOCH`], and the
/// lower 22 bits represent the logical clock count.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[repr(transparent)]
pub struct UniqueTimestamp(NonZeroU64);

Expand Down Expand Up @@ -132,6 +133,17 @@ impl UniqueTimestamp {
}
}

impl Debug for UniqueTimestamp {
    /// Renders the timestamp as a struct with separate `physical` and `logical` fields
    /// instead of the packed 64-bit value.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("UniqueTimestamp");
        // The physical part is shown relative to the UNIX_EPOCH rather than the RESTATE_EPOCH.
        out.field("physical", &self.to_unix_millis().as_u64());
        out.field("logical", &self.logical_raw());
        out.finish()
    }
}

impl From<SystemTime> for UniqueTimestamp {
fn from(value: SystemTime) -> Self {
// The assumption is that SystemTime will always be > RESTATE_EPOCH
Expand Down
14 changes: 13 additions & 1 deletion crates/types/src/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,15 +349,27 @@ impl From<InvocationUuid> for opentelemetry::trace::SpanId {
/// Services are isolated by key. This means that there cannot be two concurrent
/// invocations for the same service instance (service name, key).
#[derive(
Eq, Hash, PartialEq, PartialOrd, Ord, Clone, Debug, serde::Serialize, serde::Deserialize,
Eq,
Hash,
PartialEq,
PartialOrd,
Ord,
Clone,
Debug,
serde::Serialize,
serde::Deserialize,
bilrost::Message,
)]
pub struct ServiceId {
// TODO rename this to KeyedServiceId. This type can be used only by keyed service types (virtual objects and workflows)
/// Identifies the grpc service
#[bilrost(1)]
pub service_name: ByteString,
/// Identifies the service instance for the given service name
#[bilrost(2)]
pub key: ByteString,

// NOTE(review): private field; presumably the partition key this service instance maps to —
// how it is computed is not visible here, confirm against the constructor.
#[bilrost(3)]
partition_key: PartitionKey,
}

Expand Down
5 changes: 4 additions & 1 deletion crates/types/src/state_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ use crate::identifiers::ServiceId;
/// ExternalStateMutation
///
/// represents an external request to mutate a user's state.
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize, bilrost::Message)]
pub struct ExternalStateMutation {
// The service instance this mutation request refers to.
#[bilrost(1)]
pub service_id: ServiceId,
// NOTE(review): optional version string; presumably the state version the request expects —
// its exact semantics are not visible here, confirm against the consumer.
#[bilrost(2)]
pub version: Option<String>,
// The state carried by the request, as a map from byte keys to byte values.
// flexbuffers only supports string-keyed maps :-( --> so we store it as vector of kv pairs
#[serde_as(as = "serde_with::Seq<(_, _)>")]
#[bilrost(3)]
pub state: HashMap<Bytes, Bytes>,
}

Expand Down
6 changes: 5 additions & 1 deletion crates/types/src/vqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,16 @@ impl VQueueInstance {
}

#[inline]
pub fn infer_from(key: impl AsRef<[u8]>) -> Self {
pub fn infer_from(name: impl AsRef<[u8]>, key: impl AsRef<[u8]>) -> Self {
// todo consider using the same hasher we use for partition key (xxh3)
// Important to never change the seed!
let mut hasher = rustc_hash::FxHasher::with_seed(67);

name.as_ref().hash(&mut hasher);
// separator
b'/'.hash(&mut hasher);
key.as_ref().hash(&mut hasher);

let hash = hasher.finish();
// XOR upper and lower bits for better collision resistance. xxh3 might give better
// collision tolerance when generating an u32 hash, but we assume that the partition key spreads
Expand Down
1 change: 1 addition & 0 deletions crates/vqueues/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ bilrost = { workspace = true, features = ["smallvec"] }
derive_more = { workspace = true }
futures = { workspace = true }
hashbrown = { version = "0.16" }
metrics = { workspace = true }
pin-project = { workspace = true }
rocksdb = { workspace = true }
smallvec = { workspace = true }
Expand Down
21 changes: 10 additions & 11 deletions crates/vqueues/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use hashbrown::{HashMap, hash_map};
use tracing::debug;

use restate_storage_api::StorageError;
use restate_storage_api::vqueue_table::metadata::{VQueueMeta, VQueueMetaUpdates, VQueueStatus};
use restate_storage_api::vqueue_table::metadata::{VQueueMeta, VQueueMetaUpdates};
use restate_storage_api::vqueue_table::{ReadVQueueTable, ScanVQueueTable};
use restate_types::vqueue::VQueueId;

Expand Down Expand Up @@ -41,10 +41,6 @@ impl<'a> VQueuesMeta<'a> {
Self { inner: cache }
}

pub fn num_active_vqueues(&self) -> usize {
self.inner.queues.len()
}

/// Returns a reference to the `ConfigPool` stored in the underlying cache.
pub(crate) fn config_pool(&'a self) -> &'a ConfigPool {
&self.inner.config
}
Expand All @@ -53,8 +49,11 @@ impl<'a> VQueuesMeta<'a> {
self.inner.queues.get(qid)
}

pub fn iter_vqueues(&'a self) -> impl ExactSizeIterator<Item = (&'a VQueueId, &'a VQueueMeta)> {
self.inner.queues.iter()
/// Iterates over the cached vqueues whose metadata reports them as active
/// (see `VQueueMeta::is_active`), yielding `(id, metadata)` pairs.
pub fn iter_active_vqueues(&'a self) -> impl Iterator<Item = (&'a VQueueId, &'a VQueueMeta)> {
    let all_queues = self.inner.queues.iter();
    all_queues.filter(|entry| entry.1.is_active())
}

pub fn report(&self) {
Expand Down Expand Up @@ -135,18 +134,18 @@ impl VQueuesMetaMut {
}
}

/// Returns the status of the vqueue before and after all the updates
/// Returns whether the vqueue was active before and after all the updates
/// in the form of a tuple (before, after).
pub(crate) async fn apply_updates<S: ReadVQueueTable>(
&mut self,
storage: &mut S,
qid: &VQueueId,
updates: &VQueueMetaUpdates,
) -> Result<(VQueueStatus, VQueueStatus)> {
) -> Result<(bool, bool)> {
let vqueue = self.load(storage, qid).await?;
let before = vqueue.status();
let before = vqueue.is_active();
vqueue.apply_updates(updates)?;
let after = vqueue.status();
let after = vqueue.is_active();

// todo(asoli): Add compaction logic to remove empty vqueues that have been empty for a while
// only if the cache hits a certain threshold.
Expand Down
Loading
Loading