Source code for aioyookassa.core.webhook_validator

"""
IP address validator for YooKassa webhook notifications.
"""

import ipaddress
import logging
from typing import List, Optional, Set, Union

logger = logging.getLogger(__name__)


[docs] class WebhookIPValidator: """ Validator for YooKassa webhook IP addresses. Validates that incoming webhook requests come from authorized YooKassa IP ranges. Supports both IPv4 and IPv6 addresses, CIDR ranges, and individual IPs. """ # Official YooKassa IP ranges DEFAULT_ALLOWED_IPS: List[str] = [ "185.71.76.0/27", "185.71.77.0/27", "77.75.153.0/25", "77.75.156.11", "77.75.156.35", "77.75.154.128/25", "2a02:5180::/32", ] def __init__( self, allowed_ips: Optional[List[str]] = None, logger: Optional[logging.Logger] = None, ): """ Initialize IP validator. :param allowed_ips: List of allowed IP addresses or CIDR ranges. If None, uses default YooKassa IP ranges. :param logger: Logger instance. If None, uses default logger. """ self.logger = logger if logger is not None else logging.getLogger(__name__) self._allowed_networks: Set[ Union[ipaddress.IPv4Network, ipaddress.IPv6Network] ] = set() self._allowed_ips: Set[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]] = ( set() ) ips_to_use = ( allowed_ips if allowed_ips is not None else self.DEFAULT_ALLOWED_IPS ) self._parse_allowed_ips(ips_to_use) self.logger.debug( f"Initialized IP validator with {len(self._allowed_networks)} networks " f"and {len(self._allowed_ips)} individual IPs" ) def _parse_allowed_ips(self, allowed_ips: List[str]) -> None: """Parse and store allowed IP addresses and networks.""" for ip_str in allowed_ips: try: # Try to parse as network (CIDR) if "/" in ip_str: network = ipaddress.ip_network(ip_str, strict=False) self._allowed_networks.add(network) else: # Parse as individual IP ip_addr = ipaddress.ip_address(ip_str) self._allowed_ips.add(ip_addr) except ValueError: # Invalid IP format, skip it continue
[docs] def is_allowed(self, ip: str) -> bool: """ Check if IP address is allowed. :param ip: IP address to check (IPv4 or IPv6). :return: True if IP is allowed, False otherwise. """ try: ip_addr = ipaddress.ip_address(ip) except ValueError: # Invalid IP format self.logger.warning(f"Invalid IP address format: {ip}") return False # Check individual IPs first if ip_addr in self._allowed_ips: self.logger.debug(f"IP {ip} is allowed (individual IP match)") return True # Check networks for network in self._allowed_networks: if ip_addr in network: self.logger.debug(f"IP {ip} is allowed (network {network} match)") return True self.logger.warning(f"IP {ip} is not in whitelist") return False