Implementing Webhook Idempotency

Learn how to implement idempotency for webhooks to prevent duplicate processing.

Why Idempotency Matters

Sports Stack webhooks use at-least-once delivery semantics, meaning:

  • Webhooks may be delivered multiple times
  • Network retries can cause duplicates
  • You must handle duplicates gracefully

The Solution: Use the Hash Field

Every webhook payload includes a hash field that uniquely identifies the change:

{
  "event": {
    "hash": "a1b2c3d4e5f6789012345678901234567890abcdef1234567890abcdef123456",
    "common_model_id": "550e8400-e29b-41d4-a716-446655440000"
  }
}

Use this hash to detect and skip duplicate webhooks.

Implementation Patterns

Pattern 1: In-Memory Set (Development)

Use Case: Development and testing

# Hashes of webhook events this process has already handled.
processed_hashes = set()

def handle_webhook(payload):
    """Handle one webhook delivery, acknowledging duplicates without reprocessing.

    A duplicate is detected by the event hash; both first and duplicate
    deliveries are answered with HTTP 200 so the sender stops retrying.
    """
    incoming_hash = payload['event']['hash']

    # Duplicate delivery: acknowledge and do nothing else.
    if incoming_hash in processed_hashes:
        return Response(status=200)

    process_entity_update(payload)

    # Record the hash only after processing succeeded, so a failed
    # attempt can be retried by the sender.
    processed_hashes.add(incoming_hash)

    return Response(status=200)

Limitations:

  • Lost on server restart
  • Not shared across instances
  • Memory usage grows over time

Pattern 2: Database Table (Production)

Use Case: Production applications

import sqlite3
from datetime import datetime, timedelta

