Implementing Webhook Idempotency
Implementing Webhook Idempotency
Learn how to implement idempotency for webhooks to prevent duplicate processing.
Why Idempotency Matters
Sports Stack webhooks use at-least-once delivery semantics, meaning:
- Webhooks may be delivered multiple times
- Network retries can cause duplicates
- You must handle duplicates gracefully
The Solution: Use the Hash Field
Every webhook payload includes a hash field that uniquely identifies the change:
{
"event": {
"hash": "a1b2c3d4e5f6789012345678901234567890abcdef1234567890abcdef123456",
"common_model_id": "550e8400-e29b-41d4-a716-446655440000"
}
}Use this hash to detect and skip duplicate webhooks.
Implementation Patterns
Pattern 1: In-Memory Set (Development)
Use Case: Development and testing
processed_hashes = set()
def handle_webhook(payload):
event_hash = payload['event']['hash']
if event_hash in processed_hashes:
return Response(status=200) # Already processed
# Process webhook
process_entity_update(payload)
# Mark as processed
processed_hashes.add(event_hash)
return Response(status=200)Limitations:
- Lost on server restart
- Not shared across instances
- Memory usage grows over time
Pattern 2: Database Table (Production)
Use Case: Production applications
import sqlite3
from datetime import datetime, timedelta
def setup_database():
conn = sqlite3.connect('webhooks.db')
conn.execute('''
CREATE TABLE IF NOT EXISTS processed_webhooks (
hash TEXT PRIMARY KEY,
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
return conn
def is_already_processed(conn, event_hash):
cursor = conn.execute(
'SELECT 1 FROM processed_webhooks WHERE hash = ?',
(event_hash,)
)
return cursor.fetchone() is not None
def mark_as_processed(conn, event_hash):
conn.execute(
'INSERT OR IGNORE INTO processed_webhooks (hash) VALUES (?)',
(event_hash,)
)
conn.commit()
def cleanup_old_hashes(conn, days=30):
"""Remove hashes older than N days"""
cutoff = datetime.now() - timedelta(days=days)
conn.execute(
'DELETE FROM processed_webhooks WHERE processed_at < ?',
(cutoff,)
)
conn.commit()Pattern 3: Redis (Scalable)
Use Case: High-volume applications, multiple instances
import redis
from datetime import timedelta
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def is_already_processed(event_hash):
return redis_client.exists(f"webhook:{event_hash}")
def mark_as_processed(event_hash, ttl_days=30):
ttl_seconds = timedelta(days=ttl_days).total_seconds()
redis_client.setex(
f"webhook:{event_hash}",
int(ttl_seconds),
"1"
)Benefits:
- Fast lookups
- Automatic expiration
- Shared across instances
- Memory efficient
Pattern 4: PostgreSQL (Recommended)
Use Case: Production applications with PostgreSQL
import psycopg2
from datetime import datetime, timedelta
def is_already_processed(conn, event_hash):
cursor = conn.cursor()
cursor.execute(
'SELECT 1 FROM processed_webhooks WHERE hash = %s',
(event_hash,)
)
return cursor.fetchone() is not None
def mark_as_processed(conn, event_hash):
cursor = conn.cursor()
cursor.execute(
'''
INSERT INTO processed_webhooks (hash, processed_at)
VALUES (%s, %s)
ON CONFLICT (hash) DO NOTHING
''',
(event_hash, datetime.now())
)
conn.commit()
# Database schema
CREATE TABLE processed_webhooks (
hash VARCHAR(64) PRIMARY KEY,
processed_at TIMESTAMP DEFAULT NOW(),
resource_type VARCHAR(50),
common_model_id UUID
);
CREATE INDEX idx_processed_webhooks_processed_at
ON processed_webhooks(processed_at);Complete Example: Flask with PostgreSQL
from flask import Flask, request, Response
import psycopg2
import hmac
import hashlib
import json
import os
app = Flask(__name__)
SHARED_SECRET = os.environ.get('WEBHOOK_SECRET')
def get_db_connection():
return psycopg2.connect(
host=os.environ.get('DB_HOST', 'localhost'),
database=os.environ.get('DB_NAME', 'webhooks'),
user=os.environ.get('DB_USER', 'postgres'),
password=os.environ.get('DB_PASSWORD')
)
def verify_signature(payload_body, signature_header, secret):
if isinstance(payload_body, str):
payload_body = payload_body.encode('utf-8')
expected_signature = hmac.new(
secret.encode('utf-8'),
payload_body,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected_signature, signature_header)
@app.route('/webhook', methods=['POST'])
def webhook_handler():
# Verify signature
signature = request.headers.get('X-SportsStack-Signature')
payload_body = request.data.decode('utf-8')
if not verify_signature(payload_body, signature, SHARED_SECRET):
return Response(status=401)
# Parse payload
payload = json.loads(payload_body)
event_hash = payload['event']['hash']
# Check idempotency
conn = get_db_connection()
try:
if is_already_processed(conn, event_hash):
return Response(status=200) # Already processed
# Process webhook
process_webhook(payload)
# Mark as processed
mark_as_processed(conn, event_hash)
return Response(status=200)
finally:
conn.close()
def is_already_processed(conn, event_hash):
cursor = conn.cursor()
cursor.execute(
'SELECT 1 FROM processed_webhooks WHERE hash = %s',
(event_hash,)
)
return cursor.fetchone() is not None
def mark_as_processed(conn, event_hash):
cursor = conn.cursor()
cursor.execute(
'''
INSERT INTO processed_webhooks (hash, processed_at)
VALUES (%s, NOW())
ON CONFLICT (hash) DO NOTHING
''',
(event_hash,)
)
conn.commit()
def process_webhook(payload):
# Your webhook processing logic
resource_type = payload['resource_type']
common_model_id = payload['event']['common_model_id']
# Fetch and update entity
update_entity(resource_type, common_model_id)Node.js Example
const express = require('express');
const crypto = require('crypto');
const { Pool } = require('pg');
const app = express();
const pool = new Pool({
host: process.env.DB_HOST,
database: process.env.DB_NAME,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD
});
async function isAlreadyProcessed(eventHash) {
const result = await pool.query(
'SELECT 1 FROM processed_webhooks WHERE hash = $1',
[eventHash]
);
return result.rows.length > 0;
}
async function markAsProcessed(eventHash) {
await pool.query(
'INSERT INTO processed_webhooks (hash, processed_at) VALUES ($1, NOW()) ON CONFLICT (hash) DO NOTHING',
[eventHash]
);
}
app.post('/webhook', express.raw({ type: 'application/json' }), async (req, res) => {
const signature = req.headers['x-sportsstack-signature'];
const payloadBody = req.body.toString();
const payload = JSON.parse(payloadBody);
const eventHash = payload.event.hash;
// Verify signature
if (!verifySignature(payloadBody, signature, process.env.WEBHOOK_SECRET)) {
return res.status(401).send('Invalid signature');
}
// Check idempotency
if (await isAlreadyProcessed(eventHash)) {
return res.status(200).json({ status: 'already_processed' });
}
// Process webhook
await processWebhook(payload);
// Mark as processed
await markAsProcessed(eventHash);
res.status(200).json({ status: 'ok' });
});Best Practices
1. Always Check Before Processing
# ✅ Good: Check first
if is_already_processed(event_hash):
return Response(status=200)
process_webhook(payload)
mark_as_processed(event_hash)
# ❌ Bad: Process then check
process_webhook(payload) # May process duplicate!
if not is_already_processed(event_hash):
mark_as_processed(event_hash)2. Use Atomic Operations
# ✅ Good: Atomic insert
INSERT INTO processed_webhooks (hash)
VALUES ($1)
ON CONFLICT (hash) DO NOTHING
# ❌ Bad: Check then insert (race condition)
if not exists(hash):
insert(hash) # Another request might insert between check and insert3. Clean Up Old Hashes
# Remove hashes older than 30 days
DELETE FROM processed_webhooks
WHERE processed_at < NOW() - INTERVAL '30 days'4. Handle Database Errors
try:
mark_as_processed(conn, event_hash)
except psycopg2.IntegrityError:
# Hash already exists (another request processed it)
pass # This is OK, just means it was already processedTesting Idempotency
Test Duplicate Delivery
def test_idempotency():
payload = {
"event": {
"hash": "test-hash-123",
"common_model_id": "test-id"
}
}
# First delivery
response1 = handle_webhook(payload)
assert response1.status_code == 200
# Duplicate delivery (same hash)
response2 = handle_webhook(payload)
assert response2.status_code == 200
# Verify only processed once
assert get_processing_count("test-hash-123") == 1Related Documentation
- Webhooks Guide - Complete webhook guide
- Setting Up Webhooks in Python - Python setup
- Webhook Payload Schema - Payload structure
Updated 29 days ago
