"""
Authorizations Services — business logic for the authorizations report module (PRD-009-HWRPT).

Responsibilities:
  - list_authorizations    — paginated list with filters
  - get_authorization_detail — full detail including proof, history events
  - get_summary            — top-N customer summaries
  - export_csv             — streaming CSV export
  - expire_authorization   — lifecycle transition to EXPIRED
  - request_payment_method_update — dispatch update-request notifications
"""
import csv
import io
import logging
import math
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session

from src.apps.authorizations import crud
from src.apps.authorizations.schemas.authorization_detail import (
    AssociatedInvoiceSchema,
    AssociatedTransactionSchema,
    AuthorizationAssociationsResponse,
    AuthorizationCustomerSchema,
    AuthorizationDetailResponse,
    AuthorizationHistoryEvent,
    AuthorizationHistoryResponse,
    AuthorizationHppSessionSchema,
    AuthorizationInvoicesResponse,
    AuthorizationPayerSchema,
    AuthorizationPaymentRequestSchema,
    AuthorizationProofResponse,
    AuthorizationProofSchema,
    AuthorizationTransactionsResponse,
    ExpireAuthorizationResponse,
    RequestPaymentUpdateResponse,
)
from src.apps.authorizations.schemas.authorization_list import (
    AuthorizationListItem,
    AuthorizationListResponse,
    AuthorizationSummaryResponse,
)
from src.apps.payment_requests.models.payment_request_authorizations import (
    PaymentRequestAuthorizations,
)
from src.core.exceptions import ConflictError, NotFoundError

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Terminal statuses that cannot be transitioned further.
# expire_authorization() raises ConflictError when the authorization's
# upper-cased status is already one of these.
_TERMINAL_STATUSES = {"EXPIRED", "SUPERSEDED"}

# How long (hours) a payment-method update token is valid.
# request_payment_method_update() adds this to the request time to compute
# the stored "payment_method_update_expires_at" value.
_PM_UPDATE_TOKEN_TTL_HOURS = 48


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _validate_contact_ids_belong_to_customer(
    db: Session,
    contact_ids: List[int],
    customer_id: int,
    merchant_id: int,
) -> None:
    """
    Ensure every supplied contact ID is a non-deleted contact of ``customer_id``.

    Guards against cross-customer contact injection: callers must not be able
    to target contacts of another customer by guessing or enumerating IDs.

    Args:
        db:          SQLAlchemy session used for the count query.
        contact_ids: Contact primary keys requested by the caller. Duplicates
                     are tolerated (the comparison uses the distinct count).
        customer_id: Customer that must own every contact.
        merchant_id: Accepted for interface parity with callers.
                     NOTE(review): not applied as a filter by the query below;
                     scoping relies on ``customer_id`` only.

    Raises:
        APIException: status 422 when at least one ID does not match a live
                      contact of the customer.
    """
    if not contact_ids:
        # Nothing to validate.
        return

    from src.apps.customers.models.customer_contact import CustomerContact
    from sqlalchemy import select as sa_select, func

    ownership_filters = (
        CustomerContact.id.in_(contact_ids),
        CustomerContact.customer_id == customer_id,
        CustomerContact.deleted_at.is_(None),
    )
    count_query = (
        sa_select(func.count())
        .select_from(CustomerContact)
        .where(*ownership_filters)
    )
    owned_count = db.execute(count_query).scalar_one()

    # Compare against the number of *distinct* IDs so repeated IDs in the
    # request do not trigger a false rejection.
    distinct_requested = len(set(contact_ids))
    if owned_count == distinct_requested:
        return

    from src.core.exceptions import APIException
    raise APIException(
        message=(
            "One or more contact_ids do not belong to this authorization's customer. "
            "Only contacts associated with the customer on this authorization may be notified."
        ),
        status_code=422,
    )


def _get_presigned_signature_url(
    signature_file,
    db: Session,
) -> Optional[str]:
    """
    Resolve a displayable URL for a stored signature file.

    Args:
        signature_file: File record for the signature, or None.
        db:             SQLAlchemy session forwarded to the presign helper.

    Returns:
        None when there is no file; otherwise the value returned by
        ``generate_presigned_file_url``. If that helper cannot be imported or
        raises, the file's ``full_url`` attribute (None when absent) is
        returned instead and a warning is logged.
    """
    if signature_file is not None:
        try:
            from src.apps.files.helpers.presigned import generate_presigned_file_url
            return generate_presigned_file_url(signature_file, db=db)
        except Exception as presign_error:
            # Best-effort: fall back to whatever static URL the record carries.
            logger.warning("Failed to generate presigned URL for signature file: %s", presign_error)
            return getattr(signature_file, "full_url", None)
    return None


def _safe_decrypt_otp(encrypted_value: Optional[str]) -> Optional[str]:
    """
    Decrypt a stored SMS ``authorization_value``, never leaking the raw value.

    Args:
        encrypted_value: Stored value to decrypt; may be None or empty.

    Returns:
        The result of ``decrypt_otp`` on success. None when the input is
        empty or when decryption (or importing the crypto helper) fails.

    SECURITY: the stored string is never returned as a fallback — it may be
    ciphertext or legacy plaintext OTP data. Callers must treat None as
    "value unavailable" and show a suitable message instead.
    """
    plaintext: Optional[str] = None
    if encrypted_value:
        try:
            from src.core.utils.crypto import decrypt_otp
            plaintext = decrypt_otp(encrypted_value)
        except Exception as decrypt_error:
            # Log only the exception class name, not its message.
            logger.warning(
                "OTP decryption failed for authorization_value (key misconfigured or "
                "legacy unencrypted record): %s",
                type(decrypt_error).__name__,
            )
            plaintext = None
    return plaintext


