One of my recent favourite technologies is Temporal. It’s

durable execution platform—a modern toolkit for building resilient, long-running workflows in a code-centric, scalable way.

Basically, it helps engineers build a reliable workflow by providing ways to retry, resume, and replay the execution at the desired point in time.

Below is the highlevel overview of Temporal’s components. This post will focus on the Workflow code that is engineer’s responsibility to control.

  flowchart TB
    Client["Client SDK"]

    subgraph TemporalService["Temporal Service"]
        Frontend["Frontend API (gRPC)"]
        Matching["Task Queues (Matching Service)"]
        HistoryDB["History & State Storage"]
    end

    subgraph Worker["Worker (Your Code)"]
        Poller["Poller (Task Execution Loop)"]
        WF["Workflow Code"]
        ACT["Activity Code"]
    end

    Client -->|Start Workflow / Signal / Query| Frontend
    Frontend --> Matching
    Frontend --> HistoryDB
    Matching <--> Poller
    Poller --> WF
    WF --> ACT
    ACT --> Poller

Workflow Determinism

This is the most important part when you want to get a long with Temporal.

A critical aspect of developing Workflow Definitions is ensuring they exhibit certain deterministic traits – that is, making sure that the same Commands are emitted in the same sequence

In general, Temporal rely on execution history to recover state, so a workflow must always produce the same sequence of commands and results for the same inputs, ensuring consistent and predictable outcomes even after failures or restarts.

Below are things that I think most of the dev will need to know to maintain the workflow determinism across its lifetime is to ensure our application doesn’t break after code changing, or runtime pod restart.

Workflow definition and workflow execution

Workflow Definition is the code you write to describe the business logic as a sequence of steps. Workflow Execution is a specific run of that workflow definition. The deterministic constraints are applying to all running Workflow Execution. So if your Workflow Definition has no current running Workflow Execution (at the time you deploy the change to your deployment environments), you are completely safe to modify anything inside Workflow Definition.

Don’t change the activity

// old code
function Workflow(ctx workflow.Context, inout interface{}) (err error){
   ...
   workflow.ExecuteActivity(activity_A)
   ....
}

// new code
function Workflow(ctx workflow.Context, inout interface{}) (err error){
   ...
   workflow.ExecuteActivity(activity_B)
   ....
}

--> non determisitic error

Order of the keys

function Workflow(ctx workflow.Context, inout interface{}) (err error){
   kv := map[string]string{
		"keyB": "valueB",
		"keyA": "valueA",
		"keyC": "valueC",
	}

    for key, value := range kv {
		_ = workflow.ExecuteActivity(ctx, MyActivity, key, value).Get(ctx, nil)
	}
}

--> potential cause non-deterministic when replaying because the order of the keys in map might be changed.
--> Should sort the keys before iterating over to execute activity.

Be cautious with branching condition

function Workflow(ctx workflow.Context, inout interface{}) (err error){
   ...
   if time.Now() < '10/10/2024' {
	   workflow.ExecuteActivity(activity_A)
   }
   ....
}

--> non determisitic error if the workflow is replayed after 10/10/2024

Use versioning to patch workflow

// old code
function Workflow(ctx workflow.Context, inout interface{}) (err error){
   ...
   workflow.ExecuteActivity(activity_A)
   ....
}

// new patching code
function Workflow(ctx workflow.Context, inout interface{}) (err error){
   ...
   v := workflow.GetVersion(ctx,
		"deperated-actitivy-A",
		workflow.DefaultVersion, 1,
	)
	
	 if v == workflow.DefaultVersion{
	    workflow.ExecuteActivity(activity_A)
	 } else {
	   workflow.ExecuteActivity(activity_B)
	 }
   ....
}

--> when replaying, existing workflows have default version of 
"deperated-actitivy-A", so they will see the correct path.
New workflows will have version 1 then executes activity B

CI-CD

To ensure the determinism in real world scenario, the best practice is having CI-CD pipeline that replays the recent running workflows against new workflow definition. It will caught non-deterministic error.

import (
    "go.temporal.io/sdk/worker"
    "log/slog"
)

// replay the workflow histories of current running workflow instance
func DeterminismCheck(workflow any){
    histories, err := GetLatestRunningWorkflowsHistory(waitCtx, wfType, 100)
    if err != nil {
        panic(err)
    }
    replayer := worker.NewWorkflowReplayer()
    replayer.RegisterWorkflow(workflow)

    errs := make([]error, 0)
    for _, hist := range histories {
        if err = replayer.ReplayWorkflowHistory(slog.Default(), hist); err != nil {
            errs = append(errs, err)
        }
    }

    if len(errs) != 0 {
        panic(errs)
    }
}


// query: condition to find workflow executions
func GetLatestRunningWorkflowsHistory(ctx context.Context, query string, size int32) (histories []*history.History, err error) {
    if size == 0 {
		size = 10
	}

    tprClient := GetTemporalClient()
    resp, err := tprClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{
        Query:    query,
        PageSize: size,
    })
    if err != nil {
        return
    }

    for _, exe := range resp.GetExecutions() {
        wfExe := exe.GetExecution()
        hist, err := GetWorkflowHistory(ctx, tprClient, wfExe.WorkflowId, wfExe.RunId)
        if err != nil {
            return nil, fmt.Errorf("get event history error: %w", err)
        }
        histories = append(histories, hist)
    }

    return
}

// Get execution history 
func GetWorkflowHistory(ctx context.Context, client client.Client, id, runID string) (*history.History, error) {
	if client == nil {
		client = GetTemporalClient()
	}
	var hist history.History
	iter := client.GetWorkflowHistory(ctx, id, runID, false, enums.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
	for iter.HasNext() {
		event, err := iter.Next()
		if err != nil {
			err = errors.Wrap(err, "failed to get next event")
			return nil, err
		}
		hist.Events = append(hist.Events, event)
	}
	return &hist, nil
}