Skip to content

Commit 1a8c5ad

Browse files
committed
Set InvocationStatus correctly when resuming invocation with vqueues
When resuming an invocation with the vqueue scheduler, we cannot go directly into InvocationStatus::Invoked because the invocation first needs to be scheduled again. As part of scheduling, the system goes through a few state changes that depend on the InvocationStatus. Hence, this commit sets the InvocationStatus to InvocationStatus::Inboxed when moving an invocation back into the inbox stage.
1 parent 29a1409 commit 1a8c5ad

File tree

5 files changed

+126
-43
lines changed

5 files changed

+126
-43
lines changed

crates/storage-api/src/invocation_status_table/mod.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,30 @@ impl PreFlightInvocationMetadata {
539539
}
540540
}
541541

542+
pub fn from_in_flight_invocation_metadata(
543+
in_flight_invocation_metadata: InFlightInvocationMetadata,
544+
) -> Self {
545+
Self {
546+
response_sinks: in_flight_invocation_metadata.response_sinks,
547+
timestamps: in_flight_invocation_metadata.timestamps,
548+
invocation_target: in_flight_invocation_metadata.invocation_target,
549+
source: in_flight_invocation_metadata.source,
550+
execution_time: in_flight_invocation_metadata.execution_time,
551+
completion_retention_duration: in_flight_invocation_metadata
552+
.completion_retention_duration,
553+
journal_retention_duration: in_flight_invocation_metadata.journal_retention_duration,
554+
idempotency_key: in_flight_invocation_metadata.idempotency_key,
555+
created_using_restate_version: in_flight_invocation_metadata
556+
.created_using_restate_version,
557+
random_seed: in_flight_invocation_metadata.random_seed,
558+
// we must have created the journal when coming from an InFlightInvocationMetadata
559+
input: PreFlightInvocationArgument::Journal(PreFlightInvocationJournal {
560+
journal_metadata: in_flight_invocation_metadata.journal_metadata,
561+
pinned_deployment: in_flight_invocation_metadata.pinned_deployment,
562+
}),
563+
}
564+
}
565+
542566
pub fn span_context(&self) -> &ServiceInvocationSpanContext {
543567
self.input.span_context()
544568
}
@@ -579,6 +603,20 @@ impl InboxedInvocation {
579603
timestamp,
580604
)
581605
}
606+
607+
/// Important: This conversion should only be used by the new vqueue based scheduler
608+
pub fn from_in_flight_invocation_metadata(
609+
in_flight_invocation_metadata: InFlightInvocationMetadata,
610+
) -> Self {
611+
Self {
612+
// The new vqueue based scheduler no longer uses the old inboxes. Hence, we can put an
613+
// arbitrary value here.
614+
inbox_sequence_number: 1,
615+
metadata: PreFlightInvocationMetadata::from_in_flight_invocation_metadata(
616+
in_flight_invocation_metadata,
617+
),
618+
}
619+
}
582620
}
583621

584622
/// This map is used to record trim points and determine whether a completion from an old epoch should be accepted or rejected.