def _build_proof(
    auth: PaymentRequestAuthorizations,
    db: Session,
) -> AuthorizationProofSchema:
    """
    Assemble the proof-of-authorization schema for an authorization record.

    The type is compared case-insensitively:
      - "SIGN": a signature URL is resolved from ``auth.signature_file``.
      - "SMS":  the stored ``authorization_value`` is decrypted (None on failure).
      - anything else: both fields stay None.

    Args:
        auth: Authorization ORM record.
        db:   SQLAlchemy session, forwarded to the signature URL helper.

    Returns:
        AuthorizationProofSchema populated from the record.
    """
    normalized_type = (auth.authorization_type or "").upper()

    signature_url: Optional[str] = (
        _get_presigned_signature_url(auth.signature_file, db)
        if normalized_type == "SIGN"
        else None
    )
    authorization_value: Optional[str] = (
        _safe_decrypt_otp(auth.authorization_value)
        if normalized_type == "SMS"
        else None
    )

    proof_fields = {
        # The schema receives the type exactly as stored, not the upper-cased copy.
        "authorization_type": auth.authorization_type,
        "signing_name": auth.signing_name,
        "authorization_literal": auth.authorization_literal,
        "signature_url": signature_url,
        "authorization_value": authorization_value,
        "verified_phone_last4": auth.verified_phone_last4,
        "ip_address": auth.ip_address,
    }
    return AuthorizationProofSchema(**proof_fields)


def _build_history_events(
    auth: PaymentRequestAuthorizations,
) -> List[AuthorizationHistoryEvent]:
    """
    Derive a chronological list of lifecycle events from the authorization record.

    Events emitted (each only when its timestamp is available):
      1. "Authorization Created"            — ``auth.created_at``
      2. "Customer Viewed Payment Page"     — ``auth.hpp_session.initiated_at``
      3. "Customer Authorized"              — ``auth.authorization_date`` when verified
      4. "Payment Method Update Requested"  — ISO timestamp stored under
         ``payment_method_update_requested_at`` in ``auth.auth_metadata``
         (where request_payment_method_update writes it), falling back to
         ``auth.additional_info`` for records that carry it there
      5. "Authorization Expired"            — ``auth.expired_at``
      6. "Authorization Superseded"         — when status is SUPERSEDED, using
         ``expired_at``, else ``updated_at``, else ``created_at``

    Args:
        auth: Authorization ORM record.

    Returns:
        Events sorted ascending by timestamp; events without a timestamp are
        dropped. Events with equal timestamps keep the order listed above.
    """
    events: List[Dict[str, Any]] = [
        # 1. Authorization record created
        {"event": "Authorization Created", "occurred_at": auth.created_at},
    ]

    # 2. Customer viewed the payment page (HPP session initiated)
    hpp_session = auth.hpp_session
    if hpp_session and hpp_session.initiated_at:
        events.append(
            {
                "event": "Customer Viewed Payment Page",
                "occurred_at": hpp_session.initiated_at,
            }
        )

    # 3. Customer authorised
    if auth.is_verified and auth.authorization_date:
        events.append(
            {
                "event": "Customer Authorized",
                "occurred_at": auth.authorization_date,
            }
        )

    # 4. Payment method update requested.
    # The request flow stores this timestamp in auth_metadata, so look there
    # first; additional_info is kept as a fallback source.
    requested_raw: Any = None
    for bag in (
        getattr(auth, "auth_metadata", None),
        getattr(auth, "additional_info", None),
    ):
        if isinstance(bag, dict):
            requested_raw = bag.get("payment_method_update_requested_at")
            if requested_raw:
                break

    if requested_raw:
        try:
            requested_at = (
                requested_raw
                if isinstance(requested_raw, datetime)
                else datetime.fromisoformat(requested_raw)
            )
        except (ValueError, TypeError):
            # Best-effort: a malformed value must not break the timeline,
            # but leave a trace so the bad record can be found.
            logger.warning(
                "Ignoring unparseable payment_method_update_requested_at on authorization %s",
                getattr(auth, "authorization_id", None),
            )
        else:
            events.append(
                {
                    "event": "Payment Method Update Requested",
                    "occurred_at": requested_at,
                }
            )

    # 5. Expiry
    if auth.expired_at:
        events.append({"event": "Authorization Expired", "occurred_at": auth.expired_at})

    # 6. Superseded
    if (auth.status or "").upper() == "SUPERSEDED":
        superseded_at = auth.expired_at or auth.updated_at or auth.created_at
        events.append({"event": "Authorization Superseded", "occurred_at": superseded_at})

    # Events without a timestamp are never returned, so drop them before sorting.
    dated_events = [e for e in events if e.get("occurred_at") is not None]

    def _sort_key(entry: Dict[str, Any]) -> datetime:
        # Naive timestamps are compared as-is (treated as UTC). Aware ones are
        # first *converted* to UTC and then made naive, so a non-UTC offset
        # does not skew the ordering and mixed-awareness comparisons don't
        # raise TypeError.
        ts = entry["occurred_at"]
        if getattr(ts, "tzinfo", None) is not None:
            if ts.utcoffset() is not None:
                ts = ts.astimezone(timezone.utc)
            return ts.replace(tzinfo=None)
        return ts

    dated_events.sort(key=_sort_key)

    return [
        AuthorizationHistoryEvent(
            event=e["event"],
            occurred_at=e["occurred_at"],
        )
        for e in dated_events
    ]


