Description
Similarly, any I/O triggered by the state machine currently must complete inline in `apply()`. Here, it might also be helpful to have some kind of callback to notify the caller about applied entries asynchronously.

But do we need to report the applied log ID to the caller at all? Especially for a storage which works with snapshots, the applied log ID is actually a lie: it is applied to the in-memory state, but not to the persistent state. Only a snapshot will really apply it. We just need to account for the applied log ID somewhere, in the sense that the same entry is not applied twice. With that, there is no need for I/O accounting of individual `apply` calls, and the I/O can continue fully overlapped in the background even past the execution of `apply()`. Solely log I/O must be awaited for correctness.

In fact, in our implementation, the client response is a ZST, since we send the responses to the client already during `apply` via a channel directly to the client connection (reducing latency and allowing the client connection to await the I/O, if that is necessary for some reason). This could be generalized, since we have a response receiver for the client abstracted out anyway. So instead of collecting results, sending them to another task and processing them there, we can also process them directly here (and send results to the respective `oneshot` channels in the default impl). This would also simplify our `applying_entries` and also our special implementation for replies.
Originally posted by @schreter in #1154 (review)
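The point about accounting for the applied log ID, so that the same entry is never applied twice, could look roughly like the sketch below. Everything in it is an assumption made for illustration: `KvStateMachine`, its fields, and the `(index, value)` entry shape are hypothetical and not part of the actual API.

```rust
/// Hypothetical in-memory state machine (illustrative names only).
#[derive(Default)]
struct KvStateMachine {
    /// Index of the last log entry applied to the in-memory state.
    last_applied: Option<u64>,
    /// Stand-in for application state: the sum of all applied values.
    total: i64,
}

impl KvStateMachine {
    /// Applies `(index, value)` entries, skipping any index that was
    /// already applied. Returns how many entries were actually applied.
    fn apply(&mut self, entries: &[(u64, i64)]) -> usize {
        let mut applied = 0;
        for &(index, value) in entries {
            if self.last_applied.map_or(false, |last| index <= last) {
                continue; // replayed entry: already reflected in `total`
            }
            self.total += value;
            self.last_applied = Some(index);
            applied += 1;
        }
        applied
    }
}
```

With this kind of guard the caller does not need a per-call report of what was applied; replaying a range that overlaps already-applied entries is harmless.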
Allow `apply()` to return before the I/O completes. When the I/O completes, the state machine calls `Callback.complete()` to inform RaftCore about the result. `Callback.complete()` is also responsible for sending the result back to the client.
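To make the flow concrete, here is a minimal runnable sketch using only `std` threads and channels as stand-ins. The payload types (`String` for the client response, `u64` for the applied log index), the `to_core` field name, and the free function `apply` are assumptions for this example and do not reflect the real trait.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for the proposed `Callback`.
struct Callback {
    /// Channel back to the client connection, carrying the response.
    to_client: Option<mpsc::Sender<String>>,
    /// Channel to the core, carrying the applied log index. Optional,
    /// because the core does not need to hear about every entry.
    to_core: Option<mpsc::Sender<u64>>,
}

impl Callback {
    /// Reports the result of one applied entry. Send errors are ignored:
    /// a dropped receiver just means nobody is waiting any more.
    fn complete(self, index: u64, response: String) {
        if let Some(tx) = self.to_client {
            let _ = tx.send(response);
        }
        if let Some(tx) = self.to_core {
            let _ = tx.send(index);
        }
    }
}

/// Returns immediately; the simulated I/O and the callbacks run on a
/// background thread.
fn apply(entries: Vec<(u64, String, Callback)>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for (index, payload, callback) in entries {
            // Real state-machine I/O would happen here.
            callback.complete(index, format!("applied {payload}"));
        }
    })
}
```

The caller gets control back as soon as the work is handed off, and each receiver learns about completion independently. An async implementation would presumably use `oneshot` channels instead of `std::sync::mpsc`, but the ownership pattern (the callback is consumed by `complete`) stays the same.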
/// `apply()` uses this `Callback` to inform the client and RaftCore.
struct Callback {
    to_client: Option<Responder>,
    // Optional because RaftCore does not need to be informed for every log.
    to_raft_core: Option<mpsc::Sender>,
}

impl Callback {
    fn complete(self) {
        // Reply to the client, if a responder is attached.
        if let Some(x) = self.to_client {
            x.send(...);
        }
        // Inform RaftCore, if it asked to be notified for this log.
        if let Some(x) = self.to_raft_core {
            x.send(...);
        }
    }
}

pub trait RaftStateMachineV2<C>
{
    async fn apply<I>(&mut self, entries: I) -> Result<Vec<C::R>, StorageError<C>>
    where
        I: IntoIterator<Item = (C::Entry, Callback)> + OptionalSend,
        I::IntoIter: OptionalSend;