Setting Up Webhooks in Python

Setting Up Webhooks in Python

Step-by-step guide to set up webhooks in Python using Flask.

Prerequisites

  • Python 3.7+
  • Flask
  • Sports Stack API account with webhook access

Step 1: Install Dependencies

pip install flask hmac hashlib

Step 2: Create Flask Application

Create webhook_server.py:

from flask import Flask, request, Response
import hmac
import hashlib
import json
import os

app = Flask(__name__)

# Get shared secret from environment variable
SHARED_SECRET = os.environ.get('WEBHOOK_SECRET', 'your-secret-key')

def verify_signature(payload_body, signature_header, secret):
    """
    Verify webhook signature from Sports Stack.

    Args:
        payload_body: Raw request body (bytes or string)
        signature_header: X-SportsStack-Signature header value
        secret: Your shared secret

    Returns:
        bool: True if signature is valid
    """
    if isinstance(payload_body, str):
        payload_body = payload_body.encode('utf-8')

    expected_signature = hmac.new(
        secret.encode('utf-8'),
        payload_body,
        hashlib.sha256
    ).hexdigest()

    return hmac.compare_digest(expected_signature, signature_header)

@app.route('/webhook', methods=['POST'])
def webhook_handler():
    # Get signature from headers
    signature = request.headers.get('X-SportsStack-Signature')

    # Get raw request body
    payload_body = request.data.decode('utf-8')

    # Verify signature
    if not verify_signature(payload_body, signature, SHARED_SECRET):
        return Response(status=401)

    # Parse payload
    payload = json.loads(payload_body)

    # Process webhook asynchronously (don't block response)
    process_webhook_async(payload)

    return Response(status=200)

def process_webhook_async(payload):
    """
    Process webhook payload asynchronously.
    """
    resource_type = payload['resource_type']
    common_model_id = payload['event']['common_model_id']
    event_hash = payload['event']['hash']

    # Implement idempotency check
    if is_already_processed(event_hash):
        print(f"Webhook already processed: {event_hash}")
        return

    # Fetch updated entity from API
    fetch_and_update_entity(resource_type, common_model_id)

    # Mark as processed
    mark_as_processed(event_hash)

def is_already_processed(event_hash):
    """
    Check if webhook hash has already been processed.
    In production, use a database or Redis.
    """
    # Simple in-memory store for demo
    if not hasattr(is_already_processed, 'processed_hashes'):
        is_already_processed.processed_hashes = set()

    return event_hash in is_already_processed.processed_hashes

def mark_as_processed(event_hash):
    """
    Mark webhook hash as processed.
    """
    if not hasattr(is_already_processed, 'processed_hashes'):
        is_already_processed.processed_hashes = set()

    is_already_processed.processed_hashes.add(event_hash)

def fetch_and_update_entity(resource_type, common_model_id):
    """
    Fetch updated entity from Sports Stack API.
    """
    import requests

    api_key = os.environ.get('SPORTS_STACK_API_KEY')
    base_url = 'https://api.sportsstack.io'

    # Map resource types to API endpoints
    endpoint_map = {
        'event': f'{base_url}/api/v1/events/{common_model_id}',
        'team': f'{base_url}/api/v1/teams/{common_model_id}',
        'player': f'{base_url}/api/v1/players/{common_model_id}',
        'market': f'{base_url}/api/v1/markets/{common_model_id}',
    }

    endpoint = endpoint_map.get(resource_type)
    if not endpoint:
        print(f"Unknown resource type: {resource_type}")
        return

    # Fetch entity
    response = requests.get(
        endpoint,
        headers={
            'Authorization': f'Bearer {api_key}',
            'Accept': 'application/json'
        }
    )

    if response.status_code == 200:
        entity = response.json()
        # Update your database
        update_database(resource_type, entity)
        print(f"Updated {resource_type}: {common_model_id}")
    else:
        print(f"Failed to fetch {resource_type}: {response.status_code}")

def update_database(resource_type, entity):
    """
    Update your database with the entity data.
    """
    # Implement your database update logic here
    print(f"Updating database with {resource_type} data...")
    # Example: db.update_entity(resource_type, entity)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000, debug=True)

Step 3: Configure Environment Variables

export WEBHOOK_SECRET="your-shared-secret"
export SPORTS_STACK_API_KEY="your-api-key"

Step 4: Run Server

python webhook_server.py

Server runs on http://localhost:8000

Step 5: Expose with ngrok (Local Testing)

# Install ngrok
brew install ngrok

# Expose local server
ngrok http 8000

# Copy HTTPS URL (e.g., https://abc123.ngrok.io)

Step 6: Configure Webhook in CMS

  1. Navigate to Integrations → Destinations
  2. Click "New Destination"
  3. Configure:
    • Name: "Python Webhook Server"
    • Type: Webhook
    • URL: https://abc123.ngrok.io/webhook (or your production URL)
    • Shared Secret: Same as WEBHOOK_SECRET
    • Streams: Select change_log
    • Entity Types: Leave empty for all, or filter specific types

Step 7: Test Webhook

  1. Trigger a data change (Quick Sync, provider replay)
  2. Check server logs for webhook receipt
  3. Verify entity was fetched and updated

Production Considerations

Use a Production WSGI Server

# Install gunicorn
pip install gunicorn

# Run with gunicorn
gunicorn -w 4 -b 0.0.0.0:8000 webhook_server:app

Implement Proper Idempotency

import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def is_already_processed(event_hash):
    return redis_client.exists(f"webhook:{event_hash}")

def mark_as_processed(event_hash):
    redis_client.setex(f"webhook:{event_hash}", 86400, "1")  # 24 hour TTL

Add Logging

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@app.route('/webhook', methods=['POST'])
def webhook_handler():
    logger.info(f"Received webhook: {request.headers.get('X-SportsStack-Signature')}")
    # ... rest of handler

Add Error Handling

@app.route('/webhook', methods=['POST'])
def webhook_handler():
    try:
        # ... webhook processing
        return Response(status=200)
    except Exception as e:
        logger.error(f"Webhook processing failed: {e}")
        return Response(status=500)  # Retry

Next Steps