Why Patterns Matter
Automations should be:
- Reliable (work consistently)
- Recoverable (handle failures)
- Observable (know what's happening)
- Maintainable (easy to update)
Core Patterns
Retry with Backoff
import time

def retry_with_backoff(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except TransientError:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff: base_delay, 2x, 4x, ...
            delay = base_delay * (2 ** attempt)
            time.sleep(delay)
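In use, the helper wraps any flaky call in a plain function or lambda. A minimal sketch, where fetch_report is a hypothetical function that raises TransientError on failure:

# Hypothetical flaky call; anything that raises TransientError works
report = retry_with_backoff(lambda: fetch_report("https://example.com/report"),
                            max_retries=5, base_delay=2)

In production, adding random jitter to the delay helps avoid synchronized retry storms when many workers fail at the same time.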
Idempotency
Idempotent operations are safe to run multiple times; repeated calls return the same result instead of doing the work again:
def process_order(order_id):
    # Check if already processed
    if is_processed(order_id):
        return get_existing_result(order_id)

    # Process and mark complete
    result = do_processing(order_id)
    mark_processed(order_id)
    return result
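is_processed and mark_processed need a store that survives restarts. A minimal sketch, assuming SQLite as the idempotency store:

import sqlite3

conn = sqlite3.connect("idempotency.db")
conn.execute("CREATE TABLE IF NOT EXISTS processed (order_id TEXT PRIMARY KEY)")

def is_processed(order_id):
    row = conn.execute(
        "SELECT 1 FROM processed WHERE order_id = ?", (order_id,)
    ).fetchone()
    return row is not None

def mark_processed(order_id):
    # INSERT OR IGNORE keeps the marker write itself idempotent
    conn.execute("INSERT OR IGNORE INTO processed (order_id) VALUES (?)", (order_id,))
    conn.commit()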
Circuit Breaker
Stop calling a service that keeps failing, and only try again after a cooldown period:
import time

class CircuitBreaker:
    def __init__(self, threshold=5, timeout=60):
        self.failures = 0
        self.threshold = threshold
        self.timeout = timeout
        self.open_time = None

    def call(self, func):
        if self.is_open():
            raise CircuitOpenError()
        try:
            result = func()
            self.reset()
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.open_time = time.time()
            raise

    def is_open(self):
        if self.open_time is None:
            return False
        if time.time() - self.open_time > self.timeout:
            # Cooldown elapsed: close the circuit and allow a trial call
            self.reset()
            return False
        return True

    def reset(self):
        self.failures = 0
        self.open_time = None
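A sketch of how the breaker wraps calls. payment_api and queue_for_later are hypothetical; the key point is to reuse one breaker instance per downstream service so failure counts accumulate:

payment_breaker = CircuitBreaker(threshold=5, timeout=60)

def charge(order):
    try:
        # The breaker counts failures and trips after the threshold
        return payment_breaker.call(lambda: payment_api.charge(order))
    except CircuitOpenError:
        # Circuit is open: skip the call and degrade instead
        return queue_for_later(order)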
Checkpointing
Save progress for resumability:
def process_large_job(items, checkpoint_file):
    # Load checkpoint
    processed = load_checkpoint(checkpoint_file)

    for item in items:
        if item.id in processed:
            continue
        process(item)
        # Save checkpoint
        processed.add(item.id)
        save_checkpoint(checkpoint_file, processed)
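load_checkpoint and save_checkpoint can be as simple as a JSON file of processed IDs. A minimal sketch, assuming the IDs are JSON-serializable:

import json
import os

def load_checkpoint(path):
    if not os.path.exists(path):
        return set()
    with open(path) as f:
        return set(json.load(f))

def save_checkpoint(path, processed):
    # Write to a temp file and rename so a crash can't truncate the checkpoint
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(sorted(processed), f)
    os.replace(tmp, path)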
Workflow Patterns
Pipeline
Sequential processing:
def pipeline(data):
    data = step1_validate(data)
    data = step2_transform(data)
    data = step3_enrich(data)
    data = step4_output(data)
    return data
Fan-Out/Fan-In
Parallel then aggregate:
import asyncio

async def fan_out_fan_in(items):
    # Fan out
    tasks = [process_async(item) for item in items]
    # Fan in
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Filter errors
    successes = [r for r in results if not isinstance(r, Exception)]
    return successes
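process_async is assumed to be any coroutine function; a sketch of one, plus the synchronous entry point:

async def process_async(item):
    # Placeholder for real async I/O (an HTTP call, a database query, ...)
    await asyncio.sleep(0.1)
    return item

successes = asyncio.run(fan_out_fan_in(items))

Because return_exceptions=True swallows failures, log the exceptions (or route them to a dead letter queue, covered below) rather than silently dropping them.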
Saga Pattern
Coordinate a multi-step transaction, compensating (undoing) the steps that succeeded if a later step fails:
def book_trip(user, flight, hotel):
    flight_booking = None
    hotel_booking = None
    try:
        flight_booking = book_flight(user, flight)
        hotel_booking = book_hotel(user, hotel)
        return Success(flight_booking, hotel_booking)
    except Exception:
        # Compensate
        if flight_booking:
            cancel_flight(flight_booking)
        if hotel_booking:
            cancel_hotel(hotel_booking)
        raise
Error Handling
Fail Fast
Check early, fail clearly:
def process(data):
    if not data:
        raise ValueError("Data cannot be empty")
    if not data.get('required_field'):
        raise ValueError("Missing required_field")
    # Now safe to proceed
Graceful Degradation
Fall back to a degraded but usable source when the primary fails:
def get_user_data(user_id):
    try:
        return fetch_from_primary(user_id)
    except PrimaryUnavailable:
        return fetch_from_cache(user_id)
Dead Letter Queue
Set aside items that can't be processed so they don't block everything else:
def process_with_dlq(item, dlq):
    try:
        result = process(item)
    except UnprocessableError:
        dlq.add(item)
        return None
    return result
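The dlq argument just needs an add method. One possible sketch writes failed items to a JSON-lines file for later inspection or replay (item.to_dict() is an assumption):

import json

class FileDeadLetterQueue:
    def __init__(self, path):
        self.path = path

    def add(self, item):
        # Append one JSON object per line
        with open(self.path, "a") as f:
            f.write(json.dumps(item.to_dict()) + "\n")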
Observability
Logging
def process(data):
    logger.info(f"Starting process for {data.id}")
    try:
        result = do_work(data)
        logger.info(f"Completed {data.id}: {result}")
        return result
    except Exception as e:
        logger.error(f"Failed {data.id}: {e}", exc_info=True)
        raise
Metrics
def process(data):
    start = time.time()
    try:
        result = do_work(data)
        metrics.increment("process.success")
        return result
    except Exception:
        metrics.increment("process.failure")
        raise
    finally:
        metrics.timing("process.duration", time.time() - start)
Health Checks
def health_check():
    checks = {
        "database": check_database(),
        "cache": check_cache(),
        "external_api": check_api(),
    }
    all_healthy = all(checks.values())
    return {"healthy": all_healthy, "checks": checks}
Scheduling
Cron-Style
For jobs that run on a fixed schedule. The cron fields are minute, hour, day of month, month, and day of week:
# Run daily at 9am
schedule: "0 9 * * *"
Event-Driven
React to triggers:
def on_file_created(event):
    if event.path.endswith('.csv'):
        process_csv(event.path)
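On a local filesystem, the watchdog package is one way to deliver such events; the incoming/ path and process_csv are assumptions here:

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class CsvCreatedHandler(FileSystemEventHandler):
    def on_created(self, event):
        # watchdog exposes the created path as src_path
        if not event.is_directory and event.src_path.endswith(".csv"):
            process_csv(event.src_path)

observer = Observer()
observer.schedule(CsvCreatedHandler(), path="incoming/", recursive=False)
observer.start()
# The observer runs in a background thread; keep the main thread alive.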
Batch Windows
Accumulate pending work and process it in periodic batches:
def batch_process():
    items = get_pending_items(limit=100)
    for item in items:
        process(item)
        mark_processed(item)
Testing Automations
Mock External Services
from unittest.mock import patch

def test_process():
    with patch('module.external_api') as mock_api:
        mock_api.return_value = {"status": "ok"}
        result = process(data)
        assert result.success
Test Failure Cases
def test_retry():
    call_count = 0

    def failing_func():
        nonlocal call_count
        call_count += 1
        if call_count < 3:
            raise TransientError()
        return "success"

    # base_delay=0 keeps the test from sleeping between retries
    result = retry_with_backoff(failing_func, base_delay=0)
    assert result == "success"
    assert call_count == 3
Conclusion
Reliable automations use:
- Retries with backoff
- Idempotent operations
- Checkpointing for recovery
- Good error handling
- Proper observability
Build for failure, and failures become manageable.
Next: Message Queues - Async communication patterns