Activity execution - Rust SDK
Execute an Activity
How to execute an Activity from a Workflow using the Temporal Rust SDK.
The primary way that a Workflow uses an Activity is by executing it. To execute an Activity in a Workflow, use the ctx.activity() method on the Workflow Context. This method automatically handles determinism requirements and returns the Activity result.
use std::time::Duration;
use temporalio_sdk::{
    activities::ActivityOptions,
    workflow::{WorkflowContext, WorkflowContextView},
    WorkflowResult,
};
use temporalio_macros::{workflow, workflow_methods};

#[workflow]
pub struct GreetingWorkflow {
    name: String,
}

#[workflow_methods]
impl GreetingWorkflow {
    #[init]
    fn new(_ctx: &WorkflowContextView, name: String) -> Self {
        Self { name }
    }

    #[run]
    async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<String> {
        let name = ctx.state(|s| s.name.clone());
        // Execute activity with default options
        let greeting = ctx.activity(
            GreetingActivities::greet,
            name,
            ActivityOptions::default(),
        ).await?;
        Ok(greeting)
    }
}
Activity Execution Options
You can customize Activity execution behavior using ActivityOptions:
use std::time::Duration;
use temporalio_sdk::activities::ActivityOptions;

let result = ctx.activity(
    MyActivities::my_activity,
    input,
    ActivityOptions {
        // How long a single attempt of the Activity can run before it times out
        start_to_close_timeout: Some(Duration::from_secs(10)),
        // Maximum total time from scheduling to completion, including retries
        schedule_to_close_timeout: Some(Duration::from_secs(60)),
        // Maximum time allowed between heartbeats before the Activity is marked as failed
        heartbeat_timeout: Some(Duration::from_secs(5)),
        ..Default::default()
    },
).await?;
Common Activity Options
- start_to_close_timeout - Maximum time an Activity can execute from when it starts until it completes. This is the most common timeout to set.
- schedule_to_close_timeout - Maximum time from when the Activity is scheduled until it completes.
- heartbeat_timeout - How long to wait for a heartbeat signal when record_heartbeat() is being used.
- retry_policy - Configure retry behavior with backoff and maximum attempts.
- heartbeat_details - Details to include in heartbeat signals.
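The way these timeouts relate to each other can be sketched as a small sanity check. This is a minimal illustration using only the standard library: the `Timeouts` struct and `check_timeouts` function are hypothetical names, not SDK types. The first rule reflects Temporal's requirement that an Activity sets at least one of start-to-close or schedule-to-close. The other two are common-sense heuristics assumed here rather than rules the SDK is known to enforce.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the three timeout fields described above.
/// This is NOT the SDK's `ActivityOptions` type.
#[derive(Default)]
struct Timeouts {
    start_to_close: Option<Duration>,
    schedule_to_close: Option<Duration>,
    heartbeat: Option<Duration>,
}

/// Returns a list of problems; an empty list means the combination looks sane.
fn check_timeouts(t: &Timeouts) -> Vec<&'static str> {
    let mut problems = Vec::new();
    // Temporal requires at least one of these two timeouts.
    if t.start_to_close.is_none() && t.schedule_to_close.is_none() {
        problems.push("set start_to_close or schedule_to_close");
    }
    // Heuristic: a single attempt should fit inside the overall budget.
    if let (Some(attempt), Some(total)) = (t.start_to_close, t.schedule_to_close) {
        if attempt > total {
            problems.push("start_to_close exceeds schedule_to_close");
        }
    }
    // Heuristic: a heartbeat timeout only helps if it is shorter than one attempt.
    if let (Some(hb), Some(attempt)) = (t.heartbeat, t.start_to_close) {
        if hb >= attempt {
            problems.push("heartbeat timeout is not shorter than start_to_close");
        }
    }
    problems
}
```

With the values from the example above (10 s start-to-close, 60 s schedule-to-close, 5 s heartbeat), the check reports no problems.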
Activity Retry Policy
Control how Activities are retried on failure using RetryPolicy:
use temporalio_sdk::activities::{ActivityOptions, RetryPolicy};
use std::time::Duration;

let result = ctx.activity(
    MyActivities::process_data,
    input,
    ActivityOptions {
        start_to_close_timeout: Some(Duration::from_secs(10)),
        retry_policy: Some(RetryPolicy {
            initial_interval: Duration::from_secs(1),
            backoff_coefficient: 2.0,
            maximum_interval: Duration::from_secs(100),
            maximum_attempts: 5,
            non_retryable_errors: vec!["ValidationError".to_string()],
            ..Default::default()
        }),
        ..Default::default()
    },
).await?;
When an Activity returns ActivityError::NonRetryable, it will not be retried regardless of the retry policy.
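To see what the policy above means in practice, the following standalone sketch computes the wait before each retry. It assumes the usual Temporal backoff rule: the interval starts at `initial_interval`, is multiplied by `backoff_coefficient` after each failure, and is capped at `maximum_interval`. The `retry_delays` helper is a hypothetical name for illustration and is not part of the SDK.

```rust
use std::time::Duration;

/// Hypothetical helper (not an SDK function): the delay before each retry.
/// This sketch treats `maximum_attempts` as a finite count and does not
/// model the "0 means unlimited" convention.
fn retry_delays(
    initial_interval: Duration,
    backoff_coefficient: f64,
    maximum_interval: Duration,
    maximum_attempts: u32,
) -> Vec<Duration> {
    // N attempts allow at most N - 1 retries.
    let retries = maximum_attempts.saturating_sub(1);
    (0..retries)
        .map(|n| {
            // Exponential growth, capped at the maximum interval.
            let secs = initial_interval.as_secs_f64() * backoff_coefficient.powi(n as i32);
            Duration::from_secs_f64(secs.min(maximum_interval.as_secs_f64()))
        })
        .collect()
}
```

For the policy shown above (1 s initial interval, coefficient 2.0, 100 s cap, 5 attempts), this yields four retry delays of 1 s, 2 s, 4 s, and 8 s. The 100 s cap would only apply from the eighth retry onward.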
Local Activities
For short-lived, low-latency Activities that you want to run on the same Worker as the Workflow, use Local Activities with ctx.local_activity():
use std::time::Duration;
use temporalio_sdk::activities::LocalActivityOptions;

// Local activities execute on the same worker without going through task queues
let fast_result = ctx.local_activity(
    MyActivities::quick_operation,
    input,
    LocalActivityOptions {
        schedule_to_close_timeout: Some(Duration::from_secs(5)),
        ..Default::default()
    },
).await?;
Local Activities:
- Execute immediately on the same Worker process
- Don't go through task queues
- Are ideal for very fast operations or accessing local state
- Still respect determinism constraints of Workflows
Get Activity Context Information
Inside an Activity, you can access information about the Activity Execution using the ActivityContext:
use temporalio_sdk::activities::{ActivityContext, ActivityError};

#[activity]
pub async fn my_activity(ctx: ActivityContext, input: String) -> Result<String, ActivityError> {
    // Get activity execution information
    let info = ctx.info();
    println!("Activity Type: {}", info.activity_type);
    println!("Current attempt: {}", info.attempt);
    println!("Heartbeat details: {:?}", info.heartbeat_details);
    // Your activity logic here
    Ok(format!("Processed: {}", input))
}
Record Activity Heartbeats
For long-running Activities, record heartbeats to report progress and allow graceful cancellation:
use temporalio_sdk::activities::{ActivityContext, ActivityError};
use std::time::Duration;

#[activity]
pub async fn long_running_activity(
    ctx: ActivityContext,
    items: Vec<String>,
) -> Result<i32, ActivityError> {
    let mut processed = 0;
    for (i, item) in items.iter().enumerate() {
        // Process each item (process_item is a placeholder for your own per-item logic)
        process_item(item).await?;
        processed = i + 1;
        // Record heartbeat with progress details
        ctx.record_heartbeat(processed);
        // Small delay to demonstrate heartbeating
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    Ok(processed as i32)
}
Heartbeating:
- Allows the Temporal Service to track Activity progress
- Enables graceful cancellation (Activity can check for cancellation)
- Resets the heartbeat timeout timer
- Can include details about progress that are recovered on retry
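The last point, recovering progress on retry, can be sketched without the SDK. In this minimal illustration, `process_from_checkpoint` is a hypothetical helper. Its `last_heartbeat` argument stands in for whatever progress count a previous attempt reported (for example, a value decoded from the heartbeat details), and the `heartbeat` closure stands in for a call such as `ctx.record_heartbeat(..)`. How the SDK encodes and decodes heartbeat details is not shown here.

```rust
/// Hypothetical helper, not an SDK function: processes `items`, skipping the
/// first `last_heartbeat` entries that an earlier attempt already reported.
/// Returns how many items this attempt handled.
fn process_from_checkpoint<F>(
    items: &[String],
    last_heartbeat: Option<usize>,
    mut heartbeat: F,
) -> usize
where
    F: FnMut(usize),
{
    // Clamp so a stale checkpoint can never point past the end of the input.
    let start = last_heartbeat.unwrap_or(0).min(items.len());
    for (i, _item) in items.iter().enumerate().skip(start) {
        // Per-item work would go here.
        heartbeat(i + 1); // report the total number of items completed so far
    }
    items.len() - start
}
```

If a previous attempt heartbeated `3` out of five items before failing, the retry starts at the fourth item and reports `4` and then `5`.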
Handle Activity Cancellation
When an Activity is cancelled, you can detect this and clean up resources:
use temporalio_sdk::activities::{ActivityContext, ActivityError};

#[activity]
pub async fn cancellable_activity(
    ctx: ActivityContext,
    work_items: Vec<String>,
) -> Result<(), ActivityError> {
    for item in work_items {
        // Check if activity has been cancelled (the reason is not used here)
        if let Some(_reason) = ctx.cancelled() {
            return Err(ActivityError::Cancelled);
        }
        // Do work with item
        println!("Processing: {}", item);
    }
    Ok(())
}
When an Activity detects cancellation, it should:
- Clean up any resources it's using
- Return ActivityError::Cancelled
- Stop immediately without doing more work
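One way to make sure cleanup happens on every exit path, including the early return on cancellation, is to tie it to a guard value's `Drop` implementation. The sketch below uses only the standard library. `WorkError`, `TempResource`, and `run_cancellable` are hypothetical names, and the `is_cancelled` closure stands in for a check such as `ctx.cancelled()`.

```rust
#[derive(Debug, PartialEq)]
enum WorkError {
    Cancelled,
}

/// Hypothetical resource whose cleanup runs in `Drop`, so it executes
/// whether the function returns normally or bails out early.
struct TempResource<'a> {
    log: &'a mut Vec<String>,
}

impl Drop for TempResource<'_> {
    fn drop(&mut self) {
        self.log.push("cleanup".to_string());
    }
}

fn run_cancellable<F>(
    items: &[&str],
    mut is_cancelled: F,
    log: &mut Vec<String>,
) -> Result<(), WorkError>
where
    F: FnMut() -> bool,
{
    let mut resource = TempResource { log };
    for item in items {
        // Check before each unit of work so no further work starts after cancellation.
        if is_cancelled() {
            return Err(WorkError::Cancelled); // `resource` is dropped here
        }
        resource.log.push(format!("processed {item}"));
    }
    Ok(())
}
```

If cancellation is observed before the third item, the log holds the two processed items followed by the cleanup entry, and the function returns `Err(WorkError::Cancelled)`.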
Parallel Activity Execution
You can execute multiple Activities in parallel within a Workflow:
use std::time::Duration;
use temporalio_sdk::{
    activities::ActivityOptions,
    workflow::{select, WorkflowContext, WorkflowContextView},
    WorkflowResult,
};
use temporalio_macros::{workflow, workflow_methods};

#[workflow]
pub struct ParallelWorkflow;

#[workflow_methods]
impl ParallelWorkflow {
    #[run]
    async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<(String, String)> {
        let options = ActivityOptions {
            start_to_close_timeout: Some(Duration::from_secs(10)),
            ..Default::default()
        };
        // Start multiple activities
        let future1 = ctx.activity(
            MyActivities::activity_one,
            "input1".to_string(),
            options.clone(),
        );
        let future2 = ctx.activity(
            MyActivities::activity_two,
            "input2".to_string(),
            options,
        );
        // Wait for both to complete
        let result1 = future1.await?;
        let result2 = future2.await?;
        Ok((result1, result2))
    }
}
Multiple Activities can be executed in parallel, and the Workflow will wait for all of them to complete before moving forward. This is useful for independent work that can happen concurrently.