# Implementing Webhook Idempotency Learn how to implement idempotency for webhooks to prevent duplicate processing. ## Why Idempotency Matters Sports Stack webhooks use **at-least-once delivery** semantics, meaning: * Webhooks may be delivered multiple times * Network retries can cause duplicates * You must handle duplicates gracefully ## The Solution: Use the Hash Field Every webhook payload includes a `hash` field that uniquely identifies the change: ```json { "event": { "hash": "a1b2c3d4e5f6789012345678901234567890abcdef1234567890abcdef123456", "common_model_id": "550e8400-e29b-41d4-a716-446655440000" } } ``` Use this hash to detect and skip duplicate webhooks. ## Implementation Patterns ### Pattern 1: In-Memory Set (Development) **Use Case**: Development and testing ```python processed_hashes = set() def handle_webhook(payload): event_hash = payload['event']['hash'] if event_hash in processed_hashes: return Response(status=200) # Already processed # Process webhook process_entity_update(payload) # Mark as processed processed_hashes.add(event_hash) return Response(status=200) ``` **Limitations**: * Lost on server restart * Not shared across instances * Memory usage grows over time ### Pattern 2: Database Table (Production) **Use Case**: Production applications ```python import sqlite3 from datetime import datetime, timedelta def setup_database(): conn = sqlite3.connect('webhooks.db') conn.execute(''' CREATE TABLE IF NOT EXISTS processed_webhooks ( hash TEXT PRIMARY KEY, processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit() return conn def is_already_processed(conn, event_hash): cursor = conn.execute( 'SELECT 1 FROM processed_webhooks WHERE hash = ?', (event_hash,) ) return cursor.fetchone() is not None def mark_as_processed(conn, event_hash): conn.execute( 'INSERT OR IGNORE INTO processed_webhooks (hash) VALUES (?)', (event_hash,) ) conn.commit() def cleanup_old_hashes(conn, days=30): """Remove hashes older than 
N days""" cutoff = datetime.now() - timedelta(days=days) conn.execute( 'DELETE FROM processed_webhooks WHERE processed_at < ?', (cutoff,) ) conn.commit() ``` ### Pattern 3: Redis (Scalable) **Use Case**: High-volume applications, multiple instances ```python import redis from datetime import timedelta redis_client = redis.Redis(host='localhost', port=6379, db=0) def is_already_processed(event_hash): return redis_client.exists(f"webhook:{event_hash}") def mark_as_processed(event_hash, ttl_days=30): ttl_seconds = timedelta(days=ttl_days).total_seconds() redis_client.setex( f"webhook:{event_hash}", int(ttl_seconds), "1" ) ``` **Benefits**: * Fast lookups * Automatic expiration * Shared across instances * Memory efficient ### Pattern 4: PostgreSQL (Recommended) **Use Case**: Production applications with PostgreSQL ```python import psycopg2 from datetime import datetime, timedelta def is_already_processed(conn, event_hash): cursor = conn.cursor() cursor.execute( 'SELECT 1 FROM processed_webhooks WHERE hash = %s', (event_hash,) ) return cursor.fetchone() is not None def mark_as_processed(conn, event_hash): cursor = conn.cursor() cursor.execute( ''' INSERT INTO processed_webhooks (hash, processed_at) VALUES (%s, %s) ON CONFLICT (hash) DO NOTHING ''', (event_hash, datetime.now()) ) conn.commit() # Database schema CREATE TABLE processed_webhooks ( hash VARCHAR(64) PRIMARY KEY, processed_at TIMESTAMP DEFAULT NOW(), resource_type VARCHAR(50), common_model_id UUID ); CREATE INDEX idx_processed_webhooks_processed_at ON processed_webhooks(processed_at); ``` ## Complete Example: Flask with PostgreSQL ```python from flask import Flask, request, Response import psycopg2 import hmac import hashlib import json import os app = Flask(__name__) SHARED_SECRET = os.environ.get('WEBHOOK_SECRET') def get_db_connection(): return psycopg2.connect( host=os.environ.get('DB_HOST', 'localhost'), database=os.environ.get('DB_NAME', 'webhooks'), user=os.environ.get('DB_USER', 'postgres'), 
password=os.environ.get('DB_PASSWORD') ) def verify_signature(payload_body, signature_header, secret): if isinstance(payload_body, str): payload_body = payload_body.encode('utf-8') expected_signature = hmac.new( secret.encode('utf-8'), payload_body, hashlib.sha256 ).hexdigest() return hmac.compare_digest(expected_signature, signature_header) @app.route('/webhook', methods=['POST']) def webhook_handler(): # Verify signature signature = request.headers.get('X-SportsStack-Signature') payload_body = request.data.decode('utf-8') if not verify_signature(payload_body, signature, SHARED_SECRET): return Response(status=401) # Parse payload payload = json.loads(payload_body) event_hash = payload['event']['hash'] # Check idempotency conn = get_db_connection() try: if is_already_processed(conn, event_hash): return Response(status=200) # Already processed # Process webhook process_webhook(payload) # Mark as processed mark_as_processed(conn, event_hash) return Response(status=200) finally: conn.close() def is_already_processed(conn, event_hash): cursor = conn.cursor() cursor.execute( 'SELECT 1 FROM processed_webhooks WHERE hash = %s', (event_hash,) ) return cursor.fetchone() is not None def mark_as_processed(conn, event_hash): cursor = conn.cursor() cursor.execute( ''' INSERT INTO processed_webhooks (hash, processed_at) VALUES (%s, NOW()) ON CONFLICT (hash) DO NOTHING ''', (event_hash,) ) conn.commit() def process_webhook(payload): # Your webhook processing logic resource_type = payload['resource_type'] common_model_id = payload['event']['common_model_id'] # Fetch and update entity update_entity(resource_type, common_model_id) ``` ## Node.js Example ```javascript const express = require('express'); const crypto = require('crypto'); const { Pool } = require('pg'); const app = express(); const pool = new Pool({ host: process.env.DB_HOST, database: process.env.DB_NAME, user: process.env.DB_USER, password: process.env.DB_PASSWORD }); async function isAlreadyProcessed(eventHash) { 
const result = await pool.query( 'SELECT 1 FROM processed_webhooks WHERE hash = $1', [eventHash] ); return result.rows.length > 0; } async function markAsProcessed(eventHash) { await pool.query( 'INSERT INTO processed_webhooks (hash, processed_at) VALUES ($1, NOW()) ON CONFLICT (hash) DO NOTHING', [eventHash] ); } app.post('/webhook', express.raw({ type: 'application/json' }), async (req, res) => { const signature = req.headers['x-sportsstack-signature']; const payloadBody = req.body.toString(); const payload = JSON.parse(payloadBody); const eventHash = payload.event.hash; // Verify signature if (!verifySignature(payloadBody, signature, process.env.WEBHOOK_SECRET)) { return res.status(401).send('Invalid signature'); } // Check idempotency if (await isAlreadyProcessed(eventHash)) { return res.status(200).json({ status: 'already_processed' }); } // Process webhook await processWebhook(payload); // Mark as processed await markAsProcessed(eventHash); res.status(200).json({ status: 'ok' }); }); ``` ## Best Practices ### 1. Always Check Before Processing ```python # ✅ Good: Check first if is_already_processed(event_hash): return Response(status=200) process_webhook(payload) mark_as_processed(event_hash) # ❌ Bad: Process then check process_webhook(payload) # May process duplicate! if not is_already_processed(event_hash): mark_as_processed(event_hash) ``` ### 2. Use Atomic Operations ```python # ✅ Good: Atomic insert INSERT INTO processed_webhooks (hash) VALUES ($1) ON CONFLICT (hash) DO NOTHING # ❌ Bad: Check then insert (race condition) if not exists(hash): insert(hash) # Another request might insert between check and insert ``` ### 3. Clean Up Old Hashes ```python # Remove hashes older than 30 days DELETE FROM processed_webhooks WHERE processed_at < NOW() - INTERVAL '30 days' ``` ### 4. 
Handle Database Errors ```python try: mark_as_processed(conn, event_hash) except psycopg2.IntegrityError: # Hash already exists (another request processed it) pass # This is OK, just means it was already processed ``` ## Testing Idempotency ### Test Duplicate Delivery ```python def test_idempotency(): payload = { "event": { "hash": "test-hash-123", "common_model_id": "test-id" } } # First delivery response1 = handle_webhook(payload) assert response1.status_code == 200 # Duplicate delivery (same hash) response2 = handle_webhook(payload) assert response2.status_code == 200 # Verify only processed once assert get_processing_count("test-hash-123") == 1 ``` ## Related Documentation * [Webhooks Guide](../guides/webhooks-guide.md) - Complete webhook guide * [Setting Up Webhooks in Python](./setting-up-webhooks-python.md) - Python setup * [Webhook Payload Schema](../api-reference/webhooks/webhook-payload-schema.md) - Payload structure