Serverless & Event-Driven Operations

Project: An Event-Driven Pipeline

This capstone lesson brings together every concept from the tutorial — Lambda, event sources, DLQs, observability, IaC, and Step Functions — into a single production-grade system. The target: an image-processing pipeline that accepts uploads via S3, transforms them through three async stages, persists metadata to DynamoDB, and delivers results to downstream consumers via SNS. Every failure is captured in a dead-letter queue and alarmed. Every execution is traceable end-to-end.

This is the design pattern behind real systems at scale: Netflix's asset-ingest pipeline, Shopify's media processing jobs, and AWS's own Rekognition orchestration all follow variants of this architecture. The same skeleton applies to video transcoding, document parsing, ML inference chains, and financial event processing.

Pipeline Architecture

The pipeline has five logical layers:

  1. Ingest: an S3 PutObject event is queued in SQS (S3 is not a direct Pipes source); EventBridge Pipes reads from that queue, filters by file extension, and starts a Step Functions Express Workflow execution per file.
  2. Validate: the first state checks MIME type, file size (< 50 MB), and antivirus hash via a synchronous Lambda call; rejects land in a DLQ.
  3. Transform: a Map state with MaxConcurrency: 10 fans out to the three processing Lambdas (thumbnail, watermark, metadata extraction), iterating over the list of transforms to run. A Parallel state is the alternative when the branches are fixed and no concurrency cap is needed.
  4. Persist: an SDK integration writes the enriched metadata record to DynamoDB in a single conditional put; no Lambda is needed for this step.
  5. Publish: SNS fan-out notifies downstream subscribers (CDN invalidation, search indexing, audit log).
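
The size and type checks in the Validate layer can be sketched as a pure function. This is a minimal illustration in Python, not the lesson's actual handler: the Terraform later in this lesson targets a Node.js runtime, so the logic would need porting. The names `validate_upload` and `ValidationResult` are assumptions, the limits mirror the MAX_BYTES and ALLOWED_TYPES values configured for the validate function, and the antivirus hash check is left out.

```python
from dataclasses import dataclass

MAX_BYTES = 50 * 1024 * 1024  # 52428800 bytes, the 50 MB limit
ALLOWED_TYPES = frozenset({"image/jpeg", "image/png", "image/webp"})


@dataclass(frozen=True)
class ValidationResult:
    ok: bool
    reason: str = ""


def validate_upload(content_type: str, size_bytes: int) -> ValidationResult:
    """Apply the MIME-type and size rules; hypothetical helper for illustration."""
    if content_type not in ALLOWED_TYPES:
        return ValidationResult(False, f"unsupported type: {content_type}")
    if size_bytes <= 0:
        return ValidationResult(False, "empty object")
    if size_bytes >= MAX_BYTES:  # the rule is strictly less than 50 MB
        return ValidationResult(False, f"too large: {size_bytes} bytes")
    return ValidationResult(True)
```

Returning a reason string rather than raising keeps the rejection detail available for whatever message is routed to the validation DLQ.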

Figure: Event-driven image processing pipeline architecture (S3 bucket, EventBridge Pipes, Step Functions Express Workflow, Validate Lambda, Map state, DynamoDB, SNS topic, DLQs, CloudWatch). The S3 trigger flows through EventBridge Pipes into a Step Functions Express Workflow; validated files fan out to parallel Lambdas; results persist to DynamoDB and SNS; all failure paths route to DLQs; CloudWatch captures logs, metrics, and distributed traces across every layer.
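
The lesson's Terraform loads the state machine from pipeline.asl.json, which is not shown. As a rough sketch of how the five layers might map onto Amazon States Language, the Python below builds one plausible definition as a dict and serializes it to JSON. Everything specific here is an assumption made for illustration: the state names, the input shape `{"file": {"bucket": ..., "key": ...}}`, the `reject_queue_url` parameter, and the helper names `build_definition` and `dangling_targets`.

```python
import json

INVOKE = "arn:aws:states:::lambda:invoke"


def build_definition(validate_arn, transform_arns, table_name, topic_arn, reject_queue_url):
    """Return a hypothetical ASL definition covering validate, transform, persist, publish."""
    return {
        "Comment": "Sketch only; assumes input shaped as {file: {bucket, key}}",
        "StartAt": "Validate",
        "States": {
            "Validate": {
                "Type": "Task",
                "Resource": INVOKE,
                "Parameters": {"FunctionName": validate_arn, "Payload.$": "$.file"},
                "ResultPath": "$.validation",
                "Catch": [{"ErrorEquals": ["States.ALL"], "ResultPath": "$.error",
                           "Next": "RejectToDlq"}],
                "Next": "ListTransforms",
            },
            "RejectToDlq": {
                "Type": "Task",
                "Resource": "arn:aws:states:::sqs:sendMessage",
                "Parameters": {"QueueUrl": reject_queue_url, "MessageBody.$": "$"},
                "Next": "Rejected",
            },
            "Rejected": {"Type": "Fail", "Error": "ValidationFailed"},
            "ListTransforms": {  # injects the list the Map state iterates over
                "Type": "Pass",
                "Result": list(transform_arns),
                "ResultPath": "$.transforms",
                "Next": "Transform",
            },
            "Transform": {
                "Type": "Map",
                "ItemsPath": "$.transforms",
                "MaxConcurrency": 10,
                "ItemSelector": {"functionArn.$": "$$.Map.Item.Value", "file.$": "$.file"},
                "ItemProcessor": {
                    "ProcessorConfig": {"Mode": "INLINE"},
                    "StartAt": "RunTransform",
                    "States": {"RunTransform": {
                        "Type": "Task",
                        "Resource": INVOKE,
                        "Parameters": {"FunctionName.$": "$.functionArn", "Payload.$": "$.file"},
                        "End": True,
                    }},
                },
                "ResultPath": "$.results",
                "Next": "Persist",
            },
            "Persist": {  # direct SDK integration, no Lambda
                "Type": "Task",
                "Resource": "arn:aws:states:::dynamodb:putItem",
                "Parameters": {
                    "TableName": table_name,
                    "Item": {"fileKey": {"S.$": "$.file.key"}},
                    "ConditionExpression": "attribute_not_exists(fileKey)",
                },
                "ResultPath": None,
                "Catch": [{"ErrorEquals": ["DynamoDB.ConditionalCheckFailedException"],
                           "ResultPath": "$.duplicate", "Next": "Done"}],
                "Next": "Publish",
            },
            "Publish": {
                "Type": "Task",
                "Resource": "arn:aws:states:::sns:publish",
                "Parameters": {"TopicArn": topic_arn, "Message.$": "$.file.key"},
                "Next": "Done",
            },
            "Done": {"Type": "Succeed"},
        },
    }


def dangling_targets(definition):
    """List top-level Next/Catch/StartAt targets that name no existing state."""
    states = definition["States"]
    targets = [definition["StartAt"]]
    for state in states.values():
        if "Next" in state:
            targets.append(state["Next"])
        targets.extend(c["Next"] for c in state.get("Catch", []))
    return sorted(t for t in set(targets) if t not in states)
```

Calling `json.dumps(build_definition(...), indent=2)` yields a document that could stand in for pipeline.asl.json after review; `dangling_targets` is a cheap pre-deploy check that every transition points at a real state.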

Infrastructure as Code — Terraform Foundation

The entire pipeline is declared in Terraform. Critical design points: the Step Functions execution role is least-privilege per resource; Lambda functions share a single security group but use per-function IAM roles; the DLQs have message_retention_seconds = 1209600 (14 days, the maximum) so no failed event is silently dropped before you can investigate.

# pipeline.tf: core resources (state bucket and DynamoDB lock omitted for brevity)

locals {
  prefix = "img-pipeline-${var.env}"
}

# ──────────────────────────────────────────────
# Dead Letter Queues
# ──────────────────────────────────────────────
resource "aws_sqs_queue" "validation_dlq" {
  name                      = "${local.prefix}-validation-dlq"
  message_retention_seconds = 1209600
  kms_master_key_id         = aws_kms_key.pipeline.arn
  tags                      = local.common_tags
}

resource "aws_sqs_queue" "transform_dlq" {
  name                      = "${local.prefix}-transform-dlq"
  message_retention_seconds = 1209600
  kms_master_key_id         = aws_kms_key.pipeline.arn
  tags                      = local.common_tags
}

# CloudWatch alarm: any message in DLQ = PagerDuty page
resource "aws_cloudwatch_metric_alarm" "dlq_not_empty" {
  for_each            = toset(["validation", "transform"])
  alarm_name          = "${local.prefix}-${each.key}-dlq-not-empty"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = 60
  statistic           = "Sum"
  threshold           = 0
  alarm_actions       = [var.pagerduty_sns_arn]
  dimensions = {
    QueueName = "${local.prefix}-${each.key}-dlq"
  }
}

# ──────────────────────────────────────────────
# Lambda functions (validate + three transform)
# ──────────────────────────────────────────────
resource "aws_lambda_function" "validate" {
  function_name = "${local.prefix}-validate"
  filename      = data.archive_file.validate_zip.output_path
  handler       = "index.handler"
  runtime       = "nodejs22.x"
  role          = aws_iam_role.validate_exec.arn
  timeout       = 30
  memory_size   = 512

  # Applies to asynchronous invocations only. Synchronous calls from a
  # Step Functions Task need a Catch in the state machine to reach the DLQ.
  dead_letter_config {
    target_arn = aws_sqs_queue.validation_dlq.arn
  }

  tracing_config {
    mode = "Active" # X-Ray enabled on every invocation
  }

  environment {
    variables = {
      MAX_BYTES     = "52428800"
      ALLOWED_TYPES = "image/jpeg,image/png,image/webp"
    }
  }

  reserved_concurrent_executions = 200
  tags                           = local.common_tags
}

# ──────────────────────────────────────────────
# Step Functions: Express Workflow
# ──────────────────────────────────────────────
resource "aws_sfn_state_machine" "pipeline" {
  name     = "${local.prefix}-state-machine"
  role_arn = aws_iam_role.sfn_exec.arn
  type     = "EXPRESS"

  definition = templatefile("${path.module}/pipeline.asl.json", {
    validate_arn  = aws_lambda_function.validate.arn
    thumbnail_arn = aws_lambda_function.thumbnail.arn
    watermark_arn = aws_lambda_function.watermark.arn
    meta_arn      = aws_lambda_function.metadata.arn
    dynamo_table  = aws_dynamodb_table.assets.name
    sns_topic_arn = aws_sns_topic.asset_events.arn
  })

  logging_configuration {
    level                  = "ALL"
    include_execution_data = true
    log_destination        = "${aws_cloudwatch_log_group.sfn.arn}:*"
  }

  tracing_configuration {
    enabled = true
  }

  tags = local.common_tags
}

# ──────────────────────────────────────────────
# EventBridge Pipe: S3 event → State Machine
# ──────────────────────────────────────────────
resource "aws_pipes_pipe" "s3_to_sfn" {
  name     = "${local.prefix}-s3-to-sfn"
  role_arn = aws_iam_role.pipe_exec.arn
  source   = aws_sqs_queue.s3_event_queue.arn # S3 → SQS → Pipe (SQS is a valid pipe source)
  target   = aws_sfn_state_machine.pipeline.arn

  # filter_criteria belongs inside source_parameters on aws_pipes_pipe
  source_parameters {
    filter_criteria {
      filter {
        pattern = jsonencode({
          body = {
            detail = {
              object = {
                key = [{ suffix = ".jpg" }, { suffix = ".png" }, { suffix = ".webp" }]
              }
            }
          }
        })
      }
    }
  }

  target_parameters {
    step_function_state_machine_parameters {
      invocation_type = "FIRE_AND_FORGET" # Express workflow; async start, response immediately
    }
  }
}
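
Why Express Workflow here? Each file completes in under 30 seconds, well inside the five-minute limit on Express executions. At 100K uploads per day, Standard Workflow costs roughly $2,500/month (state transitions at $0.025 per 1,000, which works out to about 33 transitions per execution across 3 million executions a month); Express, which is billed per request and duration instead, costs ~$8/month. For pipelines that complete quickly and tolerate at-least-once semantics, Express is usually the right choice. Standard is better suited to long-running workflows or exactly-once business transactions.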
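
The Standard Workflow figure depends heavily on how many state transitions each execution makes, which the lesson does not state. The arithmetic can be checked with a small sketch; the function names are hypothetical, the per-transition price is the one quoted above, and a 30-day month is assumed. Express pricing is deliberately not modelled because it is billed on requests and duration rather than transitions.

```python
def standard_monthly_cost(uploads_per_day: int, transitions_per_execution: int,
                          usd_per_1000_transitions: float = 0.025,
                          days: int = 30) -> float:
    """Estimated monthly Standard Workflow cost in USD, one execution per upload."""
    transitions = uploads_per_day * days * transitions_per_execution
    return transitions / 1000 * usd_per_1000_transitions


def implied_transitions(monthly_usd: float, uploads_per_day: int,
                        usd_per_1000_transitions: float = 0.025,
                        days: int = 30) -> float:
    """Transitions per execution that a given monthly bill would imply."""
    executions = uploads_per_day * days
    return monthly_usd / usd_per_1000_transitions * 1000 / executions
```

With 100,000 uploads a day, a lean 10-transition execution comes to roughly $750 a month, while a $2,500 bill implies about 33 transitions per execution. Counting the transitions in your own definition is the first step before quoting either number.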

DLQ Operations Runbook

A DLQ is not a graveyard — it is a holding queue for messages that need human or automated remediation. At big-tech scale you need three things: automated alarming (already done above), a replay mechanism, and a drilldown path. The AWS CLI snippet below covers routine DLQ operations:

# ── Inspect messages without deleting them ──
aws sqs receive-message \
  --queue-url https://sqs.us-east-1.amazonaws.com/123456789012/img-pipeline-prod-transform-dlq \
  --max-number-of-messages 10 \
  --visibility-timeout 30 \
  --attribute-names All \
  --message-attribute-names All \
  | jq '.Messages[]? | {id:.MessageId, body:(.Body|fromjson), received:.Attributes.ApproximateReceiveCount}'

# ── Replay ALL DLQ messages back to the source queue ──
# (requires both queues to be in the same region/account; message move tasks
#  only support DLQs whose source is another SQS queue, not a Lambda function's DLQ)
# --max-number-of-messages-per-second throttles replay to avoid a Lambda concurrency burst
aws sqs start-message-move-task \
  --source-arn arn:aws:sqs:us-east-1:123456789012:img-pipeline-prod-transform-dlq \
  --destination-arn arn:aws:sqs:us-east-1:123456789012:img-pipeline-prod-transform-queue \
  --max-number-of-messages-per-second 50

# Monitor move task
aws sqs list-message-move-tasks \
  --source-arn arn:aws:sqs:us-east-1:123456789012:img-pipeline-prod-transform-dlq

# ── Purge DLQ after confirmed fix is deployed ──
aws sqs purge-queue \
  --queue-url https://sqs.us-east-1.amazonaws.com/123456789012/img-pipeline-prod-transform-dlq

# ── Find the failure details for a failed execution ──
# Express Workflows do not support get-execution-history; their history lives
# only in the CloudWatch log group configured on the state machine.
# SFN_LOG_GROUP is the name of the group behind aws_cloudwatch_log_group.sfn
# (the name is not shown in this lesson, so set it yourself).
aws logs filter-log-events \
  --log-group-name "$SFN_LOG_GROUP" \
  --filter-pattern '{ $.type = "ExecutionFailed" }' \
  --query 'events[].message' --output text
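
Choosing a value for --max-number-of-messages-per-second is easier with a back-of-the-envelope model. The sketch below uses Little's law (concurrency is roughly arrival rate times average duration) to turn spare Lambda concurrency into a replay rate, then estimates how long a backlog takes to drain. The function names and the example numbers are assumptions for illustration, not values from the pipeline.

```python
import math


def max_replay_rate(spare_concurrency: int, avg_duration_s: float) -> int:
    """Messages per second that keep in-flight work under the spare concurrency."""
    if avg_duration_s <= 0:
        raise ValueError("avg_duration_s must be positive")
    # Little's law: concurrency = rate * duration, so rate = concurrency / duration
    return max(1, math.floor(spare_concurrency / avg_duration_s))


def replay_minutes(backlog: int, rate_per_s: int) -> float:
    """Minutes needed to drain the backlog at a constant replay rate."""
    if rate_per_s <= 0:
        raise ValueError("rate_per_s must be positive")
    return backlog / rate_per_s / 60
```

For example, 100 spare concurrent executions and a 2-second average handler give a ceiling of 50 messages per second, at which a 50,000-message backlog drains in a little under 17 minutes.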

End-to-End Observability

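Distributed tracing is non-negotiable in an async pipeline. With tracing enabled on the state machine, Step Functions passes the X-Ray trace header to each Lambda it invokes; Lambda exposes that header to the function as the _X_AMZN_TRACE_ID environment variable, and instrumented SDK calls made by the function join the same trace. The state machine's own DynamoDB PutItem and SNS Publish integrations show up as trace segments too. The key operational metric set for this pipeline: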

  • Pipeline P99 latency: aws cloudwatch get-metric-statistics on ExecutionTime (reported in milliseconds) in the AWS/States namespace, requesting p99 via --extended-statistics; alert if P99 > 45 seconds.
  • DLQ depth: ApproximateNumberOfMessagesVisible on both DLQs; alert on any non-zero value.
  • Lambda error rate: Errors / Invocations per function; alert if > 1% over 5 minutes.
  • Transform concurrency: ConcurrentExecutions for the thumbnail/watermark Lambdas; alert when approaching the account limit (1,000 by default) so you can request a quota increase proactively.
  • Cold-start ratio: custom EMF metric emitted by each Lambda, { "cold_start": 1 } on the first invocation in a freshly initialized execution environment (detected with a module-level flag, since LAMBDA_TASK_ROOT is set on every invocation); track as a percentage of all invocations.
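
One common way to detect a cold start is a module-level flag: module scope runs once per execution environment, so the flag is true only for the first invocation. The sketch below builds a CloudWatch Embedded Metric Format record around that flag. It is written in Python for illustration (the functions in this lesson run on Node.js, where the same pattern applies), and the namespace "ImgPipeline", the dimension name, and the helper names are assumptions.

```python
import json
import time

_cold = True  # set once when the execution environment initializes


def cold_start_metric(function_name: str, now_ms=None) -> dict:
    """Return an EMF record with cold_start = 1 on the first call, 0 afterwards."""
    global _cold
    was_cold, _cold = _cold, False
    timestamp = int(time.time() * 1000) if now_ms is None else now_ms
    return {
        "_aws": {
            "Timestamp": timestamp,  # epoch milliseconds
            "CloudWatchMetrics": [{
                "Namespace": "ImgPipeline",        # hypothetical namespace
                "Dimensions": [["FunctionName"]],
                "Metrics": [{"Name": "cold_start", "Unit": "Count"}],
            }],
        },
        "FunctionName": function_name,
        "cold_start": 1 if was_cold else 0,
    }


def handler(event, context=None):
    # Printing the JSON record to stdout is how EMF metrics reach CloudWatch Logs
    print(json.dumps(cold_start_metric("validate")))
    return {"ok": True}
```

Emitting 0 on warm invocations, rather than nothing, means the Average statistic of the metric is the cold-start ratio directly.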

Structured logging across every Lambda: emit a JSON log line with {"level":"INFO","traceId":"...","executionName":"...","fileKey":"...","durationMs":120} at the end of every handler. A single CloudWatch Insights query then spans the entire pipeline:

fields @timestamp, traceId, fileKey, durationMs | filter executionName like /img-pipeline/ | sort @timestamp desc
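
A small helper keeps the log shape identical across functions. This is a sketch in Python with a hypothetical name, `log_line`; it assumes the caller records a start time with `time.monotonic()` and supplies the trace ID and execution name from wherever the handler obtains them.

```python
import json
import time


def log_line(trace_id: str, execution_name: str, file_key: str,
             started_at: float, level: str = "INFO", now=None) -> str:
    """Build the end-of-handler JSON log line with the fields queried above."""
    end = time.monotonic() if now is None else now
    return json.dumps({
        "level": level,
        "traceId": trace_id,
        "executionName": execution_name,
        "fileKey": file_key,
        "durationMs": round((end - started_at) * 1000),
    })
```

In a handler this would be called once just before returning, for example `print(log_line(trace_id, name, key, started_at))`, so that every invocation contributes exactly one queryable record.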

Production Failure Modes and Mitigations

Senior engineers build systems with a failure taxonomy in mind before the first line of code ships. For this pipeline:

  • S3 event duplication: S3 delivers at-least-once. The DynamoDB PutItem uses the condition expression attribute_not_exists(fileKey); a duplicate execution fails the persist step with ConditionalCheckFailedException, which the state machine catches and routes to a Success path (idempotent by design).
  • Lambda cold-start cascades: a sudden burst (e.g., a batch upload of 5,000 files) can outrun Lambda's concurrency scaling and cause throttling. Mitigate by throttling at the pipe source (for an SQS source, through batch size and batching window) and with Provisioned Concurrency on the validate function (it is the bottleneck, since all files go through it).
  • DLQ accumulation without replay: if a bug ships and 50K messages land in the DLQ before anyone responds to the alarm, a naive replay at full speed will trigger a Lambda concurrency burst. Always replay with --max-number-of-messages-per-second set to a rate your downstream can absorb without throttling.
  • Step Functions Express history loss: Express Workflows do not record execution history in Step Functions at all (the 90-day history retention applies to Standard Workflows). logging_configuration { level = "ALL" } in Terraform writes every state transition to CloudWatch Logs, where it is kept for as long as your log group retention allows. Never deploy Express Workflows without this setting.
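
The idempotency argument for duplicate S3 events can be modelled without AWS at all. The sketch below uses an in-memory stand-in for the table; `FakeTable`, `ConditionalCheckFailed`, and `persist` are hypothetical names, and the real pipeline performs this step through the Step Functions DynamoDB integration rather than application code.

```python
class ConditionalCheckFailed(Exception):
    """Stand-in for DynamoDB's ConditionalCheckFailedException."""


class FakeTable:
    """In-memory model of a table keyed by fileKey."""

    def __init__(self):
        self.items = {}

    def put_if_absent(self, file_key: str, item: dict) -> None:
        # Mirrors PutItem with attribute_not_exists(fileKey)
        if file_key in self.items:
            raise ConditionalCheckFailed(file_key)
        self.items[file_key] = item


def persist(table: FakeTable, file_key: str, metadata: dict) -> str:
    """Treat a failed condition as success so duplicate events are harmless."""
    try:
        table.put_if_absent(file_key, metadata)
        return "written"
    except ConditionalCheckFailed:
        return "duplicate"
```

The second delivery of the same event returns "duplicate" and leaves the first record untouched, which is the behavior the Catch on the persist step is meant to provide.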
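
The silent retry trap: Lambda applies a built-in retry policy to asynchronous invocations (2 retries by default), and Step Functions has its own Retry block. When both apply to the same function, the attempts multiply: three Lambda attempts for each of two state-machine attempts means a transient failure can trigger up to 6 invocations before landing in the DLQ, and your metrics show 6x the expected invocation count. A Task state invokes Lambda synchronously by default, and the asynchronous retry policy does not apply to synchronous calls; the trap appears when the function is invoked asynchronously (for example with InvocationType set to Event) or also has an asynchronous trigger. For Step Functions-orchestrated Lambdas, set the Lambda function's EventInvokeConfig to zero retries (MaximumRetryAttempts: 0) and let the state machine own all retry logic.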
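
The multiplication is easy to verify. This sketch assumes each state-machine attempt triggers an asynchronous invocation that Lambda then retries on its own; the function name is hypothetical. Under that model the figure of 6 corresponds to two Lambda retries combined with a Retry block whose MaxAttempts is 1.

```python
def worst_case_invocations(lambda_async_retries: int, sfn_max_attempts: int) -> int:
    """Upper bound on invocations for one failing task.

    lambda_async_retries: retries Lambda performs per async invocation.
    sfn_max_attempts: MaxAttempts in the state's Retry block (retries, not tries).
    """
    if lambda_async_retries < 0 or sfn_max_attempts < 0:
        raise ValueError("retry counts cannot be negative")
    lambda_tries = 1 + lambda_async_retries
    sfn_tries = 1 + sfn_max_attempts
    return lambda_tries * sfn_tries
```

Setting the Lambda side to zero retries collapses the product to the state machine's own attempt count, which is the point of the recommendation above.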

Putting It All Together

Deploy the pipeline end-to-end with a single Terraform apply, then smoke-test with a real upload:

# 1. Apply infrastructure
cd infra/
terraform init -backend-config="bucket=my-tf-state" -backend-config="key=img-pipeline/prod.tfstate"
terraform plan -var="env=prod" -var="pagerduty_sns_arn=arn:aws:sns:..."
terraform apply -auto-approve -var="env=prod" -var="pagerduty_sns_arn=arn:aws:sns:..."

# 2. Upload a test image and watch the pipeline execute
aws s3 cp ./test-image.jpg s3://img-pipeline-prod-ingest/uploads/test-image.jpg

# 3. Watch the execution in the state machine's log group
#    (list-executions and describe-execution are not supported for Express Workflows;
#     SFN_LOG_GROUP is the group behind aws_cloudwatch_log_group.sfn, whose name is
#     not shown in this lesson)
aws logs tail "$SFN_LOG_GROUP" --since 5m --format short

# 4. Confirm DynamoDB record was written
aws dynamodb get-item \
  --table-name img-pipeline-prod-assets \
  --key '{"fileKey": {"S": "uploads/test-image.jpg"}}' \
  | jq '.Item'

# 5. Confirm no DLQ messages
aws sqs get-queue-attributes \
  --queue-url https://sqs.us-east-1.amazonaws.com/123456789012/img-pipeline-prod-transform-dlq \
  --attribute-names ApproximateNumberOfMessagesVisible \
  | jq '.Attributes.ApproximateNumberOfMessagesVisible'
# Expected: "0"
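
Because the pipeline is asynchronous, a check run immediately after the upload may find nothing yet. A smoke test can poll with backoff instead of checking once. The sketch below is a generic helper with a hypothetical name, `wait_for`; the `fetch` callable is whatever lookup you supply (for instance a wrapper around the DynamoDB read in step 4) and should return None until the record exists.

```python
import time


def wait_for(fetch, attempts: int = 5, base_delay_s: float = 1.0, sleep=time.sleep):
    """Call fetch() until it returns a non-None value, backing off exponentially.

    Returns the fetched value, or None if every attempt came back empty.
    """
    for attempt in range(attempts):
        result = fetch()
        if result is not None:
            return result
        if attempt < attempts - 1:  # no point sleeping after the final try
            sleep(base_delay_s * 2 ** attempt)
    return None
```

Injecting `sleep` keeps the helper testable without real delays; in a script the default `time.sleep` is used and the total wait with the defaults is 15 seconds.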

This pipeline pattern — S3 trigger, EventBridge Pipes filtering, Express Workflow orchestration, parallel Lambda transforms, direct SDK integrations for DynamoDB writes, SNS fan-out for downstream coupling, DLQs at every failure boundary, X-Ray traces connecting all dots, Terraform managing all of it — is the production-grade skeleton used in enterprise image, video, and document pipelines across the industry. Adapt the transform Lambdas for your domain; the operational skeleton remains identical.