def _build_list_item(auth: PaymentRequestAuthorizations) -> AuthorizationListItem:
    """
    Convert an authorization ORM record into its list-row schema.

    Derived fields:
      - account_name: customer's "first last" (empty parts skipped), falling
        back to the business legal name, else "".
      - customer_ref: customer's account literal, else its customer_id.
      - payer_name:   payer's "first last", or None when both parts are empty.
      - auth_amount / frequency: taken from the linked payment request.

    Args:
        auth: Authorization ORM record with customer, payer and
              payment_request relationships.

    Returns:
        AuthorizationListItem for the list view.
    """
    customer, payer, payment_request = auth.customer, auth.payer, auth.payment_request

    account_name = ""
    customer_ref: Optional[str] = None
    if customer:
        full_name = " ".join(
            filter(None, (customer.first_name, customer.last_name))
        ).strip()
        account_name = full_name or customer.business_legal_name or ""
        customer_ref = customer.account_literal or customer.customer_id

    payer_name: Optional[str] = None
    payer_email: Optional[str] = None
    if payer:
        payer_full_name = " ".join(
            filter(None, (payer.first_name, payer.last_name))
        ).strip()
        payer_name = payer_full_name or None
        payer_email = payer.email

    auth_amount: Optional[float] = None
    frequency: Optional[str] = None
    if payment_request:
        if payment_request.amount is not None:
            auth_amount = float(payment_request.amount)
        frequency = payment_request.payment_frequency

    return AuthorizationListItem(
        id=auth.id,
        authorization_id=auth.authorization_id,
        authorization_literal=auth.authorization_literal,
        authorization_type=auth.authorization_type,
        authorization_date=auth.authorization_date,
        status=auth.status,
        is_verified=auth.is_verified,
        payment_request_id=auth.payment_request_id,
        auth_amount=auth_amount,
        frequency=frequency,
        customer_id=auth.customer_id,
        account_name=account_name,
        customer_ref=customer_ref,
        payer_name=payer_name,
        payer_email=payer_email,
        created_at=auth.created_at,
        expired_at=auth.expired_at,
    )


# ---------------------------------------------------------------------------
# Public service functions
# ---------------------------------------------------------------------------

def list_authorizations(
    db: Session,
    merchant_id: int,
    status: Optional[str] = None,
    authorization_type: Optional[str] = None,
    date_from: Optional[datetime] = None,
    date_to: Optional[datetime] = None,
    search: Optional[str] = None,
    amount_min: Optional[float] = None,
    amount_max: Optional[float] = None,
    sort_by: str = "created_at",
    sort_desc: bool = True,
    page: int = 1,
    per_page: int = 20,
) -> AuthorizationListResponse:
    """
    Fetch one page of authorizations for a merchant and wrap it for the API.

    All filter, sort and paging arguments are forwarded unchanged to
    ``crud.get_authorization_list``, which returns the page records and the
    total match count.

    Returns:
        AuthorizationListResponse with the mapped items, the total, the
        requested page / per_page, and ``total_pages`` (reported as 1 when
        there are no matches).
    """
    query_args = dict(
        db=db,
        merchant_id=merchant_id,
        status=status,
        authorization_type=authorization_type,
        date_from=date_from,
        date_to=date_to,
        search=search,
        amount_min=amount_min,
        amount_max=amount_max,
        sort_by=sort_by,
        sort_desc=sort_desc,
        page=page,
        per_page=per_page,
    )
    records, total = crud.get_authorization_list(**query_args)

    # An empty result set is still presented as a single (empty) page.
    total_pages = 1
    if total > 0:
        total_pages = math.ceil(total / per_page)

    return AuthorizationListResponse(
        items=list(map(_build_list_item, records)),
        total=total,
        page=page,
        per_page=per_page,
        total_pages=total_pages,
    )


