Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ pub struct ConnectionOptions {
#[builder(default = "temporal-rust".to_owned())]
#[cfg_attr(feature = "core-based-sdk", builder(setters(vis = "pub")))]
client_name: String,
// TODO [rust-sdk-branch]: SDK should set this to its version. Doing that probably easiest
// after adding proper client interceptors.
/// The version of the SDK being implemented on top of core. Is set as `client-version` header
/// in all RPC calls. The server decides if the client is supported based on this.
#[builder(default = VERSION.to_owned())]
Expand Down
1 change: 1 addition & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ anyhow = "1.0"
async-trait = "0.1"
base64 = "0.22"
bon = { workspace = true }
crc32fast = "1"
dirs = { version = "6.0", optional = true }
derive_more = { workspace = true }
erased-serde = "0.4"
Expand Down
2 changes: 1 addition & 1 deletion crates/common/src/activity_definition.rs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguably this should live alongside the rest of the activity code IMO (i.e. in temporal_sdk::activities). Same for workflow definition (and workflow signal definition, workflow update definition, and workflow query definition).

But the more I think about it, the more I think we have misnamed some things. I think it might clear it up if the "sdk" crate were actually named the "worker" crate or something. Arguably an SDK is made up of "runtime", "client", "worker", "converter", etc. If some of those are in common that can make sense, but to have an "SDK" crate not include most of what makes an SDK is a bit confusing IMO.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can see naming it worker. I don't think it maters all that much but it's a reasonable rename.

The definition makes sense in common because workflow definitions will have to live here so they can be used by the client. Having the kinds of definitions together makes sense.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, the definitions being in here is what triggered my thoughts here. It makes sense to me that you need it in common for use by a client, all of which make up the SDK IMO and only the worker crate is needed by implementers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, I think I'd rather keep in named SDK and just re-export everything you might need, which I was intending to do anyway. But that can be done later on in the project when things are more settled.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I'll reserve comment until then

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::data_converters::{TemporalDeserializable, TemporalSerializable};
/// Implement on a marker struct to define an activity.
pub trait ActivityDefinition {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should consider making activity definition have an invoke method that can be invoked and allow name to accept self. In (some) other languages, activity definitions can be instantiated by a user, and the decorators/attributes/etc are just sugar for creating them (and so they can be read by a user too). Arguably for those performing untyped activity invocations, they can instantiate a struct that implements this and just provide the name (and the input/output types if they'd like).

Copy link
Member Author

@Sushisource Sushisource Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's already the case here. You can just make a struct and implement activity definition. Or, you can use the macros on a function whose body is just unimplemented!() which is cool

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would definitely be interested in a sample/snippet showing how to impl and register activity definition. May be worth a helper struct/fn, though hopefully people don't use that approach to create activities much. I will say one place where we see this approach used a lot in .NET is for DI where they want to control lifetimes and inject deps and such to the invocation.

/// Type of the input argument to the workflow
type Input: TemporalDeserializable + 'static;
type Input: TemporalDeserializable + TemporalSerializable + 'static;
/// Type of the output of the workflow
type Output: TemporalSerializable + 'static;

Expand Down
57 changes: 38 additions & 19 deletions crates/common/src/data_converters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,28 @@ impl PayloadConverter {
// TODO [rust-sdk-branch]: Proto binary, other standard built-ins
}

#[derive(Debug)]
pub enum PayloadConversionError {
WrongEncoding,
EncodingError(Box<dyn std::error::Error>),
EncodingError(Box<dyn std::error::Error + Send + Sync>),
}

impl std::fmt::Display for PayloadConversionError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PayloadConversionError::WrongEncoding => write!(f, "Wrong encoding"),
PayloadConversionError::EncodingError(err) => write!(f, "Encoding error: {}", err),
}
}
}

impl std::error::Error for PayloadConversionError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
PayloadConversionError::WrongEncoding => None,
PayloadConversionError::EncodingError(err) => Some(err.as_ref()),
}
}
}

pub trait FailureConverter {
Expand All @@ -71,13 +90,13 @@ pub struct DefaultFailureConverter;
pub trait PayloadCodec {
fn encode(
&self,
payloads: Vec<Payload>,
context: &SerializationContext,
payloads: Vec<Payload>,
) -> BoxFuture<'static, Vec<Payload>>;
fn decode(
&self,
payloads: Vec<Payload>,
context: &SerializationContext,
payloads: Vec<Payload>,
) -> BoxFuture<'static, Vec<Payload>>;
}
pub struct DefaultPayloadCodec;
Expand All @@ -94,16 +113,16 @@ pub trait TemporalSerializable {
None
}
}
///

/// Indicates some type can be deserialized for use with Temporal.
///
/// You don't need to implement this unless you are using a non-serde-compatible custom converter,
/// in which case you should implement the to/from_payload functions on some wrapper type.
pub trait TemporalDeserializable: Sized {
fn from_serde(
_: &dyn ErasedSerdePayloadConverter,
_: Payload,
_: &SerializationContext,
_: Payload,
) -> Option<Self> {
None
}
Expand All @@ -122,26 +141,26 @@ pub struct RawValue {
pub trait GenericPayloadConverter {
fn to_payload<T: TemporalSerializable + 'static>(
&self,
val: &T,
context: &SerializationContext,
val: &T,
) -> Result<Payload, PayloadConversionError>;
#[allow(clippy::wrong_self_convention)]
fn from_payload<T: TemporalDeserializable + 'static>(
&self,
payload: Payload,
context: &SerializationContext,
payload: Payload,
) -> Result<T, PayloadConversionError>;
}

