Skip to main content

Cancellation - Rust SDK

Workflows and Activities in the Temporal Rust SDK support cancellation. This page shows how to handle cancellation in your Workflows and Activities.

Detect Workflow Cancellation

A Workflow can detect that it has been cancelled using the ctx.cancelled() method. This returns a future that resolves with the cancellation reason when the Workflow is cancelled:

use temporalio_sdk::workflows::select;

#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
select! {
result = ctx.activity(
MyActivities::long_running_operation,
input,
ActivityOptions {
start_to_close_timeout: Some(Duration::from_secs(300)),
..Default::default()
}
) => {
Ok(format!("Activity completed: {:?}", result?))
}
reason = ctx.cancelled() => {
Ok(format!("Workflow cancelled: {}", reason))
}
}
}

When the Workflow receives a cancellation request, ctx.cancelled() will resolve with the cancellation reason.

Graceful Cancellation Handling

When a Workflow is cancelled, you should stop executing new activities and clean up gracefully:

use temporalio_sdk::workflows::select;

#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<Vec<String>> {
let mut results = Vec::new();

for item in &self.items {
select! {
result = ctx.activity(
MyActivities::process_item,
item.clone(),
ActivityOptions {
start_to_close_timeout: Some(Duration::from_secs(60)),
..Default::default()
}
) => {
results.push(result??);
}
reason = ctx.cancelled() => {
// Workflow was cancelled
workflow::logger.info(format!("Processing cancelled: {}", reason));
return Err(WorkflowExecutionError::new_with_reason(
"cancelled",
&reason
));
}
}
}

Ok(results)
}

Handle Cancellation in Activities

Activities can allow graceful cancellation by regularly recording heartbeats:

use temporalio_sdk::activities::{ActivityContext, ActivityError};
use std::time::Duration;

#[activity]
pub async fn process_many_items(
ctx: ActivityContext,
items: Vec<String>,
) -> Result<usize, ActivityError> {
let mut processed = 0;

for item in items {
// Process the item
do_work(&item).await?;
processed += 1;

// Record heartbeat to detect cancellation
ctx.record_heartbeat(processed);

// Small delay to simulate work
tokio::time::sleep(Duration::from_millis(100)).await;
}

Ok(processed)
}

When an Activity is cancelled:

  • The Activity will be notified through its heartbeat mechanism
  • The Activity should then clean up any resources and exit
  • The Workflow will receive an error with ActivityError::Cancelled

Workflow Cancellation Semantics

When a Workflow is cancelled:

  1. The cancellation request is delivered to the Workflow task
  2. ctx.cancelled() resolves with the cancellation reason
  3. The Workflow should stop starting new activities and clean up
  4. The Workflow Execution terminates with a cancelled status

Immediate termination

You can also use Workflow termination to immediately stop the Workflow without allowing it to handle cancellation:

// This terminates the workflow process immediately
// from outside the workflow (e.g., from a client)
let handle = client.get_workflow_handle::<MyWorkflow>("workflow-id");
handle.terminate(TerminateWorkflowOptions::builder()
.reason("Emergency stop")
.build()).await?;

Cancellation Best Practices

  1. Always handle cancellation: Wrap long-running operations in select! to detect cancellation
  2. Clean up resources: When cancelled, make sure to properly close connections, stop child workflows, etc.
  3. Record heartbeats in activities: For long Activities, record heartbeats regularly so cancellation can be detected
  4. Don't ignore cancellation: Returning Ok after cancellation will mislead downstream systems
  5. Be idempotent: Ensure cancelled operations can be safely retried if the Workflow retries

Cancellation Flow Example

Here's a complete example of handling cancellation in a Workflow with multiple activities:

#[workflow]
pub struct ProcessingWorkflow {
items: Vec<String>,
}

#[workflow_methods]
impl ProcessingWorkflow {
#[init]
fn new(_ctx: &WorkflowContextView, items: Vec<String>) -> Self {
Self { items }
}

#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<ProcessingResult> {
let items = ctx.state(|s| s.items.clone());
let mut successful = 0;
let mut failed = 0;

for item in items {
select! {
result = ctx.activity(
MyActivities::process_item,
item,
ActivityOptions {
start_to_close_timeout: Some(Duration::from_secs(60)),
..Default::default()
}
) => {
match result? {
Ok(_) => successful += 1,
Err(_) => failed += 1,
}
}
reason = ctx.cancelled() => {
return Err(WorkflowExecutionError::new(
"cancelled",
format!("Processing cancelled after {} items. Reason: {}", successful + failed, reason)
));
}
}
}

Ok(ProcessingResult {
successful,
failed,
total: successful + failed,
})
}
}

This Workflow processes items but will stop and report progress if cancelled.