def get_authorization_detail(
    db: Session,
    authorization_id: str,
    merchant_id: int,
) -> AuthorizationDetailResponse:
    """
    Load one authorization and assemble its full detail view.

    The response combines the authorization's own fields with optional nested
    sections (payment request, customer, payer, HPP session — each None when
    the relationship is empty) plus the proof section and the derived history
    timeline.

    Args:
        db:               SQLAlchemy session.
        authorization_id: Public authorization identifier.
        merchant_id:      Merchant scope passed to the lookup.

    Raises:
        NotFoundError: the lookup returned no record.
    """
    record = crud.get_authorization_by_authorization_id(
        db=db,
        authorization_id=authorization_id,
        merchant_id=merchant_id,
    )
    if record is None:
        raise NotFoundError("Authorization not found")

    # --- Payment request section ---
    payment_request = record.payment_request
    payment_request_schema: Optional[AuthorizationPaymentRequestSchema] = None
    if payment_request:
        payment_request_schema = AuthorizationPaymentRequestSchema(
            id=payment_request.id,
            payment_request_id=payment_request.payment_request_id,
            payment_request_literal=payment_request.payment_request_literal,
            amount=(
                None
                if payment_request.amount is None
                else float(payment_request.amount)
            ),
            payment_frequency=payment_request.payment_frequency,
            currency=payment_request.currency,
            status=payment_request.status,
            title=payment_request.title,
            description=payment_request.description,
            due_date=payment_request.due_date,
        )

    # --- Customer section ---
    customer = record.customer
    customer_schema: Optional[AuthorizationCustomerSchema] = None
    if customer:
        customer_schema = AuthorizationCustomerSchema(
            id=customer.id,
            customer_id=customer.customer_id,
            account_literal=customer.account_literal,
            first_name=customer.first_name,
            last_name=customer.last_name,
            email=customer.email,
            phone=customer.phone,
            business_legal_name=customer.business_legal_name,
        )

    # --- Payer section ---
    payer = record.payer
    payer_schema: Optional[AuthorizationPayerSchema] = None
    if payer:
        payer_schema = AuthorizationPayerSchema(
            id=payer.id,
            contact_id=payer.contact_id,
            first_name=payer.first_name,
            last_name=payer.last_name,
            email=payer.email,
            phone=payer.phone,
        )

    # --- Hosted payment page session section ---
    session_info = record.hpp_session
    hpp_schema: Optional[AuthorizationHppSessionSchema] = None
    if session_info:
        hpp_schema = AuthorizationHppSessionSchema(
            ip_address=session_info.ip_address,
            user_agent=session_info.user_agent,
            geo_city=session_info.geo_city,
            geo_region=session_info.geo_region,
            geo_country=session_info.geo_country,
            # Read defensively: the attribute may be absent on the session object.
            geo_isp=getattr(session_info, "geo_isp", None),
            initiated_at=session_info.initiated_at,
            submitted_at=session_info.submitted_at,
        )

    proof_section = _build_proof(record, db)
    history_events = _build_history_events(record)

    return AuthorizationDetailResponse(
        id=record.id,
        authorization_id=record.authorization_id,
        authorization_literal=record.authorization_literal,
        authorization_type=record.authorization_type,
        authorization_date=record.authorization_date,
        status=record.status,
        is_verified=record.is_verified,
        created_at=record.created_at,
        updated_at=record.updated_at,
        expired_at=record.expired_at,
        auth_metadata=record.auth_metadata,
        payment_request=payment_request_schema,
        customer=customer_schema,
        payer=payer_schema,
        hpp_session=hpp_schema,
        proof=proof_section,
        history=history_events,
    )


def get_authorization_proof(
    db: Session,
    authorization_id: str,
    merchant_id: int,
) -> AuthorizationProofResponse:
    """
    Load an authorization and return just its proof-of-authorization section.

    Raises:
        NotFoundError: the lookup returned no record.
    """
    record = crud.get_authorization_by_authorization_id(
        db=db,
        authorization_id=authorization_id,
        merchant_id=merchant_id,
    )
    if record is None:
        raise NotFoundError("Authorization not found")
    return AuthorizationProofResponse(proof=_build_proof(record, db))


def get_authorization_history(
    db: Session,
    authorization_id: str,
    merchant_id: int,
) -> AuthorizationHistoryResponse:
    """
    Load an authorization and return just its derived history timeline.

    Raises:
        NotFoundError: the lookup returned no record.
    """
    record = crud.get_authorization_by_authorization_id(
        db=db,
        authorization_id=authorization_id,
        merchant_id=merchant_id,
    )
    if record is None:
        raise NotFoundError("Authorization not found")
    return AuthorizationHistoryResponse(history=_build_history_events(record))


def get_authorization_associations(
    db: Session,
    authorization_id: str,
    merchant_id: int,
) -> AuthorizationAssociationsResponse:
    """
    Return invoices and transactions linked to an authorization via payment_request_id.

    Invoices are the non-deleted invoices of the authorization's payment
    request, newest first. Transactions are those of the same payment request
    that also belong to ``merchant_id``, most recent first; each carries the
    payment request's ``authorization_type``.

    Args:
        db:               SQLAlchemy session.
        authorization_id: Public authorization identifier.
        merchant_id:      Merchant scope for the lookup and the transaction query.

    Returns:
        AuthorizationAssociationsResponse; both lists are empty when the
        authorization has no payment request.

    Raises:
        NotFoundError: the lookup returned no record.
    """
    from src.apps.invoices.models.invoice import Invoice
    from src.apps.transactions.models.transactions import Transactions
    from sqlalchemy import select

    auth = crud.get_authorization_by_authorization_id(
        db=db, authorization_id=authorization_id, merchant_id=merchant_id,
    )
    if auth is None:
        raise NotFoundError("Authorization not found")

    pr_id = auth.payment_request_id
    if not pr_id:
        # No payment request means nothing can be associated.
        return AuthorizationAssociationsResponse(invoices=[], transactions=[])

    from src.apps.payment_requests.models.payment_request import PaymentRequest

    # Fetch invoices.
    # NOTE(review): unlike the transaction query below, this query is not
    # filtered by merchant; it relies on the authorization lookup above.
    inv_stmt = (
        select(Invoice)
        .where(Invoice.payment_request_id == pr_id, Invoice.deleted_at.is_(None))
        .order_by(Invoice.created_at.desc())
    )
    invoices = [
        AssociatedInvoiceSchema(
            id=inv.id,
            invoice_id=inv.invoice_id,
            invoice_literal=getattr(inv, 'invoice_literal', None),
            amount=float(inv.amount) if inv.amount is not None else None,
            status=inv.status,
            status_text=getattr(inv, 'status_text', None),
            created_at=inv.created_at,
            due_date=getattr(inv, 'due_date', None),
        )
        for inv in db.execute(inv_stmt).scalars().all()
    ]

    # The authorization type shown on each transaction comes from the payment request.
    pr = db.execute(
        select(PaymentRequest).where(PaymentRequest.id == pr_id)
    ).scalar_one_or_none()
    pr_auth_type = getattr(pr, "authorization_type", None) if pr else None

    # Fetch transactions ("ocurred_at" is the model's own spelling).
    txn_stmt = (
        select(Transactions)
        .where(
            Transactions.payment_request_id == pr_id,
            Transactions.merchant_id == merchant_id,
        )
        .order_by(Transactions.ocurred_at.desc())
    )
    transactions = [
        AssociatedTransactionSchema(
            id=txn.id,
            txn_id=txn.txn_id,
            txn_literal=txn.txn_literal,
            txn_amount=float(txn.txn_amount) if txn.txn_amount is not None else None,
            txn_status=txn.txn_status,
            status_text=getattr(txn, 'status_text', None),
            ocurred_at=txn.ocurred_at,
            payment_type=getattr(txn, 'payment_type', None),
            authorization_type=pr_auth_type,
        )
        for txn in db.execute(txn_stmt).scalars().all()
    ]

    return AuthorizationAssociationsResponse(invoices=invoices, transactions=transactions)


