02. boto3, poll loops, batch operations — the SDK surface¶
~10 min read. The day-to-day SQS surface in Python: send, receive, delete, batch operations, queue management. The patterns that ship.
Builds on: 01-visibility-timeout-and-fifo-internals.md.
The previous chapter explained delivery. This chapter is the boto3 surface you write daily.
1) Creating and configuring queues¶
import boto3
sqs = boto3.client('sqs', region_name='ap-south-1')
response = sqs.create_queue(
QueueName='my-orders',
Attributes={
'VisibilityTimeout': '60',
'MessageRetentionPeriod': '345600', # 4 days
'ReceiveMessageWaitTimeSeconds': '20', # long polling default
'RedrivePolicy': json.dumps({
'maxReceiveCount': '5',
'deadLetterTargetArn': 'arn:aws:sqs:ap-south-1:123456789012:my-orders-dlq',
}),
},
)
queue_url = response['QueueUrl']
Key attributes:
VisibilityTimeout— default for all receives. Override per-receive if needed.MessageRetentionPeriod— how long messages stay in the queue (60s to 14 days). Default 4 days.ReceiveMessageWaitTimeSeconds— queue-level long-polling default; consumers don't need to set per-call.RedrivePolicy— aftermaxReceiveCountreceives without delete, route to DLQ.
In practice, queues are usually created via Terraform/CloudFormation, not at runtime. The boto3 create call is for development and tests.
2) Sending — single and batch¶
Single send:
sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps({'order_id': 1234, 'action': 'process'}),
MessageAttributes={
'priority': {'StringValue': 'high', 'DataType': 'String'},
},
)
Batch send (up to 10):
sqs.send_message_batch(
QueueUrl=queue_url,
Entries=[
{'Id': f'msg-{i}', 'MessageBody': json.dumps({'order_id': i})}
for i in range(10)
],
)
Batch returns per-message success/failure; check Failed in the response:
response = sqs.send_message_batch(QueueUrl=queue_url, Entries=batch)
if response.get('Failed'):
for failed in response['Failed']:
log.error("Failed to send %s: %s", failed['Id'], failed['Message'])
# Retry or DLQ this one
Batching cuts API costs by 10× for high-volume producers. For low-volume producers (handful per second), single sends are fine.
For FIFO queues, batch entries within the same batch must have the same MessageGroupId only if order across them matters; SQS does not enforce ordering between batches with the same group ID.
3) Receiving — the canonical poll loop¶
def consume_loop(queue_url, handler):
while True:
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20,
AttributeNames=['ApproximateReceiveCount', 'SentTimestamp'],
MessageAttributeNames=['All'],
)
for message in response.get('Messages', []):
try:
handler(message)
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle'],
)
except Exception:
log.exception("Handler failed for message %s", message['MessageId'])
# Don't delete — visibility timeout will redrive
Notable parameters:
WaitTimeSeconds=20— long polling. Almost always 20.MaxNumberOfMessages=10— batch receive. Max is 10.AttributeNames=['ApproximateReceiveCount']— useful for poison-message detection.MessageAttributeNames=['All']— fetch any user-set attributes.
Without long polling and batching, you've left throughput on the table.
4) Batch delete¶
sqs.delete_message_batch(
QueueUrl=queue_url,
Entries=[
{'Id': msg['MessageId'], 'ReceiptHandle': msg['ReceiptHandle']}
for msg in successfully_processed
],
)
Batch delete after processing a batch of messages. Reduces API calls. Check the response's Failed list for any messages that didn't delete (their receipt handle may have expired if processing took too long).
5) Parallel processing within a batch¶
A naive consumer processes messages one at a time:
A faster consumer processes the batch in parallel:
with ThreadPoolExecutor(max_workers=10) as pool:
futures = {pool.submit(handler, m): m for m in messages}
successful = []
for future in as_completed(futures):
message = futures[future]
try:
future.result()
successful.append(message)
except Exception:
log.exception("Handler failed for %s", message['MessageId'])
if successful:
sqs.delete_message_batch(
QueueUrl=queue_url,
Entries=[{'Id': m['MessageId'], 'ReceiptHandle': m['ReceiptHandle']}
for m in successful],
)
The thread pool size depends on the workload — I/O-bound tasks can have 50+ threads; CPU-bound tasks should be near CPU count.
6) Visibility timeout extension — the heartbeat¶
For tasks that run long:
def with_heartbeat(message, queue_url, work_fn, heartbeat_interval=30, extension=60):
stop = threading.Event()
def heartbeat():
while not stop.wait(heartbeat_interval):
try:
sqs.change_message_visibility(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle'],
VisibilityTimeout=extension,
)
except Exception:
log.exception("Heartbeat failed for %s", message['MessageId'])
hb_thread = threading.Thread(target=heartbeat, daemon=True)
hb_thread.start()
try:
return work_fn(message)
finally:
stop.set()
hb_thread.join(timeout=5)
Wrap any long-running handler in with_heartbeat. The heartbeat thread extends visibility every 30 seconds while the work runs. Stops cleanly when work returns or raises.
7) FIFO queues — sending and receiving¶
sqs.send_message(
QueueUrl=fifo_queue_url,
MessageBody=json.dumps(payload),
MessageGroupId=f'customer-{customer_id}',
MessageDeduplicationId=str(event_id),
)
Receive is the same as Standard, with one quirk: SQS will not deliver another message from the same MessageGroupId until the previous one is deleted (or its visibility timeout expires). This enforces ordering — a consumer can't accidentally process the second message before the first.
For multi-consumer FIFO, scale by group: many groups in parallel, one consumer at a time per group. Throughput is per-group, not global.
8) Sending from Django / web handlers¶
def order_submit(request):
order = Order.objects.create(...)
sqs.send_message(
QueueUrl=settings.ORDERS_QUEUE_URL,
MessageBody=json.dumps({'order_id': order.id}),
)
return JsonResponse({'order_id': order.id})
A common bug: the request returns success, but the SQS send failed silently. Three defences:
- Wrap in try/except. Catch send failures; respond with 500 or retry.
- Send before responding. Send after the database commit but before the HTTP response.
- Use transactional outbox. Write the message to a database table within the same transaction as the model; a separate process drains the table to SQS. Guarantees no message lost even if the SQS API is unreachable.
# Transactional outbox pattern
with transaction.atomic():
order = Order.objects.create(...)
OutboxMessage.objects.create(
queue='orders',
body=json.dumps({'order_id': order.id}),
)
# A separate process reads OutboxMessage rows, sends to SQS, deletes.
This pattern is the standard for "must-send-this-message" workflows in financial and regulated contexts.
9) IAM — who can publish, who can consume¶
Producer IAM policy:
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": ["sqs:SendMessage", "sqs:SendMessageBatch"],
"Resource": "arn:aws:sqs:ap-south-1:123456789012:my-orders"
}]
}
Consumer IAM policy:
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage", "sqs:DeleteMessage",
"sqs:DeleteMessageBatch", "sqs:ChangeMessageVisibility",
"sqs:GetQueueAttributes"
],
"Resource": "arn:aws:sqs:ap-south-1:123456789012:my-orders"
}]
}
Use IAM roles attached to the EC2/ECS/EKS workload, not access keys in environment variables. For local development, use AWS SSO or AWS Vault.
10) The threaded example — a production-ready worker¶
import threading, signal, time, json, logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import boto3
log = logging.getLogger(__name__)
sqs = boto3.client('sqs')
class SQSWorker:
def __init__(self, queue_url, handler, max_workers=10, heartbeat=30, extension=60):
self.queue_url = queue_url
self.handler = handler
self.max_workers = max_workers
self.heartbeat = heartbeat
self.extension = extension
self.shutdown = threading.Event()
def start(self):
signal.signal(signal.SIGTERM, lambda s, f: self.shutdown.set())
signal.signal(signal.SIGINT, lambda s, f: self.shutdown.set())
log.info("SQS worker starting on %s", self.queue_url)
while not self.shutdown.is_set():
try:
self.poll_once()
except Exception:
log.exception("Poll loop error")
time.sleep(5)
log.info("SQS worker shutting down")
def poll_once(self):
response = sqs.receive_message(
QueueUrl=self.queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20,
AttributeNames=['ApproximateReceiveCount'],
)
messages = response.get('Messages', [])
if not messages:
return
with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
futures = {pool.submit(self._handle_with_heartbeat, m): m for m in messages}
to_delete = []
for future in as_completed(futures):
msg = futures[future]
try:
future.result()
to_delete.append(msg)
except Exception:
log.exception("Handler failed: %s", msg['MessageId'])
if to_delete:
self._delete_batch(to_delete)
def _handle_with_heartbeat(self, message):
stop = threading.Event()
def hb():
while not stop.wait(self.heartbeat):
try:
sqs.change_message_visibility(
QueueUrl=self.queue_url,
ReceiptHandle=message['ReceiptHandle'],
VisibilityTimeout=self.extension,
)
except Exception:
log.exception("Heartbeat failed: %s", message['MessageId'])
threading.Thread(target=hb, daemon=True).start()
try:
self.handler(message)
finally:
stop.set()
def _delete_batch(self, messages):
sqs.delete_message_batch(
QueueUrl=self.queue_url,
Entries=[{'Id': m['MessageId'], 'ReceiptHandle': m['ReceiptHandle']}
for m in messages],
)
# Usage
def handler(message):
body = json.loads(message['Body'])
process_order(body['order_id'])
worker = SQSWorker(queue_url='https://...', handler=handler, max_workers=10)
worker.start()
~80 lines. Handles long polling, batching, parallel work, heartbeat extension, graceful shutdown, batch delete. This is what shows up in production teams' shared libraries.
Operational signals¶
Healthy. Receive rate matches send rate; receive_count near 1 for most messages; delete rate matches receive rate.
First degrading metric. ApproximateAgeOfOldestMessage climbing — backlog growing.
Misleading metric. ReceiveMessage API call count — includes empty receives (irrelevant without long polling).
Expert graph. Receive batch size distribution (should be near 10 for healthy consumers) + receive_count distribution per message.
Where this appears in production¶
- AWS samples — official Python SDK examples follow the long-poll-then-batch pattern.
- Netflix internal tooling — heartbeat extension is the standard pattern for long video-processing tasks.
- Stripe (some internal Python tools) — transactional outbox for must-deliver events.
- A Bengaluru fintech — outbox pattern for payment events; SQS sends never block a database commit.
- A Mumbai logistics SaaS —
SQSWorker-style class shared across services; battle-tested. - A Goa-based AI startup — heartbeat for ML inference tasks (5-30 second latency variance).
- A Pune travel SaaS — batch send for analytics events; 10× cost reduction over single sends.
Recall / checkpoint¶
- What does
RedrivePolicydo? - How does batch send cut cost?
- When does parallel processing in a batch help?
- What is the heartbeat pattern and when do you need it?
- What is the transactional outbox and what does it solve?
- What IAM permissions does a producer need vs. a consumer?
Interview Q&A¶
Q1. The team's SQS consumer is processing 5 messages per second; the queue is growing. Walk through optimisations.
Three quick wins: (1) long polling — WaitTimeSeconds=20 — fewer empty receives; (2) batch receive — MaxNumberOfMessages=10 — fewer round trips; (3) parallel processing within the batch — thread pool with N workers per consumer. After these three changes, the same consumer typically handles 50+ msgs/sec. Then scale horizontally if needed. Common wrong answer to avoid: "scale workers" — optimise the single worker first; horizontal scaling amplifies a slow consumer.
Q2. A team needs guaranteed message delivery after a Django save. Walk through the transactional outbox.
The naive pattern (save model, then send to SQS) has two failure modes: SQS API fails after the save (no message), or the save's transaction rolls back after the SQS send (orphan message). Fix: within the same transaction, write to an OutboxMessage table. A separate worker reads pending outbox rows, sends to SQS, deletes the row. Atomicity: the outbox write is in the same transaction as the model; either both happen or neither. The worker is idempotent (uses message ID as SQS dedup token). Common wrong answer to avoid: "send before save" — risks sending for a save that never committed.
Q3. Walk through implementing heartbeat for a 5-minute task.
A background thread, started before the work begins, calls ChangeMessageVisibility periodically. Interval: a quarter to a third of the desired extension; for a 60-second extension, heartbeat every 20-30s. The extension cumulative cap is 12 hours from initial receive. Stop the heartbeat thread cleanly when work finishes or raises. The pattern is ~15 lines of code; wrap your handler with it. Common wrong answer to avoid: "set initial visibility to 5 minutes" — works for happy path, but redrive on consumer crash takes 5 minutes; heartbeat makes failure recovery faster.
Q4. The team enabled batch send and 5% of messages are reported failed. Walk through the response.
Batch send returns per-message success/failure; the failures aren't exceptions, they're items in the response's Failed list. The team must inspect the response. Common failure reasons: message too large, malformed FIFO group ID, throttling. Fix: log every failure, retry the failed entries (possibly with backoff), surface persistent failures to monitoring. The 5% rate is high — investigate the per-failure error codes. Common wrong answer to avoid: "5% is normal" — most batch send failures are config or throttling issues, not steady-state.
Q5. The consumer's CPU is low but throughput is also low. Walk through the diagnosis. The consumer is I/O-bound — waiting on SQS receive or downstream calls. CPU stays low because most time is in network wait. Diagnosis: instrument the poll loop to measure time per stage (receive, handle, delete). If most time is in receive: short polling or no batch. If most time is in handle: handler is slow (downstream is slow or the work itself is slow). Fix per finding: switch to long polling + batch for receive issues; parallelise or speed up the handler for handle issues. Common wrong answer to avoid: "scale workers" — addresses neither bottleneck.
Q6. How do you handle a poison message — a message that fails repeatedly?
The redrive policy handles it automatically. Set maxReceiveCount=5 (or similar) on the queue; after 5 unsuccessful receives, SQS moves the message to the DLQ. The consumer doesn't need explicit poison-detection logic; the queue config does it. For visibility, check ApproximateReceiveCount on each received message; log warning if > some threshold. The DLQ should be monitored (alert on ingress) and have a triage process. Common wrong answer to avoid: "delete after N retries" — the redrive policy is structurally cleaner.
Operational memory¶
This chapter explained the SQS SDK surface: creating queues, sending (single and batch), receiving with long polling and batching, parallel processing, heartbeat, transactional outbox, IAM, and a production-ready worker class. The important idea is that the SDK calls are simple; the patterns wrapping them (heartbeat, outbox, batch handling) are what makes a production-grade consumer.
You learned to write a poll loop that handles long polling, batching, parallel work, heartbeat extension, and graceful shutdown — the canonical production worker shape. That solves the day-to-day; production gotchas come next.
Carry this diagnostic forward: when an SQS consumer underperforms, ask which SDK pattern is missing — polling mode, batching, parallelism, heartbeat, or scaling. Each has a known fix.
Remember:
- Long polling + batch receive + batch delete = throughput basics.
- Parallel processing within a batch = scale per consumer.
- Heartbeat for long tasks; transactional outbox for must-send semantics.
- IAM roles, not keys; least-privilege per producer/consumer.
Bridge. The SDK is set. Production has its own surface: DLQ tuning, throughput planning, cost surprises, encryption. The next chapter is that surface. → 03-deadletter-throughput-prod-gotchas.md