impl GenericPayloadConverter for PayloadConverter {
fn to_payload<T: TemporalSerializable + 'static>(
&self,
val: &T,
context: &SerializationContext,
val: &T,
) -> Result<Payload, PayloadConversionError> {
match self {
PayloadConverter::Serde(pc) => {
Ok(pc.to_payload(val.as_serde().ok_or_else(|| todo!())?, context)?)
Ok(pc.to_payload(context, val.as_serde().ok_or_else(|| todo!())?)?)
}
PayloadConverter::UseWrappers => {
Ok(T::to_payload(val, context).ok_or_else(|| todo!())?)
Expand All @@ -153,12 +172,12 @@ impl GenericPayloadConverter for PayloadConverter {

fn from_payload<T: TemporalDeserializable + 'static>(
&self,
payload: Payload,
context: &SerializationContext,
payload: Payload,
) -> Result<T, PayloadConversionError> {
match self {
PayloadConverter::Serde(pc) => {
Ok(T::from_serde(pc.as_ref(), payload, context).ok_or_else(|| todo!())?)
Ok(T::from_serde(pc.as_ref(), context, payload).ok_or_else(|| todo!())?)
}
PayloadConverter::UseWrappers => {
Ok(T::from_payload(payload, context).ok_or_else(|| todo!())?)
Expand All @@ -183,22 +202,22 @@ where
{
fn from_serde(
pc: &dyn ErasedSerdePayloadConverter,
payload: Payload,
context: &SerializationContext,
payload: Payload,
) -> Option<Self>
where
Self: Sized,
{
erased_serde::deserialize(&mut pc.from_payload(payload, context).ok()?).ok()
erased_serde::deserialize(&mut pc.from_payload(context, payload).ok()?).ok()
}
}

struct SerdeJsonPayloadConverter;
impl ErasedSerdePayloadConverter for SerdeJsonPayloadConverter {
fn to_payload(
&self,
_: &SerializationContext,
value: &dyn erased_serde::Serialize,
_context: &SerializationContext,
) -> Result<Payload, PayloadConversionError> {
let as_json = serde_json::to_vec(value).map_err(|_| todo!())?;
Ok(Payload {
Expand All @@ -213,8 +232,8 @@ impl ErasedSerdePayloadConverter for SerdeJsonPayloadConverter {

fn from_payload(
&self,
_: &SerializationContext,
payload: Payload,
_context: &SerializationContext,
) -> Result<Box<dyn erased_serde::Deserializer<'static>>, PayloadConversionError> {
// TODO: Would check metadata
let json_v: serde_json::Value =
Expand All @@ -225,14 +244,14 @@ impl ErasedSerdePayloadConverter for SerdeJsonPayloadConverter {
pub trait ErasedSerdePayloadConverter: Send + Sync {
fn to_payload(
&self,
value: &dyn erased_serde::Serialize,
context: &SerializationContext,
value: &dyn erased_serde::Serialize,
) -> Result<Payload, PayloadConversionError>;
#[allow(clippy::wrong_self_convention)]
fn from_payload(
&self,
payload: Payload,
context: &SerializationContext,
payload: Payload,
) -> Result<Box<dyn erased_serde::Deserializer<'static>>, PayloadConversionError>;
}

Expand Down Expand Up @@ -299,15 +318,15 @@ impl FailureConverter for DefaultFailureConverter {
impl PayloadCodec for DefaultPayloadCodec {
fn encode(
&self,
payloads: Vec<Payload>,
_: &SerializationContext,
payloads: Vec<Payload>,
) -> BoxFuture<'static, Vec<Payload>> {
async move { payloads }.boxed()
}
fn decode(
&self,
payloads: Vec<Payload>,
_: &SerializationContext,
payloads: Vec<Payload>,
) -> BoxFuture<'static, Vec<Payload>> {
async move { payloads }.boxed()
}
Expand Down
50 changes: 49 additions & 1 deletion crates/common/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
//! with workers.

use crate::protos::{coresdk, temporal, temporal::api::enums::v1::VersioningBehavior};
use std::str::FromStr;
use std::{
fs::File,
io::{self, BufReader, Read},
str::FromStr,
sync::OnceLock,
};

/// Specifies which task types a worker will poll for.
///
Expand Down Expand Up @@ -84,6 +89,19 @@ pub struct WorkerDeploymentOptions {
pub default_versioning_behavior: Option<VersioningBehavior>,
}

impl WorkerDeploymentOptions {
pub fn from_build_id(build_id: String) -> Self {
Self {
version: WorkerDeploymentVersion {
deployment_name: "".to_owned(),
build_id,
},
use_worker_versioning: false,
default_versioning_behavior: None,
}
}
}

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct WorkerDeploymentVersion {
/// Name of the deployment
Expand Down Expand Up @@ -138,3 +156,33 @@ impl From<temporal::api::deployment::v1::WorkerDeploymentVersion> for WorkerDepl
}
}
}

static CACHED_BUILD_ID: OnceLock<String> = OnceLock::new();

/// Build ID derived from hashing the on-disk bytes of the current executable.
/// Deterministic across machines for the same binary. Cached per-process.
pub fn build_id_from_current_exe() -> &'static str {
CACHED_BUILD_ID
.get_or_init(|| compute_crc32_exe_id().unwrap_or_else(|_| "undetermined".to_owned()))
}

fn compute_crc32_exe_id() -> io::Result<String> {
let exe_path = std::env::current_exe()?;
let file = File::open(exe_path)?;
let mut reader = BufReader::new(file);

let mut hasher = crc32fast::Hasher::new();
let mut buf = [0u8; 128 * 1024];

loop {
let n = reader.read(&mut buf)?;
if n == 0 {
break;
}
hasher.update(&buf[..n]);
}

let crc = hasher.finalize();

Ok(format!("{:08x}", crc))
}
Loading
Loading