crates/worker/src/partition/state_machine/lifecycle/paused.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use restate_storage_api::invocation_status_table::{
1515
InvocationStatus, ReadInvocationStatusTable, WriteInvocationStatusTable,
1616
};
1717
use restate_storage_api::journal_events::WriteJournalEventsTable;
18+
use restate_storage_api::service_status_table::WriteVirtualObjectStatusTable;
1819
use restate_storage_api::vqueue_table::{ReadVQueueTable, WriteVQueueTable};
1920
use restate_types::config::Configuration;
2021
use restate_types::identifiers::InvocationId;
@@ -32,7 +33,8 @@ where
3233
+ WriteInvocationStatusTable
3334
+ WriteJournalEventsTable
3435
+ WriteVQueueTable
35-
+ ReadVQueueTable,
36+
+ ReadVQueueTable
37+
+ WriteVirtualObjectStatusTable,
3638
{
3739
async fn apply(self, ctx: &'ctx mut StateMachineApplyContext<'s, S>) -> Result<(), Error> {
3840
let OnPausedCommand {

crates/worker/src/partition/state_machine/lifecycle/resume.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
// by the Apache License, Version 2.0.
1010

1111
use restate_invoker_api::InvokeInputJournal;
12-
use restate_storage_api::invocation_status_table::InvocationStatus;
12+
use restate_storage_api::invocation_status_table::{InboxedInvocation, InvocationStatus};
1313
use restate_storage_api::vqueue_table::{ReadVQueueTable, WriteVQueueTable};
1414
use restate_types::config::Configuration;
1515
use restate_types::identifiers::InvocationId;
@@ -45,16 +45,22 @@ where
4545
if Configuration::pinned().common.experimental_enable_vqueues {
4646
ctx.vqueue_move_invocation_to_inbox_stage(&self.invocation_id)
4747
.await?;
48+
// When moving an invocation back into the inbox stage, we have to update the
49+
// InvocationStatus accordingly. Otherwise, we miss running changes on the VQueues
50+
// inbox.
51+
*self.invocation_status = InvocationStatus::Inboxed(
52+
InboxedInvocation::from_in_flight_invocation_metadata(metadata.clone()),
53+
);
4854
} else {
4955
ctx.action_collector.push(Action::Invoke {
5056
invocation_id: self.invocation_id,
5157
invocation_epoch: current_invocation_epoch,
5258
invocation_target,
5359
invoke_input_journal: InvokeInputJournal::NoCachedJournal,
5460
});
55-
}
5661

57-
*self.invocation_status = InvocationStatus::Invoked(metadata.clone());
62+
*self.invocation_status = InvocationStatus::Invoked(metadata.clone());
63+
}
5864

5965
Ok(())
6066
}

crates/worker/src/partition/state_machine/lifecycle/suspend.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use crate::partition::state_machine::{CommandHandler, Error, ParkCause, StateMachineApplyContext};
1212
use restate_storage_api::invocation_status_table::{InvocationStatus, WriteInvocationStatusTable};
1313
use restate_storage_api::journal_table_v2::ReadJournalTable;
14+
use restate_storage_api::service_status_table::WriteVirtualObjectStatusTable;
1415
use restate_storage_api::vqueue_table::{ReadVQueueTable, WriteVQueueTable};
1516
use restate_types::config::Configuration;
1617
use restate_types::identifiers::InvocationId;
@@ -27,7 +28,11 @@ pub struct OnSuspendCommand {
2728
impl<'ctx, 's: 'ctx, S> CommandHandler<&'ctx mut StateMachineApplyContext<'s, S>>
2829
for OnSuspendCommand
2930
where
30-
S: ReadJournalTable + WriteInvocationStatusTable + WriteVQueueTable + ReadVQueueTable,
31+
S: ReadJournalTable
32+
+ WriteInvocationStatusTable
33+
+ WriteVQueueTable
34+
+ ReadVQueueTable
35+
+ WriteVirtualObjectStatusTable,
3136
{
3237
async fn apply(self, ctx: &'ctx mut StateMachineApplyContext<'s, S>) -> Result<(), Error> {
3338
debug_assert!(

crates/worker/src/partition/state_machine/mod.rs

Lines changed: 70 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,17 +1177,6 @@ impl<S> StateMachineApplyContext<'_, S> {
11771177
where
11781178
S: WriteJournalTable + WriteInvocationStatusTable + WriteVQueueTable + ReadVQueueTable,
11791179
{
1180-
// Usage metering for "actions" should include the Input journal entry
1181-
// type, but it gets filtered out before reaching the state machine.
1182-
// Therefore we count it here, as a special case.
1183-
if self.is_leader {
1184-
counter!(
1185-
USAGE_LEADER_JOURNAL_ENTRY_COUNT,
1186-
"entry" => "Command/Input",
1187-
)
1188-
.increment(1);
1189-
}
1190-
11911180
let invoke_input_journal = if let Some(invocation_input) = invocation_input {
11921181
self.init_journal(
11931182
invocation_id,
@@ -1255,6 +1244,17 @@ impl<S> StateMachineApplyContext<'_, S> {
12551244
where
12561245
S: WriteJournalTable,
12571246
{
1247+
// Usage metering for "actions" should include the Input journal entry
1248+
// type, but it gets filtered out before reaching the state machine.
1249+
// Therefore we count it here, as a special case.
1250+
if self.is_leader {
1251+
counter!(
1252+
USAGE_LEADER_JOURNAL_ENTRY_COUNT,
1253+
"entry" => "Command/Input",
1254+
)
1255+
.increment(1);
1256+
}
1257+
12581258
debug_if_leader!(self.is_leader, "Init journal with input entry");
12591259

12601260
// In our current data model, ServiceInvocation has always an input, so initial length is 1
@@ -2866,27 +2866,31 @@ impl<S> StateMachineApplyContext<'_, S> {
28662866
invocation_target.invocation_target_ty(),
28672867
InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive)
28682868
) {
2869-
let keyed_service_id = invocation_target.as_keyed_service_id().expect(
2870-
"When the handler type is Exclusive, the invocation target must have a key",
2871-
);
2872-
match self
2873-
.storage
2874-
.get_virtual_object_status(&keyed_service_id)
2875-
.await?
2876-
{
2877-
VirtualObjectStatus::Locked(iid) => {
2878-
panic!(
2879-
"invariant violated trying to run an invocation {invocation_id} on a VO while another invocation {iid} is holding the lock"
2880-
);
2881-
}
2882-
VirtualObjectStatus::Unlocked => {
2883-
// Lock the service
2884-
self.storage
2885-
.put_virtual_object_status(
2886-
&keyed_service_id,
2887-
&VirtualObjectStatus::Locked(invocation_id),
2888-
)
2889-
.map_err(Error::Storage)?;
2869+
// We might have already locked the service before if we are resuming, for example.
2870+
// Hence, only lock it if we are not holding a token yet.
2871+
if !card.priority.token_held() {
2872+
let keyed_service_id = invocation_target.as_keyed_service_id().expect(
2873+
"When the handler type is Exclusive, the invocation target must have a key",
2874+
);
2875+
match self
2876+
.storage
2877+
.get_virtual_object_status(&keyed_service_id)
2878+
.await?
2879+
{
2880+
VirtualObjectStatus::Locked(iid) => {
2881+
panic!(
2882+
"invariant violated trying to run an invocation {invocation_id} on a VO while another invocation {iid} is holding the lock"
2883+
);
2884+
}
2885+
VirtualObjectStatus::Unlocked => {
2886+
// Lock the service
2887+
self.storage
2888+
.put_virtual_object_status(
2889+
&keyed_service_id,
2890+
&VirtualObjectStatus::Locked(invocation_id),
2891+
)
2892+
.map_err(Error::Storage)?;
2893+
}
28902894
}
28912895
}
28922896
}
@@ -4385,15 +4389,25 @@ impl<S> StateMachineApplyContext<'_, S> {
43854389
let current_invocation_epoch = metadata.current_invocation_epoch;
43864390

43874391
metadata.timestamps.update(self.record_created_at);
4388-
let invocation_target = metadata.invocation_target.clone();
4389-
self.storage
4390-
.put_invocation_status(&invocation_id, &InvocationStatus::Invoked(metadata))
4391-
.map_err(Error::Storage)?;
43924392

43934393
if Configuration::pinned().common.experimental_enable_vqueues {
43944394
self.vqueue_move_invocation_to_inbox_stage(&invocation_id)
43954395
.await?;
4396+
self.storage
4397+
.put_invocation_status(
4398+
&invocation_id,
4399+
&InvocationStatus::Inboxed(
4400+
InboxedInvocation::from_in_flight_invocation_metadata(metadata),
4401+
),
4402+
)
4403+
.map_err(Error::Storage)?;
43964404
} else {
4405+
let invocation_target = metadata.invocation_target.clone();
4406+
4407+
self.storage
4408+
.put_invocation_status(&invocation_id, &InvocationStatus::Invoked(metadata))
4409+
.map_err(Error::Storage)?;
4410+
43974411
self.action_collector.push(Action::Invoke {
43984412
invocation_id,
43994413
invocation_epoch: current_invocation_epoch,
@@ -4422,7 +4436,10 @@ impl<S> StateMachineApplyContext<'_, S> {
44224436
waiting_for_completed_entries: HashSet<EntryIndex>,
44234437
) -> Result<(), Error>
44244438
where
4425-
S: WriteInvocationStatusTable + WriteVQueueTable + ReadVQueueTable,
4439+
S: WriteInvocationStatusTable
4440+
+ WriteVQueueTable
4441+
+ ReadVQueueTable
4442+
+ WriteVirtualObjectStatusTable,
44264443
{
44274444
debug_if_leader!(
44284445
self.is_leader,
@@ -5083,7 +5100,7 @@ impl<S> StateMachineApplyContext<'_, S> {
50835100
cause: ParkCause,
50845101
) -> Result<(), Error>
50855102
where
5086-
S: WriteVQueueTable + ReadVQueueTable,
5103+
S: WriteVQueueTable + ReadVQueueTable + WriteVirtualObjectStatusTable,
50875104
{
50885105
let qid = Self::vqueue_id_from_invocation(invocation_id, invocation_target);
50895106

@@ -5145,6 +5162,21 @@ impl<S> StateMachineApplyContext<'_, S> {
51455162
)
51465163
.await?;
51475164

5165+
// If we release the concurrency token for exclusive VO handlers, then we also need to
5166+
// unlock it so that other invocations can make progress.
5167+
if should_release_concurrency_token
5168+
&& invocation_target.invocation_target_ty()
5169+
== InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive)
5170+
{
5171+
let keyed_service_id = invocation_target.as_keyed_service_id().expect(
5172+
"When the handler type is Exclusive, the invocation target must have a key",
5173+
);
5174+
// Mark the virtual object as unlocked so that it no longer records a lock holder
5175+
self.storage
5176+
.put_virtual_object_status(&keyed_service_id, &VirtualObjectStatus::Unlocked)
5177+
.map_err(Error::Storage)?;
5178+
}
5179+
51485180
Ok(())
51495181
}
51505182

0 commit comments

Comments
 (0)