Python for DevOps Automation

Concurrency for Ops: Threads & Async

18 min Lesson 8 of 28


A DevOps script that checks the health of 200 services sequentially waits for each HTTP response before firing the next request. At a 100 ms average latency, that is 20 seconds of wall-clock time (200 × 100 ms), almost all of it spent blocked on network I/O while your CPU sits idle. Run 50 requests at a time and the same work can finish in under a second. This lesson teaches the two concurrency models Python gives you, when each is the right choice, and the production pitfalls that trip up even experienced engineers.
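The arithmetic behind those numbers can be written as a rough model. It assumes every call takes exactly the average latency and that work runs in ceil(n / workers) back-to-back "waves". `estimate_wall_clock_ms` is a hypothetical helper for illustration, not a benchmark.

```python
import math


def estimate_wall_clock_ms(n_calls: int, latency_ms: float, workers: int = 1) -> float:
    """Idealized wall-clock time: calls run in ceil(n_calls / workers) sequential waves."""
    if n_calls < 0 or workers < 1:
        raise ValueError("n_calls must be >= 0 and workers >= 1")
    waves = math.ceil(n_calls / workers)
    return waves * latency_ms


if __name__ == "__main__":
    print(estimate_wall_clock_ms(200, 100))              # sequential
    print(estimate_wall_clock_ms(200, 100, workers=10))
    print(estimate_wall_clock_ms(200, 100, workers=50))
```

A real pool hands the next task to a worker as soon as that worker finishes, so the wave model is an idealization. In practice, the slowest calls (tail latency) usually set the total more than the average does.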

The Two Models: Threading vs. asyncio

Python's Global Interpreter Lock (GIL) means that, in the standard CPython build, only one thread executes Python bytecode at any instant. For CPU-bound work in pure Python (number crunching, parsing, data transformation), threads therefore give no parallel speedup. Use multiprocessing (for example ProcessPoolExecutor) or subprocesses instead. I/O-bound work is different, and it covers the overwhelming majority of ops tasks (HTTP calls, SDK calls, DNS lookups, subprocess waits). A thread releases the GIL while it blocks on I/O, so many threads make progress concurrently in practice.
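A quick way to see the GIL being released during blocking calls is to let time.sleep stand in for a network wait. This is an assumption made so the example runs offline; blocking socket reads also release the GIL. Twenty 100 ms "calls" should take roughly 2 s sequentially and roughly 0.2 s on 10 threads. `fake_io_call`, `run_sequential`, and `run_threaded` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_call(i: int) -> int:
    # time.sleep releases the GIL, as a blocking socket read would
    time.sleep(0.1)
    return i


def run_sequential(n: int) -> float:
    start = time.perf_counter()
    for i in range(n):
        fake_io_call(i)
    return time.perf_counter() - start


def run_threaded(n: int, workers: int) -> float:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # list() drains the map iterator so every call has finished before timing stops
        list(pool.map(fake_io_call, range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {run_sequential(20):.2f}s")
    print(f"threaded:   {run_threaded(20, 10):.2f}s")
```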

  • Threading (concurrent.futures.ThreadPoolExecutor): Units of work run on a bounded pool of reusable OS threads. It is simple to adopt because existing synchronous code (boto3 calls, requests.get) works with zero changes. Best for fan-out work where you call many independent I/O operations and collect the results.
  • asyncio: A single-threaded event loop switches between coroutines whenever one is awaiting I/O. Lower overhead per task (no OS thread stack), but requires async-native libraries (aiohttp, aiobotocore). Best when you have thousands of concurrent connections or need fine-grained cancellation and timeouts.
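The phrase "switches between coroutines whenever one is awaiting" can be shown with a minimal sketch. Here asyncio.sleep stands in for network I/O, an assumption for illustration, and `fake_request` is a hypothetical name. The fast request finishes first even though it started second, because the single-threaded event loop resumes whichever coroutine is ready.

```python
import asyncio


async def fake_request(name: str, delay: float, events: list[str]) -> str:
    events.append(f"{name} start")
    # await hands control back to the event loop until the sleep completes
    await asyncio.sleep(delay)
    events.append(f"{name} done")
    return name


async def main() -> list[str]:
    events: list[str] = []
    # gather schedules both coroutines as tasks on the same single-threaded loop
    await asyncio.gather(
        fake_request("slow", 0.2, events),
        fake_request("fast", 0.05, events),
    )
    return events


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['slow start', 'fast start', 'fast done', 'slow done']
```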
Key rule at big tech: For most ops scripts that fan out across tens to a few hundred services or AWS resources, ThreadPoolExecutor with a bounded pool (8–32 workers) is the right default. It is readable, debuggable, and requires no async-aware libraries. Reach for asyncio when you need to sustain thousands of simultaneous open connections — a load generator, a real-time log tailer, or a WebSocket-based monitoring agent.

Parallel API Calls with ThreadPoolExecutor

The canonical pattern: submit all tasks, collect futures, iterate over completions. The as_completed function yields futures as they finish rather than in submission order, letting you process results the moment they arrive.

```python
import concurrent.futures
import logging
import time

import requests

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger(__name__)

ENDPOINTS = [
    "https://api.example.com/health",
    "https://api.example.com/metrics",
    "https://auth.example.com/health",
    "https://payments.example.com/health",
    "https://notifications.example.com/health",
]


def check_health(url: str, timeout: int = 5) -> dict:
    """Return a health result dict. Never raises: callers rely on this."""
    start = time.monotonic()
    try:
        resp = requests.get(url, timeout=timeout)
        elapsed = time.monotonic() - start
        return {
            "url": url,
            "status": resp.status_code,
            "ok": resp.status_code == 200,
            "latency_ms": round(elapsed * 1000),
        }
    except requests.exceptions.Timeout:
        return {"url": url, "ok": False, "error": "timeout"}
    except requests.exceptions.RequestException as exc:
        return {"url": url, "ok": False, "error": str(exc)}


def fan_out_health_check(urls: list[str], workers: int = 10) -> list[dict]:
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        # Submit all tasks immediately; submit() returns one Future per task
        future_to_url = {pool.submit(check_health, url): url for url in urls}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()
                results.append(result)
                status = "OK" if result.get("ok") else "FAIL"
                log.info("%s %s (%s ms)", status, url, result.get("latency_ms", "-"))
            except Exception as exc:
                # future.result() re-raises if check_health itself raised
                log.error("Unexpected error checking %s: %s", url, exc)
                results.append({"url": url, "ok": False, "error": str(exc)})
    return results


if __name__ == "__main__":
    results = fan_out_health_check(ENDPOINTS)
    failed = [r for r in results if not r["ok"]]
    if failed:
        log.error("%d endpoint(s) unhealthy: %s", len(failed), failed)
        raise SystemExit(1)
    log.info("All %d endpoints healthy", len(results))
```
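To see the ordering difference in isolation, this offline sketch uses time.sleep to stand in for requests with different latencies; `sleepy` and `completion_order` are hypothetical names. It submits three tasks whose simulated latencies are 0.3 s, 0.1 s, and 0.2 s. as_completed should yield them fastest-first, while pool.map returns results in submission order.

```python
import concurrent.futures
import time


def sleepy(i: int, delay: float) -> int:
    time.sleep(delay)  # simulated I/O with a per-task latency
    return i


def completion_order(delays: list[float]) -> tuple[list[int], list[int]]:
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(delays)) as pool:
        futures = [pool.submit(sleepy, i, d) for i, d in enumerate(delays)]
        # as_completed yields each future as soon as it finishes
        as_done = [f.result() for f in concurrent.futures.as_completed(futures)]
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(delays)) as pool:
        # map yields results in the order the inputs were given
        in_order = list(pool.map(sleepy, range(len(delays)), delays))
    return as_done, in_order


if __name__ == "__main__":
    print(completion_order([0.3, 0.1, 0.2]))
```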
Pro practice: Always catch exceptions inside the worker function and return a structured error dict rather than letting exceptions propagate through the future. This way your fan-out loop never crashes mid-flight when one of fifty targets is unreachable. Reserve future.result() exception catching as a safety net for truly unexpected bugs, not expected network errors.
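One way to apply this rule uniformly is a small decorator that turns any exception escaping a worker into the same structured error dict. `never_raise` is a hypothetical helper, not part of concurrent.futures. It assumes the worker's first positional argument is the target URL, which it copies into the "url" key.

```python
import functools
import logging
from typing import Any, Callable

log = logging.getLogger(__name__)


def never_raise(fn: Callable[..., dict]) -> Callable[..., dict]:
    """Wrap a worker so unexpected exceptions become {"ok": False, ...} dicts."""
    @functools.wraps(fn)
    def wrapper(target: Any, *args: Any, **kwargs: Any) -> dict:
        try:
            return fn(target, *args, **kwargs)
        except Exception as exc:  # deliberately broad: last line of defense
            log.exception("worker %s failed on %r", fn.__name__, target)
            return {"url": target, "ok": False, "error": f"{type(exc).__name__}: {exc}"}
    return wrapper


@never_raise
def flaky_check(url: str) -> dict:
    # Hypothetical worker that raises on a bad input
    if "bad" in url:
        raise ValueError("malformed response")
    return {"url": url, "ok": True}
```

A decorated worker can be passed to pool.submit unchanged. The try/except around future.result() then stays as a backstop that should rarely, if ever, fire.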

Choosing the Right Pool Size

Setting max_workers is a tuning decision, not a "bigger is better" dial. Too small and you leave parallelism on the table. Too large and you exhaust file descriptors, hit per-host TCP connection limits, or get rate-limited by the remote API.

  • For external HTTP APIs: Most SaaS APIs enforce per-client rate limits (e.g., 100 req/s). Start at 10–20 workers and instrument the 429 rate. Back off exponentially on 429s rather than adding more threads.
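  • For AWS SDK calls (boto3): botocore keeps a pool of 10 connections per client by default (max_pool_connections). If 50 threads share one client, the extra requests still go through. However, urllib3 logs "Connection pool is full, discarding connection" warnings, and you pay for repeated TCP/TLS handshakes. Either set max_pool_connections in botocore.config.Config to at least your worker count, or give each thread its own session and client via threading.local().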
  • For internal microservices / health checks: Workers can be 50–100 if targets are on a low-latency internal network and you own the target services.
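Backing off on 429s instead of adding threads can be sketched as follows. The sketch assumes the worker returns a response object with a `status_code` attribute, as requests.Response does. The helper name, the retry limits, and the "full jitter" strategy (sleep a random amount up to an exponentially growing cap) are illustrative choices, not guidance from any particular API. The sleep function is injectable so the logic can be tested without waiting.

```python
import random
import time
from typing import Any, Callable


def call_with_backoff(
    fn: Callable[[], Any],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call fn(); retry with exponential backoff and full jitter while it returns HTTP 429."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        resp = fn()
        if resp.status_code != 429:
            return resp
        if attempt < max_attempts - 1:
            # Cap grows 0.5s, 1s, 2s, 4s, ... up to max_delay; jitter spreads retries out
            cap = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, cap))
    return resp  # still 429 after every attempt; the caller records the failure
```

Inside a worker, this might look like `call_with_backoff(lambda: requests.get(url, timeout=5))`. Because each thread sleeps independently, a burst of 429s slows the whole pool down without adding threads.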
[Figure: Thread pool fan-out: submitting tasks and collecting results. The main thread submits all tasks to a ThreadPoolExecutor (max_workers=10); worker threads 1 to N call Services A to D, and the slow Service D finishes last.]
ThreadPoolExecutor fan-out: all tasks submit instantly; as_completed yields results as each worker finishes, not in submission order.

asyncio for High-Fan-Out Ops Work

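When your fan-out target count reaches the hundreds-to-thousands range, OS thread overhead becomes meaningful. On typical Linux systems, each thread reserves about 8 MB of virtual address space for its stack (the ulimit -s default), so 1,000 threads reserve roughly 8 GB of address space. Resident memory is lower, because stack pages are only committed when touched, but the kernel still has to schedule and context-switch every thread. asyncio coroutines are far cheaper: thousands coexist in a few megabytes. The trade-off is that every library in the call chain must be async-native.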

```python
import asyncio
import logging

import aiohttp

log = logging.getLogger(__name__)

ENDPOINTS = [f"https://internal-svc-{i}.prod.example.com/health" for i in range(1, 201)]


async def check_one(session: aiohttp.ClientSession, url: str) -> dict:
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
            return {"url": url, "ok": resp.status == 200, "status": resp.status}
    except asyncio.TimeoutError:
        return {"url": url, "ok": False, "error": "timeout"}
    except aiohttp.ClientError as exc:
        return {"url": url, "ok": False, "error": str(exc)}


async def fan_out(urls: list[str], concurrency: int = 50) -> list[dict]:
    # Semaphore caps simultaneous in-flight requests regardless of how many tasks exist
    sem = asyncio.Semaphore(concurrency)

    async def guarded(session: aiohttp.ClientSession, url: str) -> dict:
        async with sem:
            return await check_one(session, url)

    # Connector limit matches the semaphore so the pool never opens more sockets
    connector = aiohttp.TCPConnector(limit=concurrency)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [asyncio.create_task(guarded(session, url)) for url in urls]
        return await asyncio.gather(*tasks)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    results = asyncio.run(fan_out(ENDPOINTS))
    failed = [r for r in results if not r["ok"]]
    log.info("Checked %d endpoints. Failures: %d", len(results), len(failed))
    if failed:
        for f in failed:
            log.error("FAIL %s: %s", f["url"], f.get("error", f.get("status")))
        raise SystemExit(1)
```
Production pitfall (unbounded gather): calling asyncio.gather(*(check_one(session, url) for url in urls)) on a list of 10,000 URLs schedules all 10,000 requests at once. Coroutines are lightweight, but the remote services see a thundering herd of connections. Always throttle with asyncio.Semaphore and set a matching limit on TCPConnector (its default is 100, and limit=0 removes the cap entirely). Without a cap, you risk the kernel-level EMFILE error (too many open files) or getting rate-limited or banned by the upstream service.
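To check that a semaphore really caps in-flight work, this sketch replaces HTTP calls with asyncio.sleep so it runs offline. It records the peak number of coroutines inside the guarded section. `bounded_gather` and `demo` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")


async def bounded_gather(
    items: Iterable[T], worker: Callable[[T], Awaitable[object]], limit: int
) -> list[object]:
    """Run worker(item) for every item, with at most `limit` running at once."""
    sem = asyncio.Semaphore(limit)

    async def guarded(item: T) -> object:
        async with sem:
            return await worker(item)

    return await asyncio.gather(*(guarded(i) for i in items))


async def demo(n: int = 1000, limit: int = 50) -> int:
    in_flight = 0
    peak = 0

    async def fake_check(i: int) -> int:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stands in for network I/O
        in_flight -= 1
        return i

    results = await bounded_gather(range(n), fake_check, limit)
    assert len(results) == n
    return peak


if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(demo()))  # never exceeds 50
```

gather still creates one task per item, and the semaphore only limits how many are past the guard. For very large inputs, a fixed set of worker tasks pulling from an asyncio.Queue also keeps memory flat.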

Mixing boto3 with Thread Pools

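boto3 is not async-native, so ThreadPoolExecutor is the natural way to parallelize AWS API calls. One critical detail: the boto3 documentation states that Session objects are not thread-safe, so threads must not share one. Low-level clients, by contrast, are documented as thread-safe once created. The example below creates one Session per worker thread and caches that thread's regional clients in threading.local().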

```python
import concurrent.futures
import logging
import threading

import boto3
from botocore.config import Config

log = logging.getLogger(__name__)

# Thread-local storage: each worker thread gets its own Session and EC2 clients
_local = threading.local()


def get_ec2_client(region: str):
    if not hasattr(_local, "session"):
        # Sessions are not thread-safe, so each thread creates its own;
        # credentials still come from the environment / IAM role
        _local.session = boto3.Session()
        _local.clients = {}
    if region not in _local.clients:
        _local.clients[region] = _local.session.client(
            "ec2",
            region_name=region,
            # This client is used by one thread, one request at a time
            config=Config(max_pool_connections=1),
        )
    return _local.clients[region]


def get_region_instance_count(region: str) -> dict:
    try:
        ec2 = get_ec2_client(region)
        paginator = ec2.get_paginator("describe_instances")
        pages = paginator.paginate(
            Filters=[{"Name": "instance-state-name", "Values": ["running"]}]
        )
        total = sum(len(r["Instances"]) for page in pages for r in page["Reservations"])
        return {"region": region, "running": total}
    except Exception as exc:
        log.error("Failed to query %s: %s", region, exc)
        return {"region": region, "error": str(exc)}


REGIONS = ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1", "ap-northeast-1"]

if __name__ == "__main__":
    # Without basicConfig, log.info output is suppressed (default level is WARNING)
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(REGIONS)) as pool:
        results = list(pool.map(get_region_instance_count, REGIONS))
    for r in results:
        log.info("Region %s: %s running instances", r["region"], r.get("running", "ERROR"))
```

Timeouts and Cancellation

Any concurrent ops script must have a wall-clock deadline. A single slow or hung target must not block the entire run indefinitely. The patterns differ between the two models:

  • ThreadPoolExecutor: concurrent.futures.wait(fs, timeout=30) returns a (done, not_done) pair of sets as soon as every future has finished or 30 seconds have elapsed, whichever comes first. Call future.cancel() on each future in not_done, but note that cancellation only succeeds for tasks that have not yet started executing. Python cannot forcibly stop a running thread, so for tasks already running the only safe approach is cooperative: pass a threading.Event to the worker and have it check the event periodically.
  • asyncio: Wrap any coroutine with asyncio.wait_for(coro, timeout=5.0). On expiry it cancels the underlying task and raises asyncio.TimeoutError (an alias of the built-in TimeoutError since Python 3.11). For a batch, asyncio.gather(*tasks, return_exceptions=True) returns every result, with exceptions included as values in the list. As a result, one failure does not propagate out of gather and discard the other results.
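Here is a minimal sketch of the thread-pool side. Workers that use time.sleep stand in for real calls so the example runs offline; `slow_task` and `run_with_deadline` are hypothetical names. The sketch creates the pool without a `with` block, because leaving a `with ThreadPoolExecutor(...)` block calls shutdown(wait=True), which would block on the stragglers anyway. It calls shutdown(wait=False, cancel_futures=True) (Python 3.9+) instead.

```python
import concurrent.futures
import threading
import time


def slow_task(seconds: float, stop: threading.Event) -> str:
    """Hypothetical worker that simulates a long call but honours a stop flag."""
    deadline = time.monotonic() + seconds
    while time.monotonic() < deadline:
        # Event.wait returns True as soon as stop is set, else False after 50 ms
        if stop.wait(timeout=0.05):
            return "stopped"
    return "finished"


def run_with_deadline(durations: list[float], deadline_s: float, workers: int = 2) -> dict:
    stop = threading.Event()
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=workers)
    futures = [pool.submit(slow_task, d, stop) for d in durations]

    # Returns when everything finishes or deadline_s elapses, whichever is first
    done, not_done = concurrent.futures.wait(futures, timeout=deadline_s)

    # cancel() returns True only for futures still waiting in the queue
    cancelled = sum(f.cancel() for f in not_done)
    stop.set()  # running tasks must notice this flag themselves
    # Do not block on stragglers, and drop anything still queued
    pool.shutdown(wait=False, cancel_futures=True)

    return {
        "done": [f.result() for f in done],
        "cancelled": cancelled,
        "uncancellable": len(not_done) - cancelled,  # had already started running
    }


if __name__ == "__main__":
    print(run_with_deadline([0.1, 5, 5, 5], deadline_s=0.5))
```

The cooperative stop flag matters even after shutdown(wait=False). The interpreter still joins executor threads at exit, so a worker that ignores the flag can keep the process alive.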
Pro practice: In production alerting scripts, set an outer script-level deadline (signal.alarm on Unix, or asyncio.wait_for around the entire fan_out call) in addition to per-request timeouts. This prevents the script from running forever if your concurrency primitive itself has a bug, a scenario more common than you might expect during network partitions.
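A sketch of the asyncio variant of an outer deadline follows. asyncio.wait_for wraps the whole batch, so even a coroutine that effectively never returns cannot keep the script alive past the budget; a long asyncio.sleep simulates that case here. The function names and the exit-code convention (2 for a blown deadline) are hypothetical choices.

```python
import asyncio


async def check(name: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stands in for a network call
    return name


async def fan_out(delays: dict[str, float]) -> list[str]:
    return await asyncio.gather(*(check(n, d) for n, d in delays.items()))


async def main(delays: dict[str, float], deadline_s: float) -> int:
    try:
        # Outer deadline: on expiry, cancels fan_out, which cancels every gathered child
        results = await asyncio.wait_for(fan_out(delays), timeout=deadline_s)
    except asyncio.TimeoutError:
        print(f"deadline of {deadline_s}s exceeded; aborting run")
        return 2
    print(f"checked {len(results)} targets")
    return 0


if __name__ == "__main__":
    raise SystemExit(asyncio.run(main({"a": 0.1, "b": 3600}, deadline_s=1.0)))
```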

When to Avoid Concurrency

Concurrency adds debugging surface. A sequential script with clear logging is easier to operate than a concurrent one. Before reaching for threads or asyncio, ask:

  • Is the total sequential runtime actually a problem? If a nightly batch finishes in 4 minutes, adding concurrency for the sake of it introduces risk with no user-visible benefit.
  • Does the target service have rate limits that make concurrency counterproductive? Hitting a 10 req/s limit with 50 threads will just produce a flood of 429s and retries.
  • Is the operation stateful in a way that makes concurrent mutations dangerous? Concurrent writes to the same S3 key, the same database row, or the same Kubernetes resource need coordination (locking, transactions, or optimistic concurrency checks such as the Kubernetes resourceVersion). That is a much harder problem than the one you started with.
The production default: At companies like Google and Stripe, ops tooling typically uses thread pools with modest worker counts (8–16) for cross-service fan-out, and asyncio only in high-throughput pipeline components (streaming log processors, alert evaluation engines). The choice is driven by measurable latency requirements, not aesthetics. Benchmark before you optimize.
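A minimal benchmarking sketch in that spirit times the same workload at a few pool sizes before you commit to one. The workload here is simulated with time.sleep, an assumption so it runs offline; in real use you would substitute a read-only call against a non-production target. `time_pool`, `sweep`, and `simulated_call` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Sequence


def time_pool(fn: Callable, items: Sequence, workers: int) -> float:
    """Wall-clock seconds to run fn over all items with a pool of `workers` threads."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(fn, items))  # drain the iterator so every call completes
    return time.perf_counter() - start


def sweep(fn: Callable, items: Sequence, worker_counts: Iterable[int]) -> dict[int, float]:
    return {w: time_pool(fn, items, w) for w in worker_counts}


def simulated_call(_: int) -> None:
    time.sleep(0.02)  # stand-in for a ~20 ms network round trip


if __name__ == "__main__":
    for workers, secs in sweep(simulated_call, range(50), [1, 4, 16]).items():
        print(f"{workers:>3} workers: {secs:.2f}s")
```

Against a real API, the curve typically flattens or worsens once rate limits kick in (429s). That is the point at which to stop adding workers.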