def get_authorization_invoices(
    db: Session,
    authorization_id: str,
    merchant_id: int,
) -> "AuthorizationInvoicesResponse":
    """
    Return invoices linked to an authorization via payment_request_id.

    Only non-deleted invoices of the authorization's payment request are
    returned, newest first.

    Args:
        db:               SQLAlchemy session.
        authorization_id: Public authorization identifier.
        merchant_id:      Merchant scope passed to the authorization lookup.

    Returns:
        AuthorizationInvoicesResponse with the invoices and their count
        (empty / 0 when the authorization has no payment request).

    Raises:
        NotFoundError: the lookup returned no record.
    """
    from src.apps.invoices.models.invoice import Invoice
    from sqlalchemy import select

    auth = crud.get_authorization_by_authorization_id(
        db=db, authorization_id=authorization_id, merchant_id=merchant_id,
    )
    if auth is None:
        raise NotFoundError("Authorization not found")

    invoices = []
    if auth.payment_request_id:
        # NOTE(review): this query is not filtered by merchant; it relies on
        # the merchant-scoped authorization lookup above.
        inv_stmt = (
            select(Invoice)
            .where(
                Invoice.payment_request_id == auth.payment_request_id,
                Invoice.deleted_at.is_(None),
            )
            .order_by(Invoice.created_at.desc())
        )
        invoices = [
            AssociatedInvoiceSchema(
                id=inv.id,
                invoice_id=inv.invoice_id,
                invoice_literal=getattr(inv, 'invoice_literal', None),
                amount=float(inv.amount) if inv.amount is not None else None,
                status=inv.status,
                status_text=getattr(inv, 'status_text', None),
                created_at=inv.created_at,
                due_date=getattr(inv, 'due_date', None),
            )
            for inv in db.execute(inv_stmt).scalars().all()
        ]

    return AuthorizationInvoicesResponse(invoices=invoices, total=len(invoices))


def get_authorization_transactions(
    db: Session,
    authorization_id: str,
    merchant_id: int,
) -> "AuthorizationTransactionsResponse":
    """
    List the transactions tied to an authorization through its payment request.

    Transactions must match both the payment request and ``merchant_id`` and
    are ordered most recent first. Each one carries the payment request's
    ``authorization_type``.

    Returns:
        AuthorizationTransactionsResponse with the transactions and their
        count (empty / 0 when the authorization has no payment request).

    Raises:
        NotFoundError: the lookup returned no record.
    """
    from src.apps.transactions.models.transactions import Transactions
    from sqlalchemy import select
    from src.apps.authorizations.schemas.authorization_detail import AuthorizationTransactionsResponse

    record = crud.get_authorization_by_authorization_id(
        db=db, authorization_id=authorization_id, merchant_id=merchant_id,
    )
    if record is None:
        raise NotFoundError("Authorization not found")

    payment_request_pk = record.payment_request_id
    if not payment_request_pk:
        return AuthorizationTransactionsResponse(transactions=[], total=0)

    from src.apps.payment_requests.models.payment_request import PaymentRequest

    payment_request = db.execute(
        select(PaymentRequest).where(PaymentRequest.id == payment_request_pk)
    ).scalar_one_or_none()
    request_auth_type = (
        getattr(payment_request, "authorization_type", None)
        if payment_request
        else None
    )

    # "ocurred_at" is the model's own spelling of the column.
    query = (
        select(Transactions)
        .where(
            Transactions.payment_request_id == payment_request_pk,
            Transactions.merchant_id == merchant_id,
        )
        .order_by(Transactions.ocurred_at.desc())
    )
    rows = db.execute(query).scalars().all()

    transactions = [
        AssociatedTransactionSchema(
            id=row.id,
            txn_id=row.txn_id,
            txn_literal=row.txn_literal,
            txn_amount=None if row.txn_amount is None else float(row.txn_amount),
            txn_status=row.txn_status,
            status_text=getattr(row, 'status_text', None),
            ocurred_at=row.ocurred_at,
            payment_type=getattr(row, 'payment_type', None),
            authorization_type=request_auth_type,
        )
        for row in rows
    ]

    return AuthorizationTransactionsResponse(transactions=transactions, total=len(transactions))


def get_summary(
    db: Session,
    merchant_id: int,
    limit: int = 3,
) -> AuthorizationSummaryResponse:
    """
    Build the customer summary response for a merchant.

    Args:
        db:          SQLAlchemy session.
        merchant_id: Merchant whose authorizations are summarised.
        limit:       Forwarded to ``crud.get_authorization_summary`` as the
                     top-N size.

    Returns:
        AuthorizationSummaryResponse built from the mapping the crud layer returns.
    """
    summary_fields = crud.get_authorization_summary(
        db=db,
        merchant_id=merchant_id,
        limit=limit,
    )
    return AuthorizationSummaryResponse(**summary_fields)


