Skip to content

Feature: Support separate on_commit and on_apply callbacks in Responder #1460

@drmingdrmer

Description

@drmingdrmer

Feature Request

Enable separate notifications for commit and apply phases by extending the responder mechanism to support both events.

Background

Currently, client_write_ff() only notifies the caller after the entry is applied to the state machine. This means:

  1. Entry is written to log and replicated
  2. Commit: Quorum reached (update_committed() at replication_handler/mod.rs:193)
  3. Apply: Entry applied to state machine (sm::Worker at worker.rs:238)
  4. Response sent: Only at this point (after apply)

However, many use cases need to distinguish between these two phases:

  • Commit: Entry is durable and guaranteed to be applied (quorum reached)
  • Apply: Entry has been processed by state machine (potentially slow)

Use Cases

  1. Low-latency acknowledgment: Acknowledge to client once committed (durable), before slow state machine processing
  2. Pipelined writes: Start next operation once committed, don't wait for apply
  3. Monitoring: Track both commit latency (network/replication) vs apply latency (state machine)
  4. Write-ahead patterns: Commit guarantees durability, apply provides result

Proposed Solutions

Alternative 1: Two Separate Responders

Split the responder parameter into two optional callbacks:

pub async fn client_write_ff(
    &self,
    app_data: C::D,
    on_commit: Option<CommitResponderOf<C>>,
    on_apply: Option<ApplyResponderOf<C>>,
) -> Result<(), Fatal<C>>

Implementation:

  • Store both responders in separate maps: commit_responders and apply_responders
  • Call on_commit when update_committed() is invoked (replication_handler/mod.rs:193)
  • Call on_apply when entry is applied (sm::worker.rs:238)

Pros:

  • Clear separation of concerns
  • Flexible: can have commit-only, apply-only, or both

Cons:

  • More parameters (but both optional)
  • Need two storage maps

Alternative 2: Extended Responder Trait (Recommended)

Extend the Responder trait with two methods, keeping send() for backward compatibility:

pub trait Responder<T>: OptionalSend + 'static {
    /// Called when log entry is committed (quorum reached).
    ///
    /// Default: no-op
    fn on_commit(self, log_id: LogIdOf<C>) {
        // Default: do nothing
    }

    /// Called when log entry is applied to state machine.
    ///
    /// Default: delegates to send() for backward compatibility
    fn on_apply(self, result: T) {
        self.send(result);
    }

    /// Legacy method: automatically delegates to on_apply().
    ///
    /// Deprecated: implement on_apply() instead
    #[deprecated(since = "0.11.0", note = "implement on_apply() instead")]
    fn send(self, result: T);
}

Implementation:

  1. When update_committed() is called, invoke responder.on_commit(log_id)
  2. When entry is applied, invoke responder.on_apply(result)
  3. Default on_apply() calls send() for backward compatibility

Example usage:

struct MyResponder {
    commit_tx: oneshot::Sender<LogId>,
    apply_tx: oneshot::Sender<Response>,
}

impl Responder<Response> for MyResponder {
    fn on_commit(self, log_id: LogId) {
        let _ = self.commit_tx.send(log_id);
    }

    fn on_apply(self, result: Response) {
        let _ = self.apply_tx.send(result);
    }

    fn send(self, result: Response) {
        // Legacy: just delegate to on_apply
        self.on_apply(result);
    }
}

Pros:

  • Backward compatible (existing responders still work)
  • Single trait extension
  • Flexible: responders can choose to implement one or both
  • Default implementations minimize boilerplate

Cons:

  • Requires trait to consume self twice (needs design consideration)

Alternative 2b: Separate Callback Methods

If self-consumption is problematic, use a reference-based approach:

pub trait Responder<T>: OptionalSend + 'static {
    /// Called when log entry is committed (quorum reached).
    fn on_commit(&mut self, log_id: LogIdOf<C>) {
        // Default: do nothing
    }

    /// Called when log entry is applied to state machine.
    fn on_apply(&mut self, result: T);

    /// Finalize and consume responder after all callbacks.
    fn finalize(self) {
        // Default: do nothing
    }
}

This allows multiple callbacks before consuming.

Recommendation

Alternative 2 is preferred because:

  1. Backward compatible
  2. Single parameter (cleaner API)
  3. Responders control their own behavior
  4. Works well with Feature: Make responder optional in client_write_ff #1458 (optional responder) and Feature: Avoid Vec allocation in RaftStateMachine::apply by using callbacks #1459 (callback-based apply)

Implementation Points

Commit callback location (replication_handler/mod.rs:193-196):

if let Some(_prev_committed) = self.state.update_committed(&granted) {
    // NEW: notify commit responders here
    self.notify_commit_responders(self.state.committed());

    self.output.push_command(Command::ReplicateCommitted {
        committed: self.state.committed().cloned(),
    });
}

Apply callback location (already exists at sm/worker.rs:229-238):

if let Some(tx) = tx {
    let res = Ok(ClientWriteResponse { log_id, data: resp, membership });
    tx.on_apply(res);  // Changed from tx.send(res)
}

Related Issues

Together, these three issues enable:

  • Zero-allocation writes when response not needed
  • Separate commit/apply notifications
  • Streaming responses from state machine

cc @schreter - Does this align with your performance and flexibility requirements?

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions