import asyncio
import logging
import json
import os
from datetime import datetime, timezone
from typing import Callable, Dict, List, Any, Optional
from collections import defaultdict
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiokafka.errors import KafkaError
from src.events.base import BaseEvent
from src.events.config import KafkaConfig, get_kafka_config


logger = logging.getLogger(__name__)

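# Retry policy for async-registered listeners: each failing call is retried
# with the delays below, then the event is routed to a "<event_type>.dlq"
# dead-letter topic (see _execute_listener_with_retry / _send_to_dlq).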
MAX_RETRIES = 3
RETRY_BACKOFF_SECONDS = [1, 5, 30]


class KafkaEventDispatcher:
    def __init__(self, kafka_config: KafkaConfig):
        self.kafka_config = kafka_config
        self._listeners: Dict[str, List[Callable]] = defaultdict(list)
        self._async_listeners: Dict[str, List[Callable]] = defaultdict(list)

        # Kafka components - created lazily in initialize() to avoid async context issues
        self._producer: Optional[AIOKafkaProducer] = None
        self._consumers: Dict[str, AIOKafkaConsumer] = {}
        self._consumer_tasks: Dict[str, asyncio.Task] = {}
        self._shutdown_event: Optional[asyncio.Event] = None
        self._initialized = False
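        # KAFKA_ENABLED=false disables Kafka entirely; dispatch() then runs
        # listeners in-process via _process_event_locally().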
        self._kafka_enabled = os.getenv("KAFKA_ENABLED", "true").lower() != "false"

    async def initialize(self):
        """Initialize Kafka producer and consumers"""
        if self._initialized:
            return

        if not self._kafka_enabled:
            logger.info("Kafka disabled (KAFKA_ENABLED=false) — events will be processed locally")
            self._initialized = True
            return

        try:
            # Create producer and shutdown event in async context
            self._producer = AIOKafkaProducer(
                bootstrap_servers=self.kafka_config.bootstrap_servers,
                value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                key_serializer=lambda k: k.encode('utf-8') if k else None,
                **self._get_producer_config()
            )
            self._shutdown_event = asyncio.Event()

            await self._producer.start()

            # Start consumers for each event type that has listeners
            for event_type in self._get_subscribed_event_types():
                await self._start_consumer_for_event_type(event_type)

            self._initialized = True
            logger.info("Kafka event dispatcher initialized successfully")

        except Exception as e:
            logger.error(f"Failed to initialize Kafka event dispatcher: {e}")
            logger.warning("Kafka unavailable — dispatcher will process events locally as fallback")
            # Ensure producer is None so dispatch() falls back to local processing
            self._producer = None
            
    async def shutdown(self):
        """Shutdown Kafka connections and consumers"""
        logger.info("Shutting down Kafka event dispatcher...")
        
        # Signal all consumer tasks to stop
        if self._shutdown_event:
            self._shutdown_event.set()
        
        # Cancel and wait for consumer tasks to finish
        for task in self._consumer_tasks.values():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            
        # Stop consumers
        for consumer in self._consumers.values():
            await consumer.stop()
            
        # Stop producer
        if self._producer:
            await self._producer.stop()
            
        self._initialized = False
        logger.info("Kafka event dispatcher shutdown complete")
        
    def register(self, event_type: str, listener: Callable[[BaseEvent], None]):
        """Register a synchronous event listener"""
        self._listeners[event_type].append(listener)
        logger.info(f"Registered sync listener for {event_type}")
        
        # Start consumer for this event type if dispatcher is initialized
        if self._initialized and event_type not in self._consumers:
            asyncio.create_task(self._start_consumer_for_event_type(event_type))
        
    def register_async(self, event_type: str, listener: Callable[[BaseEvent], None]):
        """Register an asynchronous event listener"""
        self._async_listeners[event_type].append(listener)
        logger.info(f"Registered async listener for {event_type}")
        
        # Start consumer for this event type if dispatcher is initialized
        if self._initialized and event_type not in self._consumers:
            asyncio.create_task(self._start_consumer_for_event_type(event_type))
        
    async def dispatch(self, event: BaseEvent):
        """Dispatch event to Kafka topic"""
        if not self._producer:
            if not self._kafka_enabled:
                logger.info(
                    f"[LOCAL] Event processed locally (Kafka disabled): "
                    f"{event.event_type} | id={event.event_id} | data={event.data}"
                )
            else:
                logger.warning(f"Kafka producer not initialized, processing locally: {event.event_type}")
            await self._process_event_locally(event)
            return

        try:
            topic_name = self._get_topic_name(event.event_type)
            event_data = event.to_kafka_message()

            # Send to Kafka
            await self._producer.send_and_wait(
                topic_name,
                value=event_data,
                key=str(event.event_id)  # key_serializer expects str; event_id may not be one
            )

            logger.info(f"Event dispatched to Kafka: {event.event_type} - {event.event_id}")
            
        except KafkaError as e:
            logger.error(f"Failed to dispatch event to Kafka: {e}")
            # Fallback to local processing
            await self._process_event_locally(event)
        except Exception as e:
            logger.error(f"Unexpected error dispatching event: {e}")
            await self._process_event_locally(event)
            
    async def dispatch_async(self, event: BaseEvent):
        """Dispatch event asynchronously (delegates to dispatch)"""
        await self.dispatch(event)
        
    async def _process_event_locally(self, event: BaseEvent):
        """Fallback to process event locally if Kafka is unavailable"""
        logger.info(f"Processing event locally as fallback: {event.event_type}")
        
        # Collect all listener tasks (both sync and async)
        async_tasks = []
        
        # Run synchronous listeners in executor to avoid blocking
        for listener in self._listeners.get(event.event_type, []):
            async_tasks.append(self._call_sync_listener_async(listener, event))
                
        # Add asynchronous listeners
        for listener in self._async_listeners.get(event.event_type, []):
            async_tasks.append(self._call_async_listener(listener, event))
            
        if async_tasks:
            await asyncio.gather(*async_tasks, return_exceptions=True)
            
    async def _start_consumer_for_event_type(self, event_type: str):
        """Start a Kafka consumer for a specific event type"""
        if event_type in self._consumers:
            return
            
        topic_name = self._get_topic_name(event_type)
        consumer_group = f"{self.kafka_config.consumer_group_prefix}_{event_type}"
        
        try:
            consumer = AIOKafkaConsumer(
                topic_name,
                bootstrap_servers=self.kafka_config.bootstrap_servers,
                group_id=consumer_group,
                value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                **self._get_consumer_config()
            )
            
            await consumer.start()
            self._consumers[event_type] = consumer
            
            # Start consumer task
            consumer_task = asyncio.create_task(
                self._consume_events(event_type, consumer)
            )
            self._consumer_tasks[event_type] = consumer_task
            
            logger.info(f"Started Kafka consumer for {event_type} on topic {topic_name}")
            
        except Exception as e:
            logger.error(f"Failed to start consumer for {event_type}: {e}")
            
    async def _consume_events(self, event_type: str, consumer: AIOKafkaConsumer):
        """Consumer task function to process events from Kafka"""
        logger.info(f"Consumer task started for {event_type}")
        
        try:
            async for message in consumer:
                if self._shutdown_event.is_set():
                    break
                    
                try:
                    event_data = message.value
                    event = BaseEvent.from_kafka_message(event_data)
                    await self._process_consumed_event(event)
                    
                except Exception as e:
                    logger.error(f"Error processing message from {event_type}: {e}")
                    
        except asyncio.CancelledError:
            logger.info(f"Consumer task cancelled for {event_type}")
        except Exception as e:
            logger.error(f"Error in consumer task for {event_type}: {e}")
        finally:
            logger.info(f"Consumer task stopped for {event_type}")
            
    async def _process_consumed_event(self, event: BaseEvent):
        """Process an event received from Kafka"""
        logger.info(f"Processing consumed event: {event.event_type} - {event.event_id}")
        
        # Collect all listener tasks (both sync and async)
        async_tasks = []
        
        # Run synchronous listeners in executor to avoid blocking
        for listener in self._listeners.get(event.event_type, []):
            async_tasks.append(self._call_sync_listener_async(listener, event))
                
        # Add asynchronous listeners
        for listener in self._async_listeners.get(event.event_type, []):
            async_tasks.append(self._call_async_listener(listener, event))
            
        if async_tasks:
            await asyncio.gather(*async_tasks, return_exceptions=True)
                
    async def _call_sync_listener_async(self, listener: Callable, event: BaseEvent):
        """Call a synchronous listener in an executor to avoid blocking"""
        try:
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, listener, event)
        except Exception as e:
            logger.error(f"Error in sync listener {listener.__name__}: {e}")

    async def _call_async_listener(self, listener: Callable, event: BaseEvent):
        """Call an async listener with retry + DLQ on failure"""
        await _execute_listener_with_retry(listener, event)
            
    def _get_producer_config(self) -> Dict[str, Any]:
        """Get producer configuration for aiokafka"""
        config = self.kafka_config.producer_config.copy()
        # Convert kafka-python config to aiokafka config
        if 'acks' in config and config['acks'] == 'all':
            config['acks'] = -1
        return config
        
    def _get_consumer_config(self) -> Dict[str, Any]:
        """Get consumer configuration for aiokafka"""
        config = self.kafka_config.consumer_config.copy()
        # Convert kafka-python config to aiokafka config
        # Most config keys are compatible, just return as-is
        return config
        
    def _get_topic_name(self, event_type: str) -> str:
        """Get Kafka topic name for event type"""
        return f"{self.kafka_config.topic_prefix}{event_type}"
        
    def _get_subscribed_event_types(self) -> List[str]:
        """Get list of event types that have listeners"""
        subscribed_types = set()
        subscribed_types.update(self._listeners.keys())
        subscribed_types.update(self._async_listeners.keys())
        return list(subscribed_types)
        
    def get_event_history(self, limit: int = 100) -> List[BaseEvent]:
        """Get recent event history (always empty; history is not kept by the Kafka-based dispatcher)"""
        return []

    def clear_history(self):
        """Clear event history (no-op; history is not kept by the Kafka-based dispatcher)"""
        return None
        
    def get_listener_count(self, event_type: str) -> Dict[str, int]:
        """Get count of listeners for an event type"""
        return {
            "sync_listeners": len(self._listeners.get(event_type, [])),
            "async_listeners": len(self._async_listeners.get(event_type, []))
        }


async def _execute_listener_with_retry(listener_fn: Callable, event: BaseEvent) -> None:
    """
    Execute an async listener, retrying up to MAX_RETRIES times with
    exponential backoff.  On exhaustion, sends the event to the DLQ topic.
    """
    for attempt in range(MAX_RETRIES):
        try:
            if asyncio.iscoroutinefunction(listener_fn):
                await listener_fn(event)
            else:
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(None, listener_fn, event)
            return
        except Exception as exc:
            logger.warning(
                "Listener %s failed on attempt %d/%d: %s",
                listener_fn.__name__,
                attempt + 1,
                MAX_RETRIES,
                exc,
            )
            if attempt == MAX_RETRIES - 1:
                await _send_to_dlq(event, exc, listener_fn.__name__)
                return
            await asyncio.sleep(RETRY_BACKOFF_SECONDS[attempt])


SENSITIVE_EVENT_FIELDS = {
    "iframe_token", "to_email", "to_phone", "customer_name",
    "verify_link", "retry_link",
}


def _sanitize_event_data(data: dict) -> dict:
    """Strip PII / sensitive fields before storing in DLQ payload."""
    return {k: v for k, v in data.items() if k not in SENSITIVE_EVENT_FIELDS}


async def _send_to_dlq(event: BaseEvent, exc: Exception, handler_name: str) -> None:
    """
    Build a DLQ event and attempt to dispatch it via the EventDispatcher.
    Failures here are logged only — we must not raise.
    """
    dlq_event = BaseEvent(
        event_type=f"{event.event_type}.dlq",
        data={
            "original_event_id": str(event.event_id),
            "original_event_type": event.event_type,
            "original_data": _sanitize_event_data(event.data),
            "failed_at": datetime.utcnow().isoformat(),
            "failure_reason": str(exc),
            "handler_name": handler_name,
        },
        correlation_id=event.correlation_id,
        source="dlq",
    )
    logger.error(
        "Event %s (type=%s) sent to DLQ after %d retries. Handler: %s. Reason: %s",
        event.event_id,
        event.event_type,
        MAX_RETRIES,
        handler_name,
        exc,
    )
    try:
        await EventDispatcher.dispatch(dlq_event)
    except Exception as dlq_exc:
        logger.error("Failed to dispatch DLQ event: %s", dlq_exc)


def initialize_event_dispatcher(kafka_config: KafkaConfig) -> KafkaEventDispatcher:
    """Create the global event dispatcher; Kafka connections are opened later via `await dispatcher.initialize()`"""
    event_dispatcher = KafkaEventDispatcher(kafka_config)
    return event_dispatcher


# Global event dispatcher instance (will be initialized with Kafka config)
kafka_config = get_kafka_config()
EventDispatcher: KafkaEventDispatcher = initialize_event_dispatcher(kafka_config)


def on_event(event_type: str):
    """Decorator to register event listeners"""
    def decorator(func):
        EventDispatcher.register(event_type, func)
        return func
    return decorator


def on_event_async(event_type: str):
    """Decorator to register async event listeners"""
    def decorator(func):
        EventDispatcher.register_async(event_type, func)
        return func
    return decorator
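

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). It assumes BaseEvent accepts the keyword
# arguments used by _send_to_dlq above (event_type, data, correlation_id,
# source) and that initialize()/shutdown() are awaited from the application's
# async startup and teardown hooks. Event names and handlers are hypothetical.
#
#     @on_event_async("order.created")
#     async def handle_order_created(event: BaseEvent) -> None:
#         logger.info("order created: %s", event.data)
#
#     async def on_startup() -> None:
#         await EventDispatcher.initialize()
#
#     async def create_order(order_id: str) -> None:
#         await EventDispatcher.dispatch(
#             BaseEvent(
#                 event_type="order.created",
#                 data={"order_id": order_id},
#                 correlation_id=None,
#                 source="orders-api",
#             )
#         )
#
#     async def on_shutdown() -> None:
#         await EventDispatcher.shutdown()
# ---------------------------------------------------------------------------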