# Leading characters that make spreadsheet applications evaluate a cell as a formula.
_CSV_FORMULA_TRIGGERS = ("=", "+", "-", "@", "\t", "\r")


def _neutralize_csv_formula(value: Any) -> str:
    """Return *value* as text, prefixed with "'" if a spreadsheet would treat it as a formula."""
    text = "" if value is None else str(value)
    if text.startswith(_CSV_FORMULA_TRIGGERS):
        return "'" + text
    return text


def _authorization_export_row(auth: Any) -> List[Any]:
    """Build one CSV data row (same column order as the export header) for an authorization."""
    customer = auth.customer
    payer = auth.payer
    pr = auth.payment_request
    hpp = auth.hpp_session

    # Account name: "first last", falling back to the business legal name.
    acct_name = ""
    if customer:
        acct_name = " ".join(
            p for p in (customer.first_name, customer.last_name) if p
        ).strip()
        if not acct_name:
            acct_name = customer.business_legal_name or ""

    payer_name = ""
    if payer:
        payer_name = " ".join(
            p for p in (payer.first_name, payer.last_name) if p
        ).strip()

    frequency = pr.payment_frequency if pr else ""
    auth_amount = f"{pr.amount:.2f}" if pr and pr.amount is not None else ""

    auth_date = (
        auth.authorization_date.strftime("%Y-%m-%d %H:%M:%S")
        if auth.authorization_date
        else ""
    )

    # Prefer the IP stored on the authorization; fall back to the HPP session's.
    ip_address = auth.ip_address or (hpp.ip_address if hpp else "") or ""

    geo_location = ""
    if hpp:
        geo_location = ", ".join(
            part for part in (hpp.geo_city, hpp.geo_region, hpp.geo_country) if part
        )

    # Only free-text, externally supplied values are neutralised; the amount
    # is left untouched so a leading minus sign stays numeric.
    return [
        auth.authorization_id or str(auth.id),
        _neutralize_csv_formula(acct_name),
        _neutralize_csv_formula(payer_name),
        frequency,
        auth.authorization_type,
        auth_date,
        auth_amount,
        auth.status or "",
        _neutralize_csv_formula(ip_address),
        _neutralize_csv_formula(geo_location),
    ]


def export_csv(
    db: Session,
    merchant_id: int,
    status: Optional[str] = None,
    authorization_type: Optional[str] = None,
    date_from: Optional[datetime] = None,
    date_to: Optional[datetime] = None,
    search: Optional[str] = None,
    amount_min: Optional[float] = None,
    amount_max: Optional[float] = None,
) -> StreamingResponse:
    """
    Return all matching authorizations as a CSV download.

    The CSV is built fully in memory and handed to StreamingResponse as a
    single chunk.

    File name: authorizations_export_<YYYYMMDD>.csv (UTC date)
    Columns: Auth #, Acct Name, Payer, Frequency, Auth Type, Date, Auth Amount, Status,
             IP Address, Geo Location

    SECURITY: free-text cells (account name, payer, IP address, geo location)
    that begin with a spreadsheet formula trigger (=, +, -, @, tab, CR) are
    prefixed with a single quote so they are not evaluated as formulas when
    the file is opened in a spreadsheet application.

    Args:
        db:          SQLAlchemy session.
        merchant_id: Merchant whose authorizations are exported.
        status, authorization_type, date_from, date_to, search, amount_min,
        amount_max:  Filters forwarded unchanged to the crud layer.

    Returns:
        StreamingResponse with media type text/csv and an attachment
        Content-Disposition header.
    """
    items = crud.get_authorization_list_for_export(
        db=db,
        merchant_id=merchant_id,
        status=status,
        authorization_type=authorization_type,
        date_from=date_from,
        date_to=date_to,
        search=search,
        amount_min=amount_min,
        amount_max=amount_max,
    )

    output = io.StringIO()
    writer = csv.writer(output)

    # Header row
    writer.writerow(
        [
            "Auth #",
            "Acct Name",
            "Payer",
            "Frequency",
            "Auth Type",
            "Date",
            "Auth Amount",
            "Status",
            "IP Address",
            "Geo Location",
        ]
    )
    writer.writerows(_authorization_export_row(auth) for auth in items)

    date_str = datetime.now(timezone.utc).strftime("%Y%m%d")
    filename = f"authorizations_export_{date_str}.csv"

    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )


def expire_authorization(
    db: Session,
    authorization_id: str,
    merchant_id: int,
) -> ExpireAuthorizationResponse:
    """
    Move an authorization into the EXPIRED state and commit the change.

    Args:
        db:               SQLAlchemy session; committed on success.
        authorization_id: Public authorization identifier.
        merchant_id:      Merchant scope passed to the lookup.

    Returns:
        ExpireAuthorizationResponse describing the refreshed record.

    Raises:
        NotFoundError: the lookup returned no record.
        ConflictError: the authorization is already in a terminal status
                       (compared case-insensitively).
    """
    record = crud.get_authorization_by_authorization_id(
        db=db,
        authorization_id=authorization_id,
        merchant_id=merchant_id,
    )
    if record is None:
        raise NotFoundError("Authorization not found")

    already_terminal = (record.status or "").upper() in _TERMINAL_STATUSES
    if already_terminal:
        raise ConflictError(
            f"Authorization is already in terminal status '{record.status}' and cannot be expired."
        )

    expired = crud.expire_authorization(db=db, authorization=record)
    db.commit()
    # Reload so the response reflects the persisted status and timestamp.
    db.refresh(expired)

    public_id = expired.authorization_id or str(expired.id)
    return ExpireAuthorizationResponse(
        authorization_id=public_id,
        status=expired.status,
        expired_at=expired.expired_at,
        message="Authorization has been expired successfully.",
    )


