Source code for aioyookassa.core.webhook_handler

"""
Webhook event handler for YooKassa notifications.
"""

import asyncio
import logging
import re
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from pydantic import ValidationError

from aioyookassa.core.webhook_validator import WebhookIPValidator
from aioyookassa.exceptions.webhooks import InvalidWebhookDataError
from aioyookassa.types.deals import Deal
from aioyookassa.types.enum import WebhookEvent
from aioyookassa.types.payment import Payment, PaymentMethod
from aioyookassa.types.payout import Payout
from aioyookassa.types.refund import Refund
from aioyookassa.types.webhook_notification import WebhookNotification

logger = logging.getLogger(__name__)


class WebhookHandler:
    """
    Handler for processing YooKassa webhook notifications.

    Provides functionality to:
    - Parse raw webhook payloads into ``WebhookNotification`` models
    - Hold an IP validator (exposed as ``validator``) for request checks
    - Register callbacks for specific events or wildcard patterns
    - Automatically parse event objects into typed Pydantic models
    """

    def __init__(
        self,
        validator: Optional[WebhookIPValidator] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """
        Initialize webhook handler.

        :param validator: IP validator instance. If None, a default
            ``WebhookIPValidator()`` is created.
        :param logger: Logger instance. If None, the logger named after this
            module is used.
        """
        self.validator = validator if validator is not None else WebhookIPValidator()
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        # Exact event name -> callback. Registering the same event twice
        # replaces the earlier callback.
        self.callbacks: Dict[str, Callable] = {}
        # Wildcard registrations, consulted in registration order and only
        # when no exact match exists.
        self.pattern_callbacks: List[Tuple[re.Pattern, Callable]] = []

    def register_callback(
        self,
        events: Union[WebhookEvent, str, List[Union[WebhookEvent, str]]],
        callback: Optional[Callable] = None,
    ) -> Callable:
        """
        Register a callback for one or more events.

        Supports:
        - Single event: @handler.register_callback(WebhookEvent.PAYMENT_SUCCEEDED)
        - Multiple events: @handler.register_callback([event1, event2])
        - Pattern matching: @handler.register_callback("payment.*")
        - Direct call: handler.register_callback("payment.*", my_callback)

        :param events: Event(s) to register callback for.
        :param callback: Callback to register immediately. When omitted, a
            decorator is returned instead.
        :return: The callback itself when ``callback`` is given, otherwise a
            decorator that registers the decorated function and returns it
            unchanged.
        """

        def decorator(func: Callable) -> Callable:
            self._register_events(events, func)
            return func

        # Honour the documented ``callback`` argument: a direct call registers
        # right away instead of silently handing back an unused decorator.
        if callback is not None:
            return decorator(callback)
        return decorator

    def add_callback(
        self,
        events: Union[WebhookEvent, str, List[Union[WebhookEvent, str]]],
        callback: Callable,
    ) -> None:
        """
        Register callback without using decorator syntax.

        :param events: Event(s) to register callback for.
        :param callback: Callback function to register.
        """
        self._register_events(events, callback)

    def _register_events(
        self,
        events: Union[WebhookEvent, str, List[Union[WebhookEvent, str]]],
        callback: Callable,
    ) -> None:
        """
        Store ``callback`` under every event in ``events``.

        Event strings containing ``*`` or ``?`` are compiled and appended to
        ``pattern_callbacks``; all others are stored in ``callbacks`` by exact
        name.

        :param events: A single event (enum member or string) or an iterable
            of them.
        :param callback: Callable to invoke for the matching events.
        """
        if isinstance(events, (WebhookEvent, str)):
            events_list = [events]
        else:
            events_list = events

        # functools.partial objects and callable instances have no __name__;
        # fall back to repr() so registration never fails just for logging.
        callback_name = getattr(callback, "__name__", repr(callback))

        for event in events_list:
            # Use the enum's value rather than str(): the registered key must
            # be the event string that notifications are looked up by, and
            # str() of an Enum member is not guaranteed to be its value.
            event_str = event.value if isinstance(event, WebhookEvent) else event

            if self._is_pattern(event_str):
                pattern = self._compile_pattern(event_str)
                self.pattern_callbacks.append((pattern, callback))
                self.logger.debug(
                    f"Registered pattern callback: pattern={event_str}, "
                    f"callback={callback_name}"
                )
            else:
                self.callbacks[event_str] = callback
                self.logger.debug(
                    f"Registered callback: event={event_str}, "
                    f"callback={callback_name}"
                )

    def _is_pattern(self, event: str) -> bool:
        """Check if event string is a pattern (contains ``*`` or ``?``)."""
        return "*" in event or "?" in event

    def _compile_pattern(self, pattern: str) -> re.Pattern:
        """
        Compile a wildcard pattern into an anchored regex.

        ``*`` matches any run of characters (including none) and ``?`` matches
        exactly one character. Every other character is matched literally.

        :param pattern: Wildcard pattern such as ``"payment.*"``.
        :return: Compiled regular expression.
        """
        # Escape each literal character so regex metacharacters other than the
        # two wildcards (e.g. "+", "(", "[") cannot alter or break the regex.
        translated = "".join(
            ".*" if char == "*" else "." if char == "?" else re.escape(char)
            for char in pattern
        )
        return re.compile("^" + translated + "$")

    def parse_notification(self, data: dict) -> WebhookNotification:
        """
        Parse raw webhook data into WebhookNotification model.

        :param data: Raw JSON data from webhook request.
        :return: Parsed WebhookNotification instance.
        :raises InvalidWebhookDataError: If data fails model validation or is
            not a mapping that can be unpacked as keyword arguments.
        """
        try:
            notification = WebhookNotification(**data)
        except (ValidationError, TypeError) as e:
            # TypeError covers payloads that are not a mapping (e.g. a JSON
            # list or null), which would otherwise escape as a bare TypeError.
            self.logger.error(f"Failed to parse webhook notification: {e}")
            raise InvalidWebhookDataError(
                f"Invalid webhook notification data: {e}"
            ) from e

        self.logger.debug(
            f"Parsed webhook notification: event={notification.event}, "
            f"type={notification.type}"
        )
        return notification

    async def handle_notification(
        self, notification: WebhookNotification
    ) -> Union[Payment, Refund, Payout, Deal, PaymentMethod, dict]:
        """
        Process webhook notification and return typed event object.

        Parses the notification object into the model selected by the event
        name, then invokes the registered callback (exact match first, then
        the first matching pattern) with that object.

        :param notification: Parsed webhook notification.
        :return: Typed event object (Payment, Refund, Payout, Deal,
            PaymentMethod), or the raw object when the event is unknown or
            the object could not be parsed.
        :raises Exception: Any exception raised by the callback is logged and
            re-raised unchanged.
        """
        event = notification.event
        self.logger.info(f"Processing webhook notification: event={event}")

        event_object = self._parse_object(notification)
        object_type = type(event_object).__name__
        self.logger.debug(
            f"Parsed event object: type={object_type}, "
            f"id={getattr(event_object, 'id', 'N/A')}"
        )

        callback = self._find_callback(event)
        # Compare against None explicitly: a callable object may define
        # __bool__/__len__ and evaluate as falsy.
        if callback is not None:
            self.logger.debug(f"Found callback for event: {event}")
            try:
                await self._call_callback(callback, event_object)
            except Exception as e:
                self.logger.error(
                    f"Error in callback for event {event}: {e}", exc_info=True
                )
                raise
            self.logger.debug(f"Successfully called callback for event: {event}")
        else:
            self.logger.debug(f"No callback registered for event: {event}")

        self.logger.info(f"Successfully processed webhook notification: event={event}")
        return event_object

    def _model_for_event(self, event: str) -> Optional[type]:
        """
        Return the model class used to parse objects of ``event``.

        :param event: Event name from the notification.
        :return: Model class, or None when the event is not recognised.
        """
        if event.startswith("payment."):
            return Payment
        if event.startswith("refund."):
            return Refund
        if event.startswith("payout."):
            return Payout
        if event == WebhookEvent.DEAL_CLOSED:
            return Deal
        if event == WebhookEvent.PAYMENT_METHOD_ACTIVE:
            return PaymentMethod
        return None

    def _parse_object(
        self, notification: WebhookNotification
    ) -> Union[Payment, Refund, Payout, Deal, PaymentMethod, dict]:
        """
        Parse notification object into appropriate Pydantic type.

        :param notification: Parsed webhook notification.
        :return: Model instance for the event, or ``notification.object``
            unchanged when the event is unknown or parsing fails.
        """
        event = notification.event
        obj_data = notification.object

        model = self._model_for_event(event)
        if model is None:
            # Unknown event type, return raw dict
            self.logger.warning(f"Unknown event type: {event}, returning raw dict")
            return obj_data

        try:
            parsed = model(**obj_data)
        except (ValidationError, TypeError) as e:
            # Best effort by design: a payload with an unexpected structure is
            # handed to the caller as-is rather than failing the webhook.
            self.logger.warning(
                f"Failed to parse event object for {event}: {e}, "
                "returning raw dict"
            )
            return obj_data

        self.logger.debug(f"Parsed {model.__name__} object: id={parsed.id}")
        return parsed

    def _find_callback(self, event: str) -> Optional[Callable]:
        """
        Find callback for given event (exact match or pattern).

        :param event: Event name from the notification.
        :return: The exact-match callback if one exists, otherwise the
            callback of the first registered pattern that matches, else None.
        """
        exact = self.callbacks.get(event)
        if exact is not None:
            return exact

        for pattern, callback in self.pattern_callbacks:
            if pattern.match(event):
                return callback
        return None

    async def _call_callback(self, callback: Callable, event_object: Any) -> None:
        """
        Call callback function (sync or async).

        :param callback: Callable accepting the event object.
        :param event_object: Object passed to the callback.
        """
        result = callback(event_object)
        # Inspect the result rather than the callable: this also awaits
        # callable objects with ``async def __call__`` and sync wrappers that
        # return a coroutine, which would otherwise never run.
        if asyncio.iscoroutine(result):
            await result