How to build AWS State Machines using AWS CDK - Part III

This is the third part in a blog mini-series showcasing the functionality of AWS Step Functions state machines and how to express them as infrastructure as code using AWS CDK. If you haven’t read the first two parts, check them out here and here, as we continue building on the same example throughout.

Last time, we focused on the choice flow operation — this allows us to perform conditional evaluations based on different state parameters that are available (and potentially mutated) throughout your state machine.

During this blog, we’re going to focus on some of the other flow operations that are available to you when defining your state machine. These operations can optimise and improve the processing of your data.

❓ What

As I mentioned above, we’re going to focus on the remaining flow operations that are available to you when crafting your state machine. A reminder of all of the flow operators available at your disposal:

  • Choice (if/then-else logic)
  • Parallel
  • Map
  • Wait

Continuing to build up the same state machine, we have already worked with the choice and wait operations, so in this post we’re going to home in on the parallel and map operations and how they can be defined and used.

So let’s dive in and see some potential use cases for these operations!

✍️ Define some CDK

Picking up from where we left off last time, our final state machine CDK definition looked something like the following:

import * as cdk from "aws-cdk-lib";  
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";  
import * as lambda from "aws-cdk-lib/aws-lambda";  
  
import * as stepFunctions from "aws-cdk-lib/aws-stepfunctions";  
import * as stepFunctionsTasks from "aws-cdk-lib/aws-stepfunctions-tasks";  
  
const mockLambdaFunctionArn =  
  "arn:aws:lambda:us-east-1:12345:function:my-shiny-lambda-function";  
const mockDdbTableArn =  
  "arn:aws:dynamodb:us-east-1:12345:table/my-shiny-dynamodb-table";  
  
const lambdaFunction = lambda.Function.fromFunctionArn(  
  this,  
  "lambda-function",  
  mockLambdaFunctionArn  
);  
const dynamodbTable = dynamodb.Table.fromTableArn(  
  this,  
  "dynamo-db-table",  
  mockDdbTableArn  
);  
  
const processJob = new stepFunctionsTasks.LambdaInvoke(  
  this,  
  "state-machine-process-job-fn", {  
    lambdaFunction: lambdaFunction,  
    resultPath: "$.processJobResult",  
  }  
);  
  
const wait10MinsTask = new stepFunctions.Wait(  
  this,  
  "state-machine-wait-job", {  
    time: stepFunctions.WaitTime.duration(cdk.Duration.minutes(10)),  
  }  
);  
  
const ddbWrite = new stepFunctionsTasks.DynamoPutItem(  
  this,  
  "ddb-write-job", {  
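    item: {  
      // Resolved at execution time, so every run writes a distinct item.  
      // (crypto.randomUUID() and new Date() would only run once, at synth time.)  
      uuid: stepFunctionsTasks.DynamoAttributeValue.fromString(  
        stepFunctions.JsonPath.uuid()  
      ),  
      timestamp: stepFunctionsTasks.DynamoAttributeValue.fromString(  
        stepFunctions.JsonPath.stringAt("$$.State.EnteredTime")  
      ),  
    },  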
    table: dynamodbTable,  
    resultPath: "$.ddbWriteResult",  
  }  
);  
  
// Constructs for if/else choice block  
const conditionalMatchLambdaFunction = new stepFunctionsTasks.LambdaInvoke(  
  this,  
  "state-machine-conditional-match-fn",  
  {  
    lambdaFunction: lambdaFunction,  
  }  
);  
const choiceStatement = new stepFunctions.Choice(  
  this,  
  "conditional-choice-block"  
);  
const choiceCondition = stepFunctions.Condition.booleanEquals(  
  "$.pass",  
  true  
);  
const elsePassStep = new stepFunctions.Pass(this, "else-block-pass");  
  
const stateMachineDefinition = processJob  
  .next(wait10MinsTask)  
  .next(ddbWrite)  
  .next(  
    choiceStatement  
      .when(choiceCondition, conditionalMatchLambdaFunction)  
      .otherwise(elsePassStep)  
  );  
  
const stateMachine = new stepFunctions.StateMachine(this, "state-machine", {  
  definitionBody: stepFunctions.DefinitionBody.fromChainable(  
    stateMachineDefinition  
  ),  
  timeout: cdk.Duration.minutes(15),  
  stateMachineName: "ProcessAndReportJob",  
});

⚡️ Parallel

The parallel flow operation can be used to add separate branches to your definition in order to facilitate parallelisation in your state machine. Critically though, it will wait until all branches defined in your parallel state reach a terminal state before proceeding onto the next block.

The branches in the parallel state don’t need to perform the same operation, nor do they need to produce the same structured output.
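
If it helps to have a mental model, the closest plain TypeScript analogy I can think of is Promise.all: every branch gets the same input, nothing moves on until all of them have finished, and a single failing branch fails the whole group. The sketch below is only that analogy, not Step Functions or CDK code, and the delay helper and both lookup functions are made up for illustration.

```typescript
// Plain TypeScript analogy only; this is not Step Functions or CDK code.
type Branch<I, O> = (input: I) => Promise<O>;

// Hypothetical helper: resolves with `value` after `ms` milliseconds.
const delay = <T>(ms: number, value: T): Promise<T> =>
  new Promise((resolve) => setTimeout(() => resolve(value), ms));

// Every branch receives the same input. The combined result is only
// available once all branches have resolved, and it keeps the order in
// which the branches were declared, not the order in which they finished.
async function runBranches<I>(
  input: I,
  branches: Branch<I, unknown>[]
): Promise<unknown[]> {
  return Promise.all(branches.map((branch) => branch(input)));
}

// Two hypothetical lookups with different output shapes and durations.
const addressLookup: Branch<{ id: string }, unknown> = ({ id }) =>
  delay(20, { address: `address for ${id}` });
const phoneLookup: Branch<{ id: string }, unknown> = ({ id }) =>
  delay(5, { phone: `phone for ${id}` });

runBranches({ id: "user-1" }, [addressLookup, phoneLookup]).then((results) => {
  console.log(JSON.stringify(results));
});
```

Even though the phone lookup finishes first here, its result still lands second in the array, because that is where it was declared.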

Let’s take a look at how we can define this within CDK:

import * as stepFunctions from "aws-cdk-lib/aws-stepfunctions";  
import * as stepFunctionsTasks from "aws-cdk-lib/aws-stepfunctions-tasks";  
  
// Constructs for parallel block  
const addressLookupLambda = new stepFunctionsTasks.LambdaInvoke(  
  this,  
  "address-lookup-fn", {  
    lambdaFunction: lambdaFunction,  
    resultPath: "$.addressLookupLambdaResult",  
  }  
);  
  
const phoneLookupLambda = new stepFunctionsTasks.LambdaInvoke(  
  this,  
  "phone-lookup-fn", {  
    lambdaFunction: lambdaFunction,  
    resultPath: "$.phoneLookupLambdaResult",  
  }  
);  
  
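const parallelBlock = new stepFunctions.Parallel(  
  this,  
  "PII Parallel Process", {  
    // Keep the original input and store the array of branch outputs under  
    // this key. Without a resultPath, the array replaces the whole state  
    // and later steps could no longer read values such as `$.pass`.  
    resultPath: "$.parallelBlockOutput",  
  }  
)  
  .branch(addressLookupLambda)  
  .branch(phoneLookupLambda);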

So we’re using a fairly trivial example here to showcase the syntax and setup, but you can see that it’s straightforward to define a parallel process using the construct available.

Here we are just defining two Lambda invocation tasks, both reusing the Lambda function we defined earlier in the CDK, that are going to perform two separate lookups to fetch some data.

We then specify our Parallel block and chain two .branch() calls onto it, which determine the tasks to perform in parallel. In our case, that’s both of our Lambda invocations. Like I said above, the branches don’t have to perform the same task or read from the same resources. Any of the plethora of state machine tasks can be invoked here; the critical thing to remember is that the parallel block will wait until all branches have completed before proceeding any further in the state machine.

Note: whilst you can have more than 2 branches in parallel and there doesn’t appear to be an explicit upper limit within the AWS documentation, I’d recommend not invoking more than 5 processes in parallel. If you have a larger orchestration platform with multiple large async running processes, it might be worth considering separate patterns.

Finally, we just append the parallelBlock onto our state machine definition like so:

const stateMachineDefinition = processJob  
  .next(wait10MinsTask)  
  .next(ddbWrite)  
  .next(parallelBlock)  
  .next(  
    choiceStatement  
      .when(choiceCondition, conditionalMatchLambdaFunction)  
      .otherwise(elsePassStep)  
  );

And with some valid execution input, the state machine will execute as expected and run the two lookup lambdas in parallel — resulting in our graph looking like the following:

Successful state machine execution with parallel block

When it comes to the output of the parallel block, since it waits for all branches to complete, the output is an array with one element per branch, in the order the branches were added. Each element is whatever that branch produced, so its shape depends on the resultPath or outputPath you configure on the tasks within the branch.

So in our case above, our output from the parallel block looks similar to the following structure:

{  
  "pass": true,  
  "processJobResult": {  },  
  "ddbWriteResult": {  },  
  "parallelBlockOutput": [  
    {  
      "addressLookupLambdaResult": {  }  
    },  
    {  
      "phoneLookupLambdaResult": {  }  
    }  
  ]  
}

This can then be parsed and passed onto the next state transition in the state machine for conditional logic, further processing or storage elsewhere.
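
As a rough illustration of that parsing step, here is a plain TypeScript model of the data shapes involved. It is not the Step Functions runtime: withResult is a hypothetical stand-in for what a resultPath of "$.parallelBlockOutput" does, mergeBranchOutputs is one possible way a downstream Lambda might flatten the array, and the values inside the branch results are invented placeholders.

```typescript
// Plain TypeScript model of the data shapes only; not the Step Functions runtime.
type Json = Record<string, unknown>;

// Hypothetical stand-in for `resultPath: "$.<key>"`: keep the state input
// and attach the result under a single top-level key.
function withResult(input: Json, key: string, result: unknown): Json {
  return { ...input, [key]: result };
}

// Collapse the array of branch outputs into one object so that later
// code can read each lookup result by name instead of by array index.
function mergeBranchOutputs(outputs: Json[]): Json {
  return outputs.reduce<Json>((merged, output) => ({ ...merged, ...output }), {});
}

const stateInput: Json = { pass: true };
const branchOutputs: Json[] = [
  { addressLookupLambdaResult: { street: "hypothetical" } },
  { phoneLookupLambdaResult: { number: "hypothetical" } },
];

const afterParallel = withResult(stateInput, "parallelBlockOutput", branchOutputs);
const flattened = mergeBranchOutputs(afterParallel.parallelBlockOutput as Json[]);

console.log(Object.keys(afterParallel)); // [ 'pass', 'parallelBlockOutput' ]
console.log(Object.keys(flattened)); // [ 'addressLookupLambdaResult', 'phoneLookupLambdaResult' ]
```

One thing to watch with this kind of merge: if two branches write the same key, the later branch silently wins, so distinct result keys per branch (as in our example) keep things unambiguous.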

🗺 Map

The final major flow operation that you can utilise is map. This operation allows you to process items in a data set concurrently within your state machine.

There are two options for processing the items within the map:

  • Inline
  • Distributed

The main difference between these two is how much data can be processed. Inline is useful for smaller data sets where you don’t require large amounts of concurrency, as inline only supports a maximum of 40 concurrent iterations.

In contrast, distributed is the high-concurrency mode and can process up to 10,000 concurrent iterations — however, it does come with some other separate limitations to consider before using.

For most use cases within a single state machine execution, one of these two options should be more than sufficient, unless you are dealing with extremely high-volume, high-throughput ETL processes.

So let’s have a look at how we might define this in CDK:

const mapBlock = new stepFunctions.Map(this, "Transform Map Process", {  
  maxConcurrency: 5,  
  itemsPath: "$.itemsToTransform",  
  resultPath: "$.mapBlockOutput",  
});

We start by defining the Map construct itself, which sets the core properties that the iterator runs with. In the above, we’re explicitly setting maxConcurrency, specifying itemsPath (the JSONPath to the array in your input that the map iterates over) and finally resultPath, which defines where the output of the map iterations is written.
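
To get a feel for what a concurrency cap like maxConcurrency means in practice, here is a small plain TypeScript sketch of a bounded concurrent map. It is my own illustration rather than anything Step Functions runs internally: mapWithConcurrency and the demo worker are hypothetical, and treating a non-positive cap as "no cap" is an assumption of this sketch.

```typescript
// Plain TypeScript sketch of a concurrency cap; not Step Functions or CDK code.
async function mapWithConcurrency<T, R>(
  items: T[],
  maxConcurrency: number,
  worker: (item: T) => Promise<R>
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let nextIndex = 0;

  // Each runner keeps pulling the next unprocessed index until none are left.
  async function runner(): Promise<void> {
    while (nextIndex < items.length) {
      const index = nextIndex++;
      results[index] = await worker(items[index]);
    }
  }

  // Assumption of this sketch: a non-positive cap means "no cap".
  const runnerCount =
    maxConcurrency > 0 ? Math.min(maxConcurrency, items.length) : items.length;
  await Promise.all(Array.from({ length: runnerCount }, () => runner()));
  return results;
}

// Demo: track how many workers are in flight at once with a cap of 2.
let inFlight = 0;
let peak = 0;
const names = ["Joe", "Rachael", "Steve", "Jenny"];

const demo = mapWithConcurrency(names, 2, async (item) => {
  inFlight++;
  peak = Math.max(peak, inFlight);
  await new Promise((resolve) => setTimeout(resolve, 5));
  inFlight--;
  return item.toUpperCase();
});

demo.then((out) => console.log(out, "peak in flight:", peak));
```

The results come back in the same order as the input items regardless of which iteration finishes first, and at no point are more than two workers running.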

We then need to define the task/process we want to perform within the map iterator itself:

const transformIteratorFn = new stepFunctionsTasks.LambdaInvoke(  
  this,  
  "transform-iterator-fn", {  
    lambdaFunction: lambdaFunction,  
  }  
);

Again, for showcase purposes, I’ve just reused the same Lambda function task definition that we have used for most of our tasks. However, this can be swapped out for any chainable task that can be defined in the state machine, which makes a map useful for what is almost a separate sub-routine that you want to run inline inside your overall state machine.

So now, for each item in our array, we are going to invoke this lambda function before finally outputting the result to our defined resultPath.
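
Here is how I picture that data flow, as a plain TypeScript model rather than real Step Functions behaviour. runMapState and greet are hypothetical names: greet stands in for whatever our Lambda would do, and the real elements written to resultPath would be whatever the task itself returns (for a Lambda invoke task that is normally the invocation response rather than a bare string).

```typescript
// Plain TypeScript model of the Map state's data flow; not the Step Functions runtime.
type State = Record<string, unknown>;

// Hypothetical stand-in for the Lambda task that runs on each item.
const greet = (item: string): string => `Hello, ${item}`;

// Reads the array under `itemsKey` (like itemsPath), applies the task to
// each element, and writes the collected results under `resultKey`
// (like resultPath) while leaving the rest of the state untouched.
function runMapState(
  state: State,
  itemsKey: string,
  resultKey: string,
  task: (item: string) => string
): State {
  const items = state[itemsKey];
  if (!Array.isArray(items)) {
    throw new Error(`Expected an array at "${itemsKey}"`);
  }
  return { ...state, [resultKey]: items.map((item) => task(String(item))) };
}

const mapStateOutput = runMapState(
  { pass: true, itemsToTransform: ["Joe", "Rachael", "Steve", "Jenny"] },
  "itemsToTransform",
  "mapBlockOutput",
  greet
);

console.log(JSON.stringify(mapStateOutput, null, 2));
```

The original keys survive alongside the new mapBlockOutput array, which is why setting a resultPath matters if later steps still need values from the input.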

Note: in a similar fashion to the parallel operation, the state machine will not proceed onto the next step in your state machine until all of the iterations within the map have completed.

Finally, we just append this map construct onto our overall state machine definition:

const stateMachineDefinition = processJob  
  .next(wait10MinsTask)  
  .next(ddbWrite)  
  .next(parallelBlock)  
  .next(mapBlock.iterator(transformIteratorFn))  
  .next(  
    choiceStatement  
      .when(choiceCondition, conditionalMatchLambdaFunction)  
      .otherwise(elsePassStep)  
  );

Once deployed, we can define some input criteria for an execution of our state machine:

{  
  "pass": true,  
  "itemsToTransform": ["Joe", "Rachael", "Steve", "Jenny"]  
}

And the lovely output of our execution should look something like this!

State machine execution that contains a map flow operation

There we have it — a state machine that contains a map operation that can be used for concurrent processing of data, nice.

Conclusion

Remember to run cdk destroy once you’re done to tear down the CloudFormation stack!

  • Flow operations within state machines are incredibly useful to create advanced ETL processes, event-driven orchestration and just general input/output management between different processes.
  • Parallel operations allow you to perform different tasks concurrently before returning the result of the branches for further computation, thus speeding up the time your state machine takes to complete.
  • Map operations allow you to iterate over a list of items and perform the same task on them concurrently, speeding up the time of completion.
  • I’ve only really scratched the surface here, using some trivial examples to show what is possible and to display the CDK interfaces for defining these constructs. I would encourage you to explore the documentation further to find the other features of these operations that could be of use to you.
  • You can view the additional API properties for the parallel operation here and for the map operation, you can read more about the concepts of how it works and some of the limitations that I mentioned above (inline vs. distributed) that can assist you with picking the best option for your use case.