def request_payment_method_update(
    db: Session,
    authorization_id: str,
    merchant_id: int,
    contact_ids: List[int],
    channels: List[str],
    note: Optional[str] = None,
) -> RequestPaymentUpdateResponse:
    """
    Store a payment-method update token on the authorization and dispatch
    a notification to each contact/channel combination.

    Repeated contact IDs or channels in the request are collapsed (first
    occurrence wins) so no contact receives the same notification twice and
    the reported count is not inflated.

    Args:
        db:               SQLAlchemy session; flushed before dispatch and
                          committed afterwards.
        authorization_id: Public authorization identifier.
        merchant_id:      Merchant scope passed to the lookup.
        contact_ids:      Contacts to notify; must all belong to the
                          authorization's customer.
        channels:         Notification channels to use for every contact.
        note:             Optional free-text note included in the notification.

    Returns:
        RequestPaymentUpdateResponse with the number of notifications
        dispatched. The token itself is never returned.

    Raises:
        APIException:  status 422 when ``contact_ids`` or ``channels`` is
                       empty, or a contact does not belong to the customer.
        NotFoundError: the lookup returned no record.
    """
    from src.core.exceptions import APIException

    if not contact_ids:
        raise APIException(
            message="At least one contact_id must be provided.",
            status_code=422,
        )
    if not channels:
        raise APIException(
            message="At least one notification channel must be provided.",
            status_code=422,
        )

    auth = crud.get_authorization_by_authorization_id(
        db=db,
        authorization_id=authorization_id,
        merchant_id=merchant_id,
    )
    if auth is None:
        raise NotFoundError("Authorization not found")

    # Order-preserving de-duplication of the requested targets.
    unique_contact_ids = list(dict.fromkeys(contact_ids))
    unique_channels = list(dict.fromkeys(channels))

    # Validate that every supplied contact_id belongs to the authorization's customer.
    # Without this check a caller could target contacts from any other customer/merchant.
    _validate_contact_ids_belong_to_customer(
        db=db,
        contact_ids=unique_contact_ids,
        customer_id=auth.customer_id,
        merchant_id=merchant_id,
    )

    now = datetime.now(timezone.utc)
    token = str(uuid.uuid4())
    expiry = now + timedelta(hours=_PM_UPDATE_TOKEN_TTL_HOURS)

    # Update auth_metadata with a new dict (not in-place mutation) so
    # SQLAlchemy detects the change on the JSON column.
    existing_meta: dict = dict(auth.auth_metadata or {})
    existing_meta.update(
        {
            "payment_method_update_token": token,
            "payment_method_update_expires_at": expiry.isoformat(),
            "payment_method_update_requested_at": now.isoformat(),
        }
    )
    auth.auth_metadata = existing_meta
    db.flush()

    # Dispatch notifications: one per (contact, channel) pair. The dispatcher
    # returns 1 or 0 and never raises, so a single failure cannot abort the loop.
    notifications_sent = sum(
        _dispatch_pm_update_notification(
            db=db,
            auth=auth,
            contact_id=contact_id,
            channel=channel,
            token=token,
            note=note,
        )
        for contact_id in unique_contact_ids
        for channel in unique_channels
    )

    db.commit()

    return RequestPaymentUpdateResponse(
        authorization_id=auth.authorization_id or str(auth.id),
        # update_token is NOT returned in the response — it is dispatched
        # directly to the customer via the notification channel.
        notifications_dispatched=notifications_sent,
        message=(
            f"Payment method update request created. "
            f"{notifications_sent} notification(s) dispatched."
        ),
    )


def _dispatch_pm_update_notification(
    db: Session,
    auth: PaymentRequestAuthorizations,
    contact_id: int,
    channel: str,
    token: str,
    note: Optional[str] = None,
) -> int:
    """
    Send a payment-method update notification to a single contact via a single channel.

    The recipient is the contact's email when ``channel`` is exactly "email",
    otherwise the contact's phone.

    Args:
        db:         SQLAlchemy session used to load the contact and passed to
                    the notification service.
        auth:       Authorization the update request belongs to.
        contact_id: Primary key of the contact to notify.
        channel:    Notification channel.
        token:      Update token placed in the notification's template variables.
        note:       Optional note; sent as "" when missing.

    Returns:
        0 when the contact is missing or deleted, has no address for the
        channel, or an unexpected error occurs. 1 otherwise.
        NOTE: a failure inside the notification service call itself is only
        logged and still counts as 1 — this is the deliberate stub fallback
        while the notifications service is not implemented.

    Never raises; all errors are logged.
    """
    try:
        from src.apps.customers.models.customer_contact import CustomerContact
        from sqlalchemy import select as sa_select

        contact_stmt = sa_select(CustomerContact).where(
            CustomerContact.id == contact_id,
            CustomerContact.deleted_at.is_(None),
        )
        contact = db.execute(contact_stmt).scalar_one_or_none()
        if contact is None:
            logger.warning(
                "PM update notification skipped: contact %s not found", contact_id
            )
            return 0

        recipient = contact.email if channel == "email" else contact.phone
        if not recipient:
            logger.warning(
                "PM update notification skipped: contact %s has no %s",
                contact_id,
                channel,
            )
            return 0

        # The recipient address (email / phone) is personal data and is
        # deliberately kept out of the log; contact_id identifies the target.
        logger.info(
            "Dispatching PM update notification: authorization=%s contact=%s channel=%s",
            auth.authorization_id,
            contact_id,
            channel,
        )

        # Attempt to dispatch via the notifications service if available.
        # If the service or template is not yet configured, log and continue.
        # NOTE: notifications module is a stub; this block will fall through to
        # the warning log until the notifications service is implemented.
        try:
            from src.apps.notifications.services import send_notification_sync
            from src.apps.notifications.schemas.notification_schemas import (
                SendNotificationSchema,
            )

            notification = SendNotificationSchema(
                template_key="payment_method_update_request",
                channel=channel,
                recipient=recipient,
                template_vars={
                    "authorization_id": auth.authorization_id or str(auth.id),
                    "update_token": token,
                    "note": note or "",
                },
            )
            send_notification_sync(db=db, notification=notification)
        except Exception as notify_exc:
            logger.warning(
                "Notification dispatch failed (stub fallback): %s", notify_exc
            )

        return 1

    except Exception:
        # logger.exception records the traceback, which the plain error
        # message alone would lose.
        logger.exception(
            "Unexpected error dispatching PM update notification to contact %s",
            contact_id,
        )
        return 0