def setup_database():
    """Open (or create) webhooks.db and ensure the idempotency table exists.

    Returns the open sqlite3 connection for the caller to reuse.
    """
    connection = sqlite3.connect('webhooks.db')
    connection.execute('''
        CREATE TABLE IF NOT EXISTS processed_webhooks (
            hash TEXT PRIMARY KEY,
            processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    connection.commit()
    return connection

def is_already_processed(conn, event_hash):
    """Return True when this event hash was already recorded as processed."""
    row = conn.execute(
        'SELECT 1 FROM processed_webhooks WHERE hash = ?',
        (event_hash,)
    ).fetchone()
    return row is not None

def mark_as_processed(conn, event_hash):
    """Record the event hash; re-marking an existing hash is a silent no-op."""
    statement = 'INSERT OR IGNORE INTO processed_webhooks (hash) VALUES (?)'
    conn.execute(statement, (event_hash,))
    conn.commit()

def cleanup_old_hashes(conn, days=30):
    """Remove idempotency records older than *days* days.

    The cutoff is computed inside SQLite with datetime('now', ...), which
    uses the same UTC clock as the CURRENT_TIMESTAMP column default.
    The previous implementation compared stored UTC timestamps against
    datetime.now() — a naive *local* time — which skewed the retention
    window by the host's UTC offset (and depended on the sqlite3 datetime
    adapter, deprecated since Python 3.12).
    """
    conn.execute(
        "DELETE FROM processed_webhooks WHERE processed_at < datetime('now', ?)",
        (f'-{days} days',)
    )
    conn.commit()

Pattern 3: Redis (Scalable)

Use Case: High-volume applications, multiple instances

import redis
from datetime import timedelta

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def is_already_processed(event_hash):
    """Return truthy when an idempotency key for this hash exists in Redis."""
    key = f"webhook:{event_hash}"
    return redis_client.exists(key)

def mark_as_processed(event_hash, ttl_days=30):
    """Record the hash with a TTL so Redis expires old entries automatically."""
    expiry_seconds = int(timedelta(days=ttl_days).total_seconds())
    redis_client.setex(f"webhook:{event_hash}", expiry_seconds, "1")

Benefits:

  • Fast lookups
  • Automatic expiration
  • Shared across instances
  • Memory efficient

Pattern 4: PostgreSQL (Recommended)

Use Case: Production applications with PostgreSQL

import psycopg2
from datetime import datetime, timedelta

def is_already_processed(conn, event_hash):
    """Return True when the hash already exists in processed_webhooks."""
    cur = conn.cursor()
    cur.execute(
        'SELECT 1 FROM processed_webhooks WHERE hash = %s',
        (event_hash,)
    )
    return cur.fetchone() is not None

def mark_as_processed(conn, event_hash):
    """Insert the event hash; ON CONFLICT makes duplicate inserts a no-op.

    Uses the database server's NOW() for processed_at instead of the
    client's datetime.now(): the client clock may be skewed or in a
    different timezone, and server time is consistent with the cleanup
    query (NOW() - INTERVAL) that also evaluates on the server.
    """
    cursor = conn.cursor()
    cursor.execute(
        '''
        INSERT INTO processed_webhooks (hash, processed_at)
        VALUES (%s, NOW())
        ON CONFLICT (hash) DO NOTHING
        ''',
        (event_hash,)
    )
    conn.commit()

# Database schema
-- Idempotency ledger: one row per webhook event ever processed.
CREATE TABLE processed_webhooks (
    hash VARCHAR(64) PRIMARY KEY,          -- 64-char hex event hash (matches event.hash in the payload)
    processed_at TIMESTAMP DEFAULT NOW(),
    resource_type VARCHAR(50),             -- optional bookkeeping columns
    common_model_id UUID
);

-- Supports the periodic "delete rows older than N days" cleanup query.
CREATE INDEX idx_processed_webhooks_processed_at
ON processed_webhooks(processed_at);

Complete Example: Flask with PostgreSQL

from flask import Flask, request, Response
import psycopg2
import hmac
import hashlib
import json
import os

app = Flask(__name__)

SHARED_SECRET = os.environ.get('WEBHOOK_SECRET')

def get_db_connection():
    """Open a new PostgreSQL connection from the DB_* environment variables."""
    env = os.environ.get
    return psycopg2.connect(
        host=env('DB_HOST', 'localhost'),
        database=env('DB_NAME', 'webhooks'),
        user=env('DB_USER', 'postgres'),
        password=env('DB_PASSWORD')
    )

def verify_signature(payload_body, signature_header, secret):
    """Return True iff signature_header is the HMAC-SHA256 hex digest of payload_body.

    Fix: a missing header (None from request.headers.get) previously made
    hmac.compare_digest raise TypeError, turning a bad request into a 500;
    an absent or empty header now simply fails verification.
    """
    if not signature_header:
        return False

    if isinstance(payload_body, str):
        payload_body = payload_body.encode('utf-8')

    expected_signature = hmac.new(
        secret.encode('utf-8'),
        payload_body,
        hashlib.sha256
    ).hexdigest()

    # Constant-time comparison to avoid timing side channels.
    return hmac.compare_digest(expected_signature, signature_header)

@app.route('/webhook', methods=['POST'])
def webhook_handler():
    """Webhook endpoint: verify signature, skip duplicates, process, record.

    Returns 401 for a missing or invalid signature, 400 for an unparseable
    payload, and 200 otherwise — including for duplicates, which are
    acknowledged without being reprocessed.
    """
    signature = request.headers.get('X-SportsStack-Signature')
    payload_body = request.data.decode('utf-8')

    # Guard: a missing header would previously crash hmac.compare_digest
    # with a TypeError and surface as a 500 instead of a clean 401.
    if not signature or not verify_signature(payload_body, signature, SHARED_SECRET):
        return Response(status=401)

    # Malformed JSON or a payload without event.hash previously raised and
    # produced a 500; reject it explicitly instead.
    try:
        payload = json.loads(payload_body)
        event_hash = payload['event']['hash']
    except (ValueError, KeyError, TypeError):
        return Response(status=400)

    conn = get_db_connection()
    try:
        # NOTE(review): check-then-mark is racy under concurrent duplicate
        # deliveries; the ON CONFLICT insert in mark_as_processed keeps the
        # table consistent, but the entity could still be processed twice
        # in that window. A fully atomic pattern would insert first and
        # process only when the insert claimed the row.
        if is_already_processed(conn, event_hash):
            return Response(status=200)  # Already processed

        process_webhook(payload)

        mark_as_processed(conn, event_hash)

        return Response(status=200)
    finally:
        conn.close()

def is_already_processed(conn, event_hash):
    """Return True when this event hash was already handled."""
    lookup = conn.cursor()
    lookup.execute(
        'SELECT 1 FROM processed_webhooks WHERE hash = %s',
        (event_hash,)
    )
    return lookup.fetchone() is not None

def mark_as_processed(conn, event_hash):
    """Record the event hash; duplicate inserts are no-ops via ON CONFLICT."""
    insert_sql = '''
        INSERT INTO processed_webhooks (hash, processed_at)
        VALUES (%s, NOW())
        ON CONFLICT (hash) DO NOTHING
        '''
    conn.cursor().execute(insert_sql, (event_hash,))
    conn.commit()

def process_webhook(payload):
    """Apply the webhook: refresh the entity the event refers to."""
    event = payload['event']
    # Fetch and update entity
    update_entity(payload['resource_type'], event['common_model_id'])

Node.js Example

// Express app plus a PostgreSQL connection pool for the idempotency table.
const express = require('express');
const crypto = require('crypto');
const { Pool } = require('pg');

const app = express();
// Connection settings come from the environment (same DB_* variables as the
// Flask example above).
const pool = new Pool({
  host: process.env.DB_HOST,
  database: process.env.DB_NAME,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD
});

// True when this event hash already has a row in processed_webhooks.
async function isAlreadyProcessed(eventHash) {
  const { rows } = await pool.query(
    'SELECT 1 FROM processed_webhooks WHERE hash = $1',
    [eventHash]
  );
  return rows.length > 0;
}

// Record the hash; ON CONFLICT DO NOTHING makes duplicate calls harmless.
async function markAsProcessed(eventHash) {
  const sql =
    'INSERT INTO processed_webhooks (hash, processed_at) VALUES ($1, NOW()) ON CONFLICT (hash) DO NOTHING';
  await pool.query(sql, [eventHash]);
}

// Webhook endpoint: verify signature, skip duplicates, process, record.
app.post('/webhook', express.raw({ type: 'application/json' }), async (req, res) => {
  const signature = req.headers['x-sportsstack-signature'];
  const payloadBody = req.body.toString();

  // Verify the signature FIRST — never parse unauthenticated input.
  // (Previously JSON.parse ran before verification, so a malformed body
  // from an unauthenticated caller crashed the handler with a 500.)
  if (!signature || !verifySignature(payloadBody, signature, process.env.WEBHOOK_SECRET)) {
    return res.status(401).send('Invalid signature');
  }

  // Parse payload (now known to be authentic); reject malformed bodies
  // explicitly instead of letting the exception surface as a 500.
  let payload;
  let eventHash;
  try {
    payload = JSON.parse(payloadBody);
    eventHash = payload.event.hash;
  } catch (err) {
    return res.status(400).send('Malformed payload');
  }

  // Check idempotency
  if (await isAlreadyProcessed(eventHash)) {
    return res.status(200).json({ status: 'already_processed' });
  }

  // Process webhook
  await processWebhook(payload);

  // Mark as processed
  await markAsProcessed(eventHash);

  res.status(200).json({ status: 'ok' });
});

Best Practices

1. Always Check Before Processing

# ✅ Good: Check first
if is_already_processed(event_hash):
    return Response(status=200)

process_webhook(payload)
mark_as_processed(event_hash)

# ❌ Bad: Process then check
process_webhook(payload)  # May process duplicate!
if not is_already_processed(event_hash):
    mark_as_processed(event_hash)

2. Use Atomic Operations

# ✅ Good: Atomic insert
INSERT INTO processed_webhooks (hash)
VALUES ($1)
ON CONFLICT (hash) DO NOTHING

# ❌ Bad: Check then insert (race condition)
if not exists(hash):
    insert(hash)  # Another request might insert between check and insert

3. Clean Up Old Hashes

# Remove hashes older than 30 days
DELETE FROM processed_webhooks
WHERE processed_at < NOW() - INTERVAL '30 days'

4. Handle Database Errors

try:
    mark_as_processed(conn, event_hash)
except psycopg2.IntegrityError:
    # Hash already exists (another request processed it)
    pass  # This is OK, just means it was already processed

Testing Idempotency

Test Duplicate Delivery

def test_idempotency():
    """Delivering the same event twice must process it exactly once."""
    duplicate_event = {
        "event": {
            "hash": "test-hash-123",
            "common_model_id": "test-id"
        }
    }

    # Both the first and the duplicate delivery are acknowledged with 200...
    first = handle_webhook(duplicate_event)
    second = handle_webhook(duplicate_event)
    assert first.status_code == 200
    assert second.status_code == 200

    # ...but the side effects happened only once.
    assert get_processing_count("test-hash-123") == 1

Related Documentation