Workflow message passing - Java SDK
A Workflow can act like a stateful web service that receives messages: Queries, Signals, and Updates. The Workflow implementation defines these endpoints via handler methods that can react to incoming messages and return values. Temporal Clients use messages to read Workflow state and control execution. See Workflow message passing for a general overview of this topic. This page introduces these features for the Temporal Java SDK.
Write message handlers
Follow these guidelines when writing your message handlers:
- Message handlers are defined as methods on the Workflow class, using one of the three annotations: @QueryMethod, @SignalMethod, and @UpdateMethod.
- The parameters and return values of handlers and the main Workflow function must be serializable.
- Prefer a single class with multiple fields over using multiple input parameters. A class allows you to add fields without changing the calling signature.
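The last guideline can be illustrated in plain Java, independent of the Temporal SDK. This is a minimal sketch: GreetInput, GreetHandler, and their fields are hypothetical names that do not appear in this page's sample. A field added later gets a default value, so existing callers keep working and the handler signature never changes.

```java
// Hypothetical input type for a message handler; plain Java, no Temporal SDK required.
class GreetInput {
  public String name;
  // Field added in a later version. The default value keeps older callers working.
  public String title = "";

  // No-argument constructor, mirroring the input classes used elsewhere on this page.
  public GreetInput() {}

  public GreetInput(String name) {
    this.name = name;
  }

  public GreetInput(String name, String title) {
    this.name = name;
    this.title = title;
  }
}

class GreetHandler {
  // The handler takes a single parameter; adding fields to GreetInput never changes this signature.
  static String greet(GreetInput input) {
    String prefix = input.title.isEmpty() ? "" : input.title + " ";
    return "Hello " + prefix + input.name;
  }
}

class GreetInputDemo {
  public static void main(String[] args) {
    System.out.println(GreetHandler.greet(new GreetInput("Ada")));        // caller written before the new field
    System.out.println(GreetHandler.greet(new GreetInput("Ada", "Dr."))); // caller using the new field
  }
}
```

With separate positional parameters, adding the title would have forced every caller and the handler signature to change at once.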
Query handlers
A Query is a synchronous operation that retrieves state from a Workflow Execution:
public class MessagePassingIntro {
  public enum Language {
    CHINESE,
    ENGLISH,
    FRENCH,
    SPANISH,
    PORTUGUESE,
  }
  public static class GetLanguagesInput {
    public boolean includeUnsupported;
    public GetLanguagesInput() {
      this.includeUnsupported = false;
    }
    public GetLanguagesInput(boolean includeUnsupported) {
      this.includeUnsupported = includeUnsupported;
    }
  }
  @WorkflowInterface
  public interface GreetingWorkflow {
    ...
    // Use the @QueryMethod annotation to define a Query handler in the
    // Workflow interface.
    @QueryMethod
    List<Language> getLanguages(GetLanguagesInput input);
  }
  public static class GreetingWorkflowImpl implements GreetingWorkflow {
    ...
    @Override
    public List<Language> getLanguages(GetLanguagesInput input) {
      // The Query handler returns a value: it must not mutate the Workflow state
      // or perform blocking operations.
      if (input.includeUnsupported) {
        return Arrays.asList(Language.values());
      } else {
        // Copy the key set into a typed list rather than using a raw ArrayList.
        return new ArrayList<>(greetings.keySet());
      }
    }
  }
}
- A Query handler must not modify Workflow state.
- You can't perform blocking operations such as executing an Activity in a Query handler.
- The Query annotation accepts an argument (name) as described in the API reference docs for @QueryMethod.
Signal handlers
A Signal is an asynchronous message sent to a running Workflow Execution to change its state and control its flow:
public class MessagePassingIntro {
  public static class ApproveInput {
    private String name;
    public ApproveInput() {}
    public ApproveInput(String name) {
      this.name = name;
    }
  }
  @WorkflowInterface
  public interface GreetingWorkflow {
    ...
    // Use the @SignalMethod annotation to define a Signal handler in the
    // Workflow interface.
    @SignalMethod
    void approve(ApproveInput input);
  }
  public static class GreetingWorkflowImpl implements GreetingWorkflow {
    ...
    @Override
    public void approve(ApproveInput input) {
      // The Signal handler mutates the Workflow state but cannot return a value.
      // approvedForRelease and approverName are assumed to be Workflow fields
      // declared in the elided code above.
      this.approvedForRelease = true;
      this.approverName = input.name;
    }
  }
}
- The handler should not return a value. The response is sent immediately from the server, without waiting for the Workflow to process the Signal.
- The Signal annotation accepts arguments (name and unfinishedPolicy) as described in the API reference docs for @SignalMethod.
- Signal (and Update) handlers can be blocking. This allows you to use Activities, Child Workflows, durable Workflow.sleep Timers, Workflow.await, and more. See Blocking handlers and Workflow message passing for guidelines on safely using blocking Signal and Update handlers.
Update handlers and validators
An Update is a trackable synchronous request sent to a running Workflow Execution. It can change the Workflow state, control its flow, and return a result. The sender must wait until the Worker accepts or rejects the Update. The sender may wait further to receive a returned value or an exception if something goes wrong:
public class MessagePassingIntro {
  @WorkflowInterface
  public interface GreetingWorkflow {
    ...
    // Use the @UpdateMethod annotation to define an Update handler in the
    // Workflow interface.
    @UpdateMethod
    Language setLanguage(Language language);
    // Update validators are optional
    @UpdateValidatorMethod(updateName = "setLanguage")
    void setLanguageValidator(Language language);
  }
  public static class GreetingWorkflowImpl implements GreetingWorkflow {
    ...
    @Override
    public Language setLanguage(Language language) {
      // The Update handler can mutate the Workflow state and return a value.
      Language previousLanguage = this.language;
      this.language = language;
      return previousLanguage;
    }
    @Override
    public void setLanguageValidator(Language language) {
      // The Update validator performs validation but cannot mutate the Workflow state.
      if (!greetings.containsKey(language)) {
        throw new IllegalArgumentException("Unsupported language: " + language);
      }
    }
  }
}
- The Update annotation accepts arguments (name and unfinishedPolicy) as described in the API reference docs for @UpdateMethod.
- About validators:
  - Use validators to reject an Update before it is written to History. Validators are always optional. If you don't need to reject Updates, you can skip them.
  - Define an Update validator with the @UpdateValidatorMethod annotation. Use the updateName argument when declaring the validator to connect it to its Update. The validator must return void and accept the same argument types as the handler.
- Accepting and rejecting Updates with validators:
  - To reject an Update, throw an exception of any type in the validator.
  - Without a validator, Updates are always accepted.
- Validators and Event History:
  - The WorkflowExecutionUpdateAccepted event is written into the History whether the acceptance was automatic or programmatic.
  - When a validator throws an error, the Update is rejected, the Update handler is not run, and WorkflowExecutionUpdateAccepted won't be added to the Event History. The caller receives an "Update failed" error.
- Use getCurrentUpdateInfo to obtain information about the current Update. This includes the Update ID, which can be useful for deduplication when using Continue-As-New: see Ensuring your messages are processed exactly once.
- Signal (and Update) handlers can be blocking, letting them use Activities, Child Workflows, durable Workflow.sleep Timers, Workflow.await conditions, and more. See Blocking handlers and Workflow message passing for safe usage guidelines.
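Deduplicating by Update ID can be modeled in plain Java. The following is only a sketch of the idea, not SDK code: UpdateDeduplicator is a hypothetical helper, the Update IDs are passed in as plain strings, and in a real handler the ID would come from getCurrentUpdateInfo while the map of seen IDs would need to be carried across Continue-As-New yourself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper: remembers the result for each Update ID so a repeated ID is not applied twice.
class UpdateDeduplicator<R> {
  private final Map<String, R> resultsByUpdateId = new HashMap<>();

  // Runs the handler body only the first time an Update ID is seen;
  // later calls with the same ID return the stored result.
  R runOnce(String updateId, Supplier<R> handlerBody) {
    if (resultsByUpdateId.containsKey(updateId)) {
      return resultsByUpdateId.get(updateId);
    }
    R result = handlerBody.get();
    resultsByUpdateId.put(updateId, result);
    return result;
  }
}

class UpdateDeduplicatorDemo {
  public static void main(String[] args) {
    UpdateDeduplicator<Integer> dedup = new UpdateDeduplicator<>();
    int[] timesApplied = {0};
    dedup.runOnce("update-1", () -> ++timesApplied[0]);
    dedup.runOnce("update-1", () -> ++timesApplied[0]); // duplicate delivery of the same Update ID
    System.out.println(timesApplied[0]); // the handler body ran once
  }
}
```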
Send messages
To send Queries, Signals, or Updates, you call methods on a stub that implements your WorkflowInterface, often called the "WorkflowStub."
Use newWorkflowStub to obtain the WorkflowStub.
For example:
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);
WorkflowOptions workflowOptions =
    WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).setWorkflowId(WORKFLOW_ID).build();
// Create the workflow client stub. It is used to start the workflow execution.
GreetingWorkflow workflow = client.newWorkflowStub(GreetingWorkflow.class, workflowOptions);
// Start the workflow asynchronously by calling its getGreetings workflow method
WorkflowClient.start(workflow::getGreetings);
To check the argument types required when sending messages -- and the return type for Queries and Updates -- refer to the corresponding handler method in the Workflow Definition.
Send a Query
To send a Query to a Workflow Execution, call a Query method on a WorkflowStub created in Client code:
List<Language> languages = workflow.getLanguages(new GetLanguagesInput(false));
System.out.println("Supported languages: " + languages);
- Sending a Query doesn't add events to a Workflow's Event History.
- You can send Queries to closed Workflow Executions within a Namespace's Workflow retention period. This includes Workflows that have completed, failed, or timed out. Querying terminated Workflows is not safe and, therefore, not supported.
- A Worker must be online and polling the Task Queue to process a Query.
Send a Signal
You can send a Signal to a Workflow Execution from a Temporal Client or from another Workflow Execution. However, you can only send Signals to Workflow Executions that haven't closed.
Send a Signal from a Client
To send a Signal from Client code, call a Signal method on the WorkflowStub:
workflow.approve(new ApproveInput("Me"));
- The call returns when the server accepts the Signal; it does not wait for the Signal to be delivered to the Workflow Execution.
- The WorkflowExecutionSignaled Event appears in the Workflow's Event History.
Send a Signal from a Workflow
A Workflow can send a Signal to another Workflow, known as an External Signal.
Use Workflow.newExternalWorkflowStub in your current Workflow to create an ExternalWorkflowStub for the other Workflow.
Call Signal methods on the external stub to Signal the other Workflow:
OtherWorkflow other = Workflow.newExternalWorkflowStub(OtherWorkflow.class, otherWorkflowID);
other.mySignalMethod();
When an External Signal is sent:
- A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
- A WorkflowExecutionSignaled Event appears in the recipient's Event History.
Signal-With-Start
Signal-With-Start allows a Client to send a Signal to a Workflow Execution, starting the Execution if it is not already running.
If there's a Workflow running with the given Workflow Id, it will be signaled.
If there isn't, a new Workflow will be started and immediately signaled.
To use Signal-With-Start, call signalWithStart and pass the name of your Signal with its arguments, followed by the Workflow start arguments:
public static void signalWithStart() {
  // WorkflowStub is a client-side stub to a single Workflow instance
  WorkflowStub untypedWorkflowStub = client.newUntypedWorkflowStub("GreetingWorkflow",
      WorkflowOptions.newBuilder()
          .setWorkflowId(workflowId)
          .setTaskQueue(taskQueue)
          .build());
  // Arguments: Signal name, Signal arguments, Workflow start arguments
  untypedWorkflowStub.signalWithStart("setCustomer", new Object[] {customer2}, new Object[] {customer1});
  String greeting = untypedWorkflowStub.getResult(String.class);
}
Here's the WorkflowInterface for the previous example.
When using Signal-With-Start, the Signal handler (setCustomer) will be executed before the Workflow method (greet).
@WorkflowInterface
public interface GreetingWorkflow {
  @WorkflowMethod
  String greet(Customer customer);
  @SignalMethod
  void setCustomer(Customer customer);
  @QueryMethod
  Customer getCustomer();
}
Send an Update
An Update is a synchronous, blocking call that can change Workflow state, control its flow, and return a result.
A Client sending an Update must wait until the Server delivers the Update to a Worker. Workers must be available and responsive. If you need a response as soon as the Server receives the request, use a Signal instead. Also note that you can't send Updates to other Workflow Executions or perform an Update equivalent of Signal-With-Start.
- WorkflowExecutionUpdateAccepted is added to the Event History when the Worker confirms that the Update passed validation.
- WorkflowExecutionUpdateCompleted is added to the Event History when the Worker confirms that the Update has finished.
To send an Update to a Workflow Execution, you can:
- Call the Update method on a WorkflowStub in Client code and wait for the Update to complete. This code fetches an Update result:
  Language previousLanguage = workflow.setLanguage(Language.CHINESE);
- Call startUpdate to receive a WorkflowUpdateHandle as soon as the Update is accepted or rejected.
  - Use this WorkflowUpdateHandle later to fetch your results.
  - Blocking Update handlers normally perform long-running asynchronous operations.
  - startUpdate only waits until the Worker has accepted or rejected the Update, not until all asynchronous operations are complete.
  For example:
  WorkflowUpdateHandle<Language> handle =
      WorkflowStub.fromTyped(workflow)
          .startUpdate(
              "setLanguage", WorkflowUpdateStage.ACCEPTED, Language.class, Language.ENGLISH);
  previousLanguage = handle.getResultAsync().get();
  For more details, see the "Blocking handlers" section.
To obtain an Update handle, you can:
- Use startUpdate to start an Update and return the handle, as shown in the preceding example.
- Use getUpdateHandle to fetch a handle for an in-progress Update using the Update ID and Workflow ID.
You can use the WorkflowUpdateHandle to obtain information about the Update:
- getExecution(): Returns the Workflow Execution that this Update was sent to.
- getId(): Returns the Update's unique ID, which can be useful for deduplication when using Continue-As-New: see Ensuring your messages are processed exactly once.
- getResultAsync(): Returns a CompletableFuture which can be used to wait for the Update to complete.
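Because getResultAsync() returns a standard CompletableFuture, the usual java.util.concurrent tools apply to it. The sketch below uses a stand-in future in place of handle.getResultAsync() so that it runs without a Temporal Service. ResultWaiter and the 5-second timeout are illustrative assumptions, and orTimeout requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class ResultWaiter {
  // Waits for the result with an upper bound instead of blocking indefinitely.
  // On timeout or failure, join() throws an unchecked CompletionException.
  static <T> T waitForResult(CompletableFuture<T> future, long timeoutSeconds) {
    return future.orTimeout(timeoutSeconds, TimeUnit.SECONDS).join();
  }

  public static void main(String[] args) {
    // Stand-in for handle.getResultAsync(); it completes on another thread.
    CompletableFuture<String> standIn = CompletableFuture.supplyAsync(() -> "ENGLISH");
    System.out.println(waitForResult(standIn, 5));
  }
}
```

Bounding the wait on the Client side does not cancel the Update itself; it only limits how long the caller blocks.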
In real-world development, sometimes you may be unable to import Workflow Definition method signatures. When you don't have access to the Workflow Definition or it isn't written in Java, you can use these non-type safe APIs to obtain an untyped WorkflowStub:
Pass method names instead of method objects to:
Message handler patterns
This section covers common write operations, such as Signal and Update handlers. It doesn't apply to pure read operations, like Queries or Update Validators.
For additional information, see Inject work into the main Workflow, and Ensuring your messages are processed exactly once.
Do blocking operations in handlers
Signal and Update handlers can block.
This allows you to use Workflow.await, Activities, Child Workflows, Workflow.sleep Timers, etc.
This expands the possibilities for what can be done by a handler but it also means that handler executions and your main Workflow method are all running concurrently, with switching occurring between them at await calls.
It's essential to understand the things that could go wrong in order to use blocking handlers safely. See Workflow message passing for guidance on safe usage of blocking Signal and Update handlers, and the Controlling handler concurrency and Waiting for message handlers to finish sections below.
The following code modifies the Update handler from earlier on in this page. The Update handler now makes a blocking call to execute an Activity:
public static class GreetingWorkflowImpl implements GreetingWorkflow {
  ...
  @Override
  public Language setLanguage(Language language) {
    if (!greetings.containsKey(language)) {
      String greeting = activity.greetingService(language);
      if (greeting == null) {
        // An Update validator cannot be blocking, so it cannot be used to check that the remote
        // greetingService supports the requested language. Throwing an ApplicationFailure
        // will fail the Update, but the WorkflowExecutionUpdateAccepted event will still be
        // added to history.
        throw ApplicationFailure.newFailure(
            "Greeting service does not support: " + language, "GreetingFailure");
      }
      greetings.put(language, greeting);
    }
    Language previousLanguage = this.language;
    this.language = language;
    return previousLanguage;
  }
}
Although a Signal handler can also make blocking calls like this, using an Update handler allows the Client to receive a result or error once the Activity completes. This lets your Client track the progress of asynchronous work performed by the Update's Activities, Child Workflows, etc.
Add blocking wait conditions
Sometimes, blocking Signal or Update handlers need to meet certain conditions before they should continue.
You can use Workflow.await to prevent the code from proceeding until a condition is true.
You specify the condition by passing a function that returns true or false.
This is an important feature that helps you control your handler logic.
Here are two important use cases for Workflow.await:
- Waiting in a handler until it is appropriate to continue.
- Waiting in the main Workflow until all active handlers have finished.
Wait for conditions in handlers
It's common to use Workflow.await in a handler.
For example, suppose your Workflow class has an updateReadyToExecute method that indicates whether your Update handler should be allowed to start executing.
You can use Workflow.await in the handler to make the handler pause until the condition is met:
@Override
public String setLanguage(UpdateInput input) {
  Workflow.await(() -> this.updateReadyToExecute(input));
  ...
}
Remember: handlers can execute before the main Workflow method starts.
You can also use Workflow.await anywhere else in the handler to wait for a specific condition to become true.
This allows you to write handlers that pause at multiple points, each time waiting for a required condition to become true.
Ensure your handlers finish before the Workflow completes
Workflow.await can ensure your handler completes before a Workflow finishes.
When your Workflow uses blocking Signal or Update handlers, your main Workflow method can return or Continue-as-New while a handler is still waiting on an async task, such as an Activity.
The Workflow completing may interrupt the handler before it finishes crucial work and cause Client errors when trying to retrieve Update results.
To address this problem and allow your Workflow to end smoothly, use Workflow.await to wait for Workflow.isEveryHandlerFinished to return true:
public class MyWorkflowImpl implements MyWorkflow {
  ...
  @Override
  public String run() {
    ...
    Workflow.await(() -> Workflow.isEveryHandlerFinished());
    return "workflow-result";
  }
}
By default, your Worker will log a warning when you allow a Workflow Execution to finish with unfinished handler executions.
You can silence these warnings on a per-handler basis by passing the unfinishedPolicy argument to the @SignalMethod / @UpdateMethod annotation:
@WorkflowInterface
public interface MyWorkflow {
  ...
  @UpdateMethod(unfinishedPolicy = HandlerUnfinishedPolicy.ABANDON)
  void myUpdate();
}
See Finishing handlers before the Workflow completes for more information.
Use @WorkflowInit to operate on Workflow input before any handler executes
Normally, your Workflow's constructor won't have any parameters.
However, if you use the @WorkflowInit annotation on your constructor, you can give it the same Workflow parameters as your @WorkflowMethod.
The SDK will then ensure that your constructor receives the Workflow input arguments that the Client sent.
The Workflow input arguments are also passed to your @WorkflowMethod method -- that always happens, whether or not you use the @WorkflowInit annotation.
This is useful if you have message handlers that need access to Workflow input: see Initializing the Workflow first.
Here's an example.
Notice that the constructor and getGreeting must have the same parameters:
public class GreetingExample {
  @WorkflowInterface
  public interface GreetingWorkflow {
    @WorkflowMethod
    String getGreeting(String input);
    @UpdateMethod
    boolean checkTitleValidity();
  }
  public static class GreetingWorkflowImpl implements GreetingWorkflow {
    private final String nameWithTitle;
    // Not final: the Update handler sets this flag after the constructor has run.
    private boolean titleHasBeenChecked;
    ...
    // Note the annotation is on a public constructor
    @WorkflowInit
    public GreetingWorkflowImpl(String input) {
      this.nameWithTitle = "Sir " + input;
      this.titleHasBeenChecked = false;
    }
    @Override
    public String getGreeting(String input) {
      Workflow.await(() -> titleHasBeenChecked);
      return "Hello " + nameWithTitle;
    }
    @Override
    public boolean checkTitleValidity() {
      // The handler is now guaranteed to see the workflow input
      // after it has been processed by the constructor.
      boolean isValid = activity.checkTitleValidity(nameWithTitle);
      titleHasBeenChecked = true;
      return isValid;
    }
  }
}
Use locks to prevent concurrent handler execution
Concurrent processes can interact in unpredictable ways. Incorrectly written concurrent message-passing code may not work correctly when multiple handler instances run simultaneously. Here's an example of a pathological case:
public class DataWorkflowImpl implements DataWorkflow {
  ...
  @Override
  public void badSignalHandler() {
    Data data = activity.fetchData();
    this.x = data.x;
    // Bug!! If multiple instances of this method are executing concurrently, then
    // there may be times when the Workflow has this.x from one Activity execution and this.y from another.
    Workflow.sleep(Duration.ofSeconds(1));
    this.y = data.y;
  }
}
Coordinating access with WorkflowLock corrects this code.
Locking makes sure that only one handler instance can execute a specific section of code at any given time:
public class DataWorkflowImpl implements DataWorkflow {
  WorkflowLock lock = Workflow.newWorkflowLock();
  ...
  @Override
  public void safeSignalHandler() {
    // Acquire the lock before entering try, so that finally only unlocks a lock that is held.
    lock.lock();
    try {
      Data data = activity.fetchData();
      this.x = data.x;
      // OK: the scheduler may switch now to a different handler execution,
      // or to the main workflow method, but no other execution of this handler
      // can run until this execution finishes.
      Workflow.sleep(Duration.ofSeconds(1));
      this.y = data.y;
    } finally {
      lock.unlock();
    }
  }
}
Message handler troubleshooting
When sending a Signal, Update, or Query to a Workflow, your Client might encounter the following errors:
- The Client can't contact the server: You'll receive a WorkflowServiceException whose cause is a StatusRuntimeException with a status of UNAVAILABLE (after some retries).
- The Workflow does not exist: You'll receive a WorkflowNotFoundException.
See Exceptions in message handlers for a non-Java-specific discussion of this topic.
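One way to tell these cases apart in Client code is to walk an exception's cause chain looking for a specific type. The helper below is a plain-Java sketch, not an SDK utility: CauseFinder is a hypothetical name, and the demo uses standard exceptions as stand-ins so it runs without the Temporal SDK. In real code you would pass the exception you caught and, for example, StatusRuntimeException.class.

```java
import java.util.Optional;

class CauseFinder {
  // Returns the first exception of the given type in the cause chain,
  // starting with the top-level exception itself.
  static <T extends Throwable> Optional<T> findCause(Throwable error, Class<T> type) {
    for (Throwable t = error; t != null; t = t.getCause()) {
      if (type.isInstance(t)) {
        return Optional.of(type.cast(t));
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // Stand-ins: RuntimeException plays the outer exception, IllegalStateException its cause.
    Exception wrapped =
        new RuntimeException("request failed", new IllegalStateException("UNAVAILABLE"));
    System.out.println(findCause(wrapped, IllegalStateException.class).isPresent());
  }
}
```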
Problems when sending a Signal
When using Signal, the above WorkflowExceptions are the only types of exception that will result from the request.
In contrast, for Queries and Updates, the client waits for a response from the Worker. If an issue occurs during the handler execution by the Worker, the Client may receive an exception.