def generate_authorization_pdf(
    db: Session,
    authorization_id: str,
    merchant_id: int,
    merchant: Any,
) -> bytes:
    """
    Generate a full-detail PDF for a single authorization.

    Steps:
      1. Look up the authorization scoped to ``merchant_id``.
      2. Build the detail response via ``get_authorization_detail``; the proof
         and history handed to the generator are read from that response.
      3. If the authorization has a ``payment_request_id``, load the linked
         non-deleted invoices and the merchant's transactions for that
         payment request (both newest first).
      4. Delegate rendering to ``AuthDetailPDFGenerator.generate_pdf``.

    Args:
        db:               SQLAlchemy session
        authorization_id: authorization_id string (e.g. "auth_xxx")
        merchant_id:      authenticated merchant's PK
        merchant:         merchant record passed through to the PDF generator

    Returns:
        Whatever ``AuthDetailPDFGenerator.generate_pdf`` returns (annotated
        as raw PDF bytes).

    Raises:
        NotFoundError: authorization not found for this merchant.
        Other exceptions raised by ``get_authorization_detail``, the queries,
        or the PDF generator propagate unchanged.
        NOTE(review): earlier documentation listed RuntimeError for rendering
        failures — confirm against AuthDetailPDFGenerator.
    """
    # Function-scope imports, as in the rest of this function's dependencies
    # (presumably to avoid import cycles between app modules — TODO confirm).
    from sqlalchemy import select

    from src.apps.authorizations.helpers.auth_detail_pdf import AuthDetailPDFGenerator
    from src.apps.invoices.models.invoice import Invoice
    from src.apps.payment_requests.models.payment_request import PaymentRequest
    from src.apps.transactions.models.transactions import Transactions

    # The ORM row is needed for payment_request_id; the detail response built
    # below is what the generator actually renders.
    auth = crud.get_authorization_by_authorization_id(
        db=db, authorization_id=authorization_id, merchant_id=merchant_id
    )
    if auth is None:
        raise NotFoundError("Authorization not found")

    detail = get_authorization_detail(
        db=db, authorization_id=authorization_id, merchant_id=merchant_id
    )

    # Associations stay empty when the authorization has no payment request.
    invoices: List[Any] = []
    transactions: List[Any] = []
    pr_id = auth.payment_request_id
    if pr_id:
        inv_stmt = (
            select(Invoice)
            .where(
                Invoice.payment_request_id == pr_id,
                # Explicit IS NULL operator instead of a Python `== None`.
                Invoice.deleted_at.is_(None),
            )
            .order_by(Invoice.created_at.desc())
        )
        invoices = [
            AssociatedInvoiceSchema(
                id=inv.id,
                invoice_id=inv.invoice_id,
                invoice_literal=getattr(inv, "invoice_literal", None),
                amount=float(inv.amount) if inv.amount is not None else None,
                status=inv.status,
                status_text=getattr(inv, "status_text", None),
                created_at=inv.created_at,
                due_date=getattr(inv, "due_date", None),
            )
            for inv in db.execute(inv_stmt).scalars().all()
        ]

        # The payment request's authorization_type is copied onto every
        # transaction row below; None when the payment request is missing.
        pr_obj = db.execute(
            select(PaymentRequest).where(PaymentRequest.id == pr_id)
        ).scalar_one_or_none()
        pr_auth_type = getattr(pr_obj, "authorization_type", None) if pr_obj else None

        txn_stmt = (
            select(Transactions)
            .where(
                Transactions.payment_request_id == pr_id,
                Transactions.merchant_id == merchant_id,
            )
            .order_by(Transactions.ocurred_at.desc())
        )
        transactions = [
            AssociatedTransactionSchema(
                id=txn.id,
                txn_id=txn.txn_id,
                txn_literal=txn.txn_literal,
                txn_amount=float(txn.txn_amount) if txn.txn_amount is not None else None,
                txn_status=txn.txn_status,
                status_text=getattr(txn, "status_text", None),
                ocurred_at=txn.ocurred_at,
                payment_type=getattr(txn, "payment_type", None),
                authorization_type=pr_auth_type,
            )
            for txn in db.execute(txn_stmt).scalars().all()
        ]

    return AuthDetailPDFGenerator().generate_pdf(
        auth=detail,
        merchant=merchant,
        proof=detail.proof,
        history=detail.history,
        invoices=invoices,
        transactions=transactions,
    )
