Setting Up Webhooks in Python
Setting Up Webhooks in Python
Step-by-step guide to set up webhooks in Python using Flask.
Prerequisites
- Python 3.7+
- Flask
- Sports Stack API account with webhook access
Step 1: Install Dependencies
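pip install flask requests
# hmac and hashlib are part of the Python standard library and need no installation;
# requests is used later to call the Sports Stack API.
Step 2: Create Flask Application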
Create webhook_server.py:
from flask import Flask, request, Response
import hmac
import hashlib
import json
import os
# Flask application object; the /webhook route is registered on it.
app = Flask(__name__)
# Get shared secret from environment variable.
# NOTE(review): 'your-secret-key' is only a placeholder fallback. If
# WEBHOOK_SECRET is unset, signatures are verified against this well-known
# value, so always set the variable outside of local experiments.
SHARED_SECRET = os.environ.get('WEBHOOK_SECRET', 'your-secret-key')
def verify_signature(payload_body, signature_header, secret):
    """
    Verify webhook signature from Sports Stack.

    Computes the hex-encoded HMAC-SHA256 of the payload with the shared
    secret and compares it with the received signature using a
    constant-time comparison.

    Args:
        payload_body: Raw request body (bytes or string)
        signature_header: X-SportsStack-Signature header value (string or
            bytes); None or empty when the header is missing
        secret: Your shared secret (string)

    Returns:
        bool: True if signature is valid, False otherwise (including when
        the signature is missing)
    """
    # A missing header must be rejected rather than crash the request:
    # hmac.compare_digest raises TypeError when given None.
    if not signature_header:
        return False
    if isinstance(payload_body, str):
        payload_body = payload_body.encode('utf-8')
    expected_signature = hmac.new(
        secret.encode('utf-8'),
        payload_body,
        hashlib.sha256
    ).hexdigest()
    # Compare as bytes. compare_digest only accepts str arguments that are
    # pure ASCII, and the header value is controlled by the sender.
    if isinstance(signature_header, str):
        signature_header = signature_header.encode('utf-8')
    return hmac.compare_digest(
        expected_signature.encode('utf-8'),
        signature_header,
    )
@app.route('/webhook', methods=['POST'])
def webhook_handler():
    """
    Receive a webhook POST, verify its signature and process the payload.

    Returns:
        Response: 401 when the signature header is missing or does not
        match, 400 when the body is not a JSON object, 200 after the
        payload has been processed.
    """
    # Get signature from headers (None when the header is absent)
    signature = request.headers.get('X-SportsStack-Signature')
    # Get raw request body as bytes. The HMAC is computed over the exact
    # bytes received, so the body is not decoded first; decoding could
    # also fail on invalid UTF-8 before the signature is ever checked.
    payload_body = request.get_data()
    # Verify signature. A missing header is rejected up front so that
    # None is never handed to the comparison.
    if not signature or not verify_signature(payload_body, signature, SHARED_SECRET):
        return Response(status=401)
    # Parse payload. A malformed body is a client error, not a server crash
    # (JSON and UTF-8 decoding errors are both ValueError subclasses).
    try:
        payload = json.loads(payload_body)
    except ValueError:
        return Response(status=400)
    if not isinstance(payload, dict):
        return Response(status=400)
    # NOTE: despite its name, process_webhook_async is an ordinary blocking
    # call here; the 200 response is sent only after it returns.
    process_webhook_async(payload)
    return Response(status=200)
def process_webhook_async(payload):
    """
    Handle one webhook payload, skipping events that were already handled.

    Reads payload['resource_type'], payload['event']['common_model_id'] and
    payload['event']['hash']. If the hash has not been seen, the entity is
    fetched and updated and the hash is then recorded.
    """
    resource_type = payload['resource_type']
    event = payload['event']
    common_model_id, event_hash = event['common_model_id'], event['hash']

    # Idempotency check: only unseen hashes trigger a fetch
    if not is_already_processed(event_hash):
        # Fetch updated entity from API, then remember the hash
        fetch_and_update_entity(resource_type, common_model_id)
        mark_as_processed(event_hash)
        return

    print(f"Webhook already processed: {event_hash}")
def is_already_processed(event_hash):
    """
    Tell whether this webhook hash has been recorded before.

    The hashes live in a set stored as an attribute on this function,
    created on first use. In production, use a database or Redis.
    """
    # Simple in-memory store for demo
    try:
        seen_hashes = is_already_processed.processed_hashes
    except AttributeError:
        seen_hashes = is_already_processed.processed_hashes = set()
    return event_hash in seen_hashes
def mark_as_processed(event_hash):
    """
    Record a webhook hash in the in-memory set of processed hashes.

    The set is stored as the processed_hashes attribute of
    is_already_processed and is created here if it does not exist yet.
    """
    # setdefault creates the attribute on first use and returns the set
    store = vars(is_already_processed).setdefault('processed_hashes', set())
    store.add(event_hash)
def fetch_and_update_entity(resource_type, common_model_id):
    """
    Fetch updated entity from Sports Stack API and pass it to update_database.

    Args:
        resource_type: 'event', 'team', 'player' or 'market'. Any other
            value is reported and ignored.
        common_model_id: Identifier of the entity. It is percent-encoded
            before being placed in the request path.

    Returns:
        None. The outcome is reported with print().

    Raises:
        requests.RequestException: Network failures and timeouts are not
            caught here and propagate to the caller.
    """
    from urllib.parse import quote

    base_url = 'https://api.sportsstack.io'
    # Map resource types to API collection names
    collection_map = {
        'event': 'events',
        'team': 'teams',
        'player': 'players',
        'market': 'markets',
    }
    collection = collection_map.get(resource_type)
    if not collection:
        print(f"Unknown resource type: {resource_type}")
        return

    # Imported where it is first needed, after the cheap validation above
    import requests

    api_key = os.environ.get('SPORTS_STACK_API_KEY')
    # The id comes from the webhook payload; encode it so characters such
    # as '/' or '?' cannot change which URL is requested.
    entity_id = quote(str(common_model_id), safe='')
    endpoint = f'{base_url}/api/v1/{collection}/{entity_id}'

    # Fetch entity. requests has no default timeout, so without one a
    # stalled connection would block this worker indefinitely.
    response = requests.get(
        endpoint,
        headers={
            'Authorization': f'Bearer {api_key}',
            'Accept': 'application/json'
        },
        timeout=10,
    )
    if response.status_code == 200:
        entity = response.json()
        # Update your database
        update_database(resource_type, entity)
        print(f"Updated {resource_type}: {common_model_id}")
    else:
        print(f"Failed to fetch {resource_type}: {response.status_code}")
def update_database(resource_type, entity):
    """
    Placeholder for persisting a fetched entity.

    Currently it only prints a message naming the resource type; the
    entity itself is not used yet.
    """
    # Implement your database update logic here
    message = f"Updating database with {resource_type} data..."
    print(message)
    # Example: db.update_entity(resource_type, entity)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8000, debug=True)Step 3: Configure Environment Variables
export WEBHOOK_SECRET="your-shared-secret"
export SPORTS_STACK_API_KEY="your-api-key"
Step 4: Run Server
python webhook_server.py
Server runs on http://localhost:8000
Step 5: Expose with ngrok (Local Testing)
# Install ngrok
brew install ngrok
# Expose local server
ngrok http 8000
# Copy HTTPS URL (e.g., https://abc123.ngrok.io)
Step 6: Configure Webhook in CMS
- Navigate to Integrations → Destinations
- Click "New Destination"
- Configure:
- Name: "Python Webhook Server"
- Type: Webhook
  - URL: https://abc123.ngrok.io/webhook (or your production URL)
  - Shared Secret: Same as WEBHOOK_SECRET
  - Streams: Select change_log
  - Entity Types: Leave empty for all, or filter specific types
Step 7: Test Webhook
- Trigger a data change (Quick Sync, provider replay)
- Check server logs for webhook receipt
- Verify entity was fetched and updated
Production Considerations
Use a Production WSGI Server
# Install gunicorn
pip install gunicorn
# Run with gunicorn
gunicorn -w 4 -b 0.0.0.0:8000 webhook_server:app
Implement Proper Idempotency
import redis
# Redis client used by the idempotency helpers in this snippet; connects to
# localhost on port 6379, database 0.
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def is_already_processed(event_hash):
    """Ask Redis whether the marker key for this event hash exists."""
    marker_key = f"webhook:{event_hash}"
    return redis_client.exists(marker_key)
def mark_as_processed(event_hash):
redis_client.setex(f"webhook:{event_hash}", 86400, "1") # 24 hour TTLAdd Logging
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@app.route('/webhook', methods=['POST'])
def webhook_handler():
logger.info(f"Received webhook: {request.headers.get('X-SportsStack-Signature')}")
# ... rest of handler
Add Error Handling
@app.route('/webhook', methods=['POST'])
def webhook_handler():
try:
# ... webhook processing
return Response(status=200)
except Exception as e:
logger.error(f"Webhook processing failed: {e}")
return Response(status=500) # Retry
Next Steps
- Webhook Idempotency Recipe - Implement proper idempotency
- Webhooks Guide - Complete webhook documentation
- Webhook Verification - Signature verification details
Updated 29 days ago
