Source code for aioyookassa.contrib.webhook_server

"""
Ready-to-use aiohttp web server for YooKassa webhook notifications.
"""

import logging
from typing import Optional

from aiohttp import web

from aioyookassa.core.webhook_handler import WebhookHandler
from aioyookassa.core.webhook_validator import WebhookIPValidator

logger = logging.getLogger(__name__)

DEFAULT_WEBHOOK_PATH = "/webhook"


[docs] class WebhookServer: """ Ready-to-use aiohttp server for handling YooKassa webhook notifications. Provides a complete HTTP server setup with IP validation and event handling. """ def __init__( self, handler: Optional[WebhookHandler] = None, validator: Optional[WebhookIPValidator] = None, validate_ip: bool = True, logger: Optional[logging.Logger] = None, ): """ Initialize webhook server. :param handler: WebhookHandler instance. If None, creates new one. :param validator: IP validator instance. If None, uses default. :param validate_ip: Whether to validate IP addresses. Default: True. :param logger: Logger instance. If None, uses default logger. """ self.logger = logger if logger is not None else logging.getLogger(__name__) if handler is None: validator = validator if validator is not None else WebhookIPValidator() handler = WebhookHandler(validator=validator, logger=self.logger) self.handler = handler self.validate_ip = validate_ip self.logger.info(f"Initialized WebhookServer: validate_ip={validate_ip}")
[docs] def create_app(self) -> web.Application: """ Create aiohttp application with webhook endpoint. :return: Configured aiohttp Application instance. """ app = web.Application() app.router.add_post(DEFAULT_WEBHOOK_PATH, self._handle_webhook) return app
async def _handle_webhook(self, request: web.Request) -> web.Response: """ Handle incoming webhook request. :param request: aiohttp Request object. :return: HTTP 200 response on success. """ # Get client IP client_ip = request.remote self.logger.info(f"Received webhook request from IP: {client_ip}") # Validate IP if enabled if self.validate_ip: if client_ip is None or not self.handler.validator.is_allowed(client_ip): self.logger.warning( f"Rejected webhook request from unauthorized IP: {client_ip}" ) raise web.HTTPForbidden( text=f"IP address {client_ip} is not in whitelist" ) self.logger.debug(f"IP validation passed: {client_ip}") # Parse request body try: data = await request.json() self.logger.debug( f"Parsed request JSON: event={data.get('event', 'unknown')}" ) except Exception as e: self.logger.error(f"Failed to parse request JSON: {e}") raise web.HTTPBadRequest(text=f"Invalid JSON: {str(e)}") from e # Parse and handle notification try: notification = self.handler.parse_notification(data) event_object = await self.handler.handle_notification(notification) self.logger.info( f"Successfully processed webhook: event={notification.event}, " f"object_type={type(event_object).__name__}" ) except Exception as e: self.logger.error(f"Error processing webhook: {e}", exc_info=True) raise web.HTTPBadRequest(text=f"Error processing webhook: {str(e)}") from e # Return 200 to confirm receipt self.logger.debug("Returning HTTP 200 response") return web.Response(status=200, text="OK")
[docs] def run( self, host: str = "0.0.0.0", port: int = 8080, path: str = DEFAULT_WEBHOOK_PATH, ) -> None: """ Run webhook server. :param host: Host to bind to. Default: 0.0.0.0. :param port: Port to bind to. Default: 8080. :param path: Webhook endpoint path. Default: /webhook. """ app = self.create_app() if path != DEFAULT_WEBHOOK_PATH: # Update route if custom path provided # Create new app with custom path app = web.Application() app.router.add_post(path, self._handle_webhook) self.logger.info(f"Starting webhook server on {host}:{port}{path}") web.run_app(app, host=host, port=port)