"""
HPP Services — business logic for the Hosted Payment Page.

This module orchestrates all HPP flows:
  - initiate: token validation, session creation, data assembly
  - submit: payment processing, authorization recording, event dispatch
  - retry initiate / submit: failed-installment retry flow
  - OTP send / verify
  - Customer register / login / email-verify
  - Merchant link management: list, resend, revoke
  - Merchant-triggered transaction retry
"""

import logging
import secrets
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import bcrypt
from fastapi import Request
from sqlalchemy import select
from sqlalchemy.orm import Session

try:
    import geoip2.database
    import geoip2.errors
    _GEOIP_AVAILABLE = True
except ImportError:
    _GEOIP_AVAILABLE = False

from src.apps.hpp import crud as hpp_crud
from src.apps.hpp.helpers.token_helpers import (
    generate_otp,
    generate_retry_token,
    hash_otp,
    mask_token,
    verify_otp,
)
from src.apps.hpp.schemas.hpp_schemas import (
    HppInitiateResponseSchema,
    HppRetryInitiateResponseSchema,
    HppRetrySubmitResponseSchema,
    HppSubmitResponseSchema,
    LineItemSchema,
    MerchantBrandingSchema,
    OtpSendResponseSchema,
    OtpVerifyResponseSchema,
    PaymentLinkSchema,
    PaymentLinksListResponseSchema,
    ResendLinkResponseSchema,
    RevokeLinkResponseSchema,
    SavedPaymentMethodSchema,
    TransactionRetryResponseSchema,
    CustomerRegisterResponseSchema,
    CustomerLoginResponseSchema,
)
from src.apps.payment_methods.models.payment_methods import PaymentMethod
from src.apps.payment_requests.models.payment_request import PaymentRequest
from src.apps.payment_requests.models.payment_request_authorizations import (
    PaymentRequestAuthorizations,
)
from src.apps.payment_requests.models.payment_request_links import PaymentRequestLinks
from src.apps.transactions.models.transactions import Transactions
from src.apps.users.models.user import User
from src.core.config import settings
from src.core.exceptions import (
    APIException,
    ConflictError,
    NotFoundError,
    UnauthorizedError,
)
from src.core.utils.enums import (
    AuthorizationRecordTypes,
    AuthorizationStatus,
    TransactionStatusTypes,
    TransactionCategories,
    TransactionTypes,
    TransactionSourceTypes,
)
from src.apps.payment_providers.models.merchant_provider_config import MerchantProviderConfig
from src.apps.payment_requests.enums import PaymentRequestStatusTypes
from src.events.dispatcher import EventDispatcher
from src.events.base import BaseEvent

logger = logging.getLogger(__name__)


def _get_client_ip(request: Request) -> Optional[str]:
    """
    Extract the real client IP from a request.
    Prefers X-Forwarded-For (set by load balancers / reverse proxies in production)
    and falls back to the direct TCP connection IP.
    """
    forwarded_for = request.headers.get("X-Forwarded-For", "").strip()
    if forwarded_for:
        return forwarded_for.split(",")[0].strip()
    return request.client.host if request.client else None


def _resolve_geoip(ip_address: str) -> dict:
    """
    Resolve an IP address to geo fields using the MaxMind city database.

    Args:
        ip_address: The address to look up.

    Returns:
        A dict with keys ``geo_city``, ``geo_region``, ``geo_country`` and
        ``geo_isp`` (always None here), or an empty dict when the geoip2
        package is not installed, ``settings.MAXMIND_DB_PATH`` is unset, the
        database file is missing, or the lookup fails for any reason.
        This function never raises.
    """
    if not _GEOIP_AVAILABLE:
        return {}
    db_path = getattr(settings, "MAXMIND_DB_PATH", None)
    if not db_path:
        return {}
    try:
        import os
        if not os.path.exists(db_path):
            logger.warning("MaxMind DB not found at %s — geo fields will be null", db_path)
            return {}
        with geoip2.database.Reader(db_path) as reader:
            response = reader.city(ip_address)
            return {
                "geo_city": response.city.name,
                "geo_region": response.subdivisions.most_specific.name,
                "geo_country": response.country.iso_code,
                "geo_isp": None,  # City DB does not include ISP; requires GeoLite2-ASN
            }
    except geoip2.errors.AddressNotFoundError:
        # Expected for private/loopback/unlisted addresses — not worth a
        # WARNING on every request.
        logger.debug("GeoIP has no record for %s", ip_address)
        return {}
    except Exception as exc:
        # Best-effort enrichment: any other failure (corrupt DB, invalid
        # address string, ...) must not break the calling flow.
        logger.warning("GeoIP lookup failed for %s: %s", ip_address, exc)
        return {}


# SEC-007: fixed dummy bcrypt hash to verify against when the user is not found,
# so that the "user not found" path costs about the same time as the
# "wrong password" path and cannot be used for user enumeration via timing.
DUMMY_HASH = "$2b$12$LQv3c1yqBwEHmBZAFHMpBOeRrigUlpjeA5wQvN2VDW5AMNNuqW1uW"

# OTP constants
# NOTE(review): these are not referenced in this part of the file; presumably
# they bound OTP lifetime (minutes), sends per request and verify attempts in
# the OTP send/verify flow — confirm against that code.
OTP_EXPIRY_MINUTES = 10
OTP_MAX_SENDS = 3
OTP_MAX_ATTEMPTS = 5

# Retry token expiry
# NOTE(review): presumably the lifetime, in hours, of a generated retry token — confirm.
RETRY_TOKEN_EXPIRY_HOURS = 72


# ─── Token validation helper ──────────────────────────────────────────────────

def _get_valid_link(db: Session, token: str) -> PaymentRequestLinks:
    """
    Look up a payment link by its token and confirm it is still usable.

    Checks run in this order: existence, ``end_date`` expiry, EXPIRED/REVOKED
    status, then USED status — the first failing check decides the error.

    Raises:
        NotFoundError: No link matches the token.
        APIException (status_code=410): ``end_date`` is in the past, or the
            link status is EXPIRED or REVOKED.
        ConflictError: The link status is USED.
    """
    link = hpp_crud.get_link_by_token(db, token)
    if not link:
        raise NotFoundError(message="Payment link not found")

    current_time = datetime.now(timezone.utc)

    expiry = link.end_date
    if expiry:
        # A naive end_date is interpreted as UTC so it can be compared with
        # the timezone-aware current time.
        if expiry.tzinfo is None:
            expiry = expiry.replace(tzinfo=timezone.utc)
        if expiry < current_time:
            raise APIException(
                message="This payment link has expired",
                status_code=410,
            )

    if link.status == "EXPIRED" or link.status == "REVOKED":
        inactive_message = f"This payment link is no longer active (status: {link.status})"
        raise APIException(message=inactive_message, status_code=410)

    if link.status == "USED":
        raise ConflictError(message="Payment has already been completed for this link")

    return link


def _get_payment_request(db: Session, payment_request_id: int) -> PaymentRequest:
    """
    Load a non-deleted PaymentRequest by primary key.

    Raises:
        NotFoundError: No row with this id exists, or it is soft-deleted
            (``deleted_at`` is set).
    """
    query = (
        select(PaymentRequest)
        .where(PaymentRequest.id == payment_request_id)
        .where(PaymentRequest.deleted_at.is_(None))
    )
    payment_request = db.execute(query).scalar_one_or_none()
    if payment_request:
        return payment_request
    raise NotFoundError(message="Payment request not found")


def _build_branding(pr: PaymentRequest, db: Session) -> MerchantBrandingSchema:
    """Extract merchant branding data from a PaymentRequest's merchant.

    Uses the caller's existing DB session to avoid opening extra connections
    via SessionCelery(), which can exhaust the connection pool.

    Args:
        pr: The payment request whose ``merchant`` supplies the branding.
        db: The caller's session, passed through to the settings loader.

    Returns:
        A MerchantBrandingSchema. When the payment request has no merchant,
        only an empty ``merchant_name`` is set. When the settings lookup
        fails, the failure is logged and every optional field is None with
        the ``show_*`` flags defaulting to True.
    """
    from src.apps.settings.services import get_group_settings

    merchant = getattr(pr, "merchant", None)
    if not merchant:
        return MerchantBrandingSchema(merchant_name="")

    mid = merchant.id
    try:
        # `or {}` keeps the .get() calls below safe if a group comes back as None.
        colors = get_group_settings("Colors", mid, db) or {}
        logo = get_group_settings("Logo", mid, db) or {}
        urls = get_group_settings("Business URLs", mid, db) or {}
    except Exception as exc:
        # Branding is cosmetic — fall back to defaults rather than fail the page.
        logger.warning("Could not load branding settings for merchant %s: %s", mid, exc)
        colors, logo, urls = {}, {}, {}

    def _to_bool(val: Any, default: bool = True) -> bool:
        """Interpret a stored flag; anything other than "false" counts as True."""
        if val is None:
            return default
        if isinstance(val, bool):
            # A real boolean would have crashed on .lower(); honour it directly.
            return val
        return str(val).lower() != "false"

    return MerchantBrandingSchema(
        merchant_name=getattr(merchant, "name", ""),
        primary_color=colors.get("primary_color") or None,
        secondary_color=colors.get("secondary_color") or None,
        accent_color=colors.get("accent_color") or None,
        logo_url=logo.get("logo_url") or None,
        support_email=urls.get("support_email") or None,
        support_phone=urls.get("support_phone") or None,
        website_url=urls.get("website_url") or None,
        show_support_email=_to_bool(urls.get("show_support_email")),
        show_support_phone=_to_bool(urls.get("show_support_phone")),
        show_website_url=_to_bool(urls.get("show_website_url")),
    )


def _build_saved_payment_methods(
    db: Session, customer_id: Optional[int], merchant_id: int
) -> List[SavedPaymentMethodSchema]:
    """
    Return the saved payment methods of a customer, scoped to one merchant.

    Args:
        db: Active session.
        customer_id: Customer to look up; a falsy value yields an empty list.
        merchant_id: Only methods registered under this merchant are returned.

    Returns:
        One SavedPaymentMethodSchema per non-deleted PaymentMethod. Card
        fields are populated only when ``method == "card"``; otherwise they
        are None. Any error while querying or mapping is logged and an empty
        list is returned (best-effort — never raises).

    NOTE(review): ``card_last4`` is filled straight from
    ``card_details.card_number`` — presumably that column already holds only
    the last four digits; confirm against the card-details model.
    """
    if not customer_id:
        return []
    try:
        stmt = select(PaymentMethod).where(
            PaymentMethod.customer_id == customer_id,
            PaymentMethod.merchant_id == merchant_id,
            PaymentMethod.deleted_at.is_(None),
        )
        methods = db.execute(stmt).scalars().all()
        result = []
        for m in methods:
            # Only card methods expose card details. The previously unused
            # ACH-details read was dropped (it touched a relationship for
            # every ACH method without using the value).
            cd = m.card_details if m.method == "card" else None
            result.append(
                SavedPaymentMethodSchema(
                    id=m.id,
                    card_last4=cd.card_number if cd else None,
                    card_brand=cd.brand if cd else None,
                    exp_month=cd.expire_month if cd else None,
                    exp_year=cd.expire_year if cd else None,
                    payment_type=m.method,
                    is_default=m.is_default,
                )
            )
        return result
    except Exception as exc:
        logger.warning("Failed to fetch saved payment methods: %s", exc)
        return []


def _payment_request_has_invoice(payment_request_id: int, db: Session) -> bool:
    """
    Tell whether any non-deleted invoice references the given payment request.

    Issues a single ``SELECT EXISTS(...)`` and returns its scalar result.
    """
    from src.apps.invoices.models.invoice import Invoice
    from sqlalchemy import select, exists

    # EXISTS over invoices tied to this payment request, ignoring soft-deleted rows.
    invoice_exists = exists().where(
        Invoice.payment_request_id == payment_request_id,
        Invoice.deleted_at.is_(None),
    )
    outcome = db.execute(select(invoice_exists))
    return outcome.scalar()


# ─── Initiate ─────────────────────────────────────────────────────────────────

async def initiate_hpp(
    db: Session,
    token: str,
    request: Request,
) -> HppInitiateResponseSchema:
    """
    Validate a payment link token, record an HPP session, and return full
    page context data.

    Steps:
      1. Validate token (404/410/409 per PRD HWHPP-102)
      2. Load payment request and related data
      3. Increment link click_count / last_clicked_at
      4. Create HPPSession and stamp GeoIP fields when resolvable
      5. Update PR status to WAITING if currently CREATED (only when the
         status enum defines WAITING), then commit
      6. Load merchant settings (HPP, invoice, receipt, provider config);
         each load is best-effort and falls back to defaults on failure
      7. Stamp payer display flags on the session and commit again
      8. Return assembled payload

    Args:
        db: Active session; this function commits it twice.
        token: The payment link token from the URL.
        request: Incoming request, used for client IP and user agent.

    Returns:
        HppInitiateResponseSchema with everything the page needs to render.

    Raises:
        Whatever ``_get_valid_link`` / ``_get_payment_request`` raise for an
        unknown, expired, revoked or used link, or a missing payment request.
    """
    link = _get_valid_link(db, token)
    pr = _get_payment_request(db, link.payment_request_id)

    # Extract client metadata
    ip_address = _get_client_ip(request)
    user_agent = request.headers.get("user-agent")

    # Increment click count
    hpp_crud.increment_link_click_count(db, link)

    # Create HPP session
    hpp_session = hpp_crud.create_hpp_session(
        db=db,
        payment_request_id=pr.id,
        token=token,
        ip_address=ip_address,
        user_agent=user_agent,
    )

    # Resolve GeoIP fields and stamp onto the session (before commit)
    if ip_address:
        geo = _resolve_geoip(ip_address)
        if geo:
            hpp_session.geo_city = geo.get("geo_city")
            hpp_session.geo_region = geo.get("geo_region")
            hpp_session.geo_country = geo.get("geo_country")
            hpp_session.geo_isp = geo.get("geo_isp")

    # Update PR status: CREATED → WAITING (status 1 → 2; using numeric values from enum)
    # Only advance; never go backwards
    created_status = PaymentRequestStatusTypes.CREATED.value
    waiting_status = getattr(PaymentRequestStatusTypes, "WAITING", None)
    if waiting_status and pr.status == created_status:
        pr.status = waiting_status.value

    db.commit()

    # Build line items (tolerate a missing / None relationship)
    line_items = [
        LineItemSchema(
            description=getattr(item, "name", "") or getattr(item, "title", "") or "",
            quantity=float(getattr(item, "quantity", 1) or 1),
            unit_price=float(getattr(item, "unit_price", 0) or 0),
            total=float(getattr(item, "total", 0) or 0),
            product_id=getattr(item, "product_id", None),
        )
        for item in (getattr(pr, "line_items", None) or [])
    ]

    # Customer info: taken from the first payment_request_customers entry
    # that actually has a customer attached.
    customer_name = None
    customer_email = None
    customer_phone = None
    customer_id = None
    for pr_customer in (getattr(pr, "payment_request_customers", None) or []):
        cust = getattr(pr_customer, "customer", None)
        if cust:
            customer_id = cust.id
            bln = getattr(cust, "business_legal_name", None)
            fn = getattr(cust, "first_name", None) or ""
            ln = getattr(cust, "last_name", None) or ""
            full = f"{fn} {ln}".strip()
            # Business name wins over the personal name when both exist.
            customer_name = bln or full or None
            customer_email = getattr(cust, "email", None)
            customer_phone = getattr(cust, "phone", None)
            break

    saved_pms = _build_saved_payment_methods(db, customer_id, pr.merchant_id)

    # Determine whether the customer already has a user account for this merchant.
    # This is a pure existence check, so probe with LIMIT 1: the previous
    # scalar_one_or_none() raised MultipleResultsFound (and failed the whole
    # page load) whenever more than one matching user row existed.
    customer_has_account = False
    if customer_id:
        account_stmt = (
            select(User.id)
            .where(
                User.customer_id == customer_id,
                User.merchant_id == pr.merchant_id,
                User.user_type == "customer",
                User.deleted_at.is_(None),
            )
            .limit(1)
        )
        customer_has_account = db.execute(account_stmt).first() is not None

    # Load HPP settings (tip buttons + payer display flags) for this merchant
    hpp_tip_settings: dict = {}
    try:
        from src.apps.settings.services import get_hpp_settings
        hpp_tip_settings = get_hpp_settings(pr.merchant_id, db)
    except Exception as exc:
        logger.warning("Could not load hpp_settings for merchant %s: %s", pr.merchant_id, exc)

    # Stamp payer display settings onto the session now that hpp_tip_settings is loaded
    hpp_session.display_payer_ip = bool(hpp_tip_settings.get("display_payer_ip", False))
    hpp_session.display_payer_location = bool(hpp_tip_settings.get("display_payer_location", False))
    db.commit()

    # Load invoice display settings for this merchant
    invoice_display_settings: dict = {}
    try:
        from src.apps.settings.services import get_invoice_settings
        invoice_display_settings = get_invoice_settings(pr.merchant_id, db)
    except Exception as exc:
        logger.warning("Could not load invoice_settings for merchant %s: %s", pr.merchant_id, exc)

    # Load receipt display settings for this merchant
    receipt_display_settings: dict = {}
    try:
        from src.apps.settings.services import get_receipt_settings
        receipt_display_settings = get_receipt_settings(pr.merchant_id, db)
    except Exception as exc:
        logger.warning("Could not load receipt_settings for merchant %s: %s", pr.merchant_id, exc)

    # Load the merchant's active provider config for public tokenization fields.
    # NOTE(review): if a merchant has more than one active config the lookup
    # raises inside this try and provider_config stays None — confirm that is
    # the intended outcome.
    provider_config: Optional[dict] = None
    try:
        pc_stmt = select(MerchantProviderConfig).where(
            MerchantProviderConfig.merchant_id == pr.merchant_id,
            MerchantProviderConfig.is_active.is_(True),
            MerchantProviderConfig.deleted_at.is_(None),
        )
        merchant_pc = db.execute(pc_stmt).scalar_one_or_none()
        if merchant_pc and merchant_pc.config_data:
            cfg = merchant_pc.config_data
            # Only these three keys are forwarded to the page.
            provider_config = {
                "api_key": cfg.get("api_key"),
                "merchant_id": cfg.get("merchant_id"),
                "mode": cfg.get("mode"),
            }
    except Exception as exc:
        logger.warning("Could not load provider_config for merchant %s: %s", pr.merchant_id, exc)

    return HppInitiateResponseSchema(
        payment_request_id=pr.id,
        payment_request_literal=pr.payment_request_literal,
        amount=float(pr.amount or 0),
        currency=pr.currency or "USD",
        payment_frequency=pr.payment_frequency,
        authorization_type=pr.authorization_type,
        message=pr.message,
        title=pr.title,
        description=pr.description,
        terms=pr.terms,
        due_date=pr.due_date,
        billing_date=pr.billing_date,
        allow_tip=pr.allow_tip or False,
        tip_btn_1=str(hpp_tip_settings.get("tip_btn_1", "10")),
        tip_btn_2=str(hpp_tip_settings.get("tip_btn_2", "15")),
        tip_btn_3=str(hpp_tip_settings.get("tip_btn_3", "20")),
        tip_btn_4=str(hpp_tip_settings.get("tip_btn_4", "25")),
        tip_btn_type=str(hpp_tip_settings.get("tip_btn_type", "percentage")),
        show_item_description=bool(invoice_display_settings.get("show_item_description", True)),
        show_customer_name=bool(invoice_display_settings.get("show_customer_name", True)),
        show_transaction_time=bool(invoice_display_settings.get("show_transaction_time", True)),
        show_invoice_number=bool(invoice_display_settings.get("show_invoice_number", True)),
        receipt_show_item_description=bool(receipt_display_settings.get("show_item_description", True)),
        receipt_show_customer_name=bool(receipt_display_settings.get("show_customer_name", True)),
        receipt_show_transaction_time=bool(receipt_display_settings.get("show_transaction_time", True)),
        receipt_show_receipt_number=bool(receipt_display_settings.get("show_receipt_number", True)),
        require_billing_address=pr.require_billing_address or False,
        require_shipping_address=pr.require_shipping_address or False,
        require_cvv=pr.require_cvv if pr.require_cvv is not None else True,
        require_sms_authorization=pr.require_sms_authorization or False,
        require_signature_authorization=pr.require_signature_authorization or False,
        save_payment_method=pr.save_payment_method or False,
        line_items=line_items,
        branding=_build_branding(pr, db),
        customer_name=customer_name,
        customer_email=customer_email,
        customer_phone=customer_phone,
        saved_payment_methods=saved_pms,
        hpp_session_id=hpp_session.id,
        customer_has_account=customer_has_account,
        provider_config=provider_config,
        display_payer_ip=bool(hpp_session.display_payer_ip),
        display_payer_location=bool(hpp_session.display_payer_location),
        # Payer IP / location are only echoed back when the merchant enabled them.
        payer_ip_address=hpp_session.ip_address if hpp_session.display_payer_ip else None,
        payer_location=(
            (
                ", ".join(
                    filter(None, [hpp_session.geo_city, hpp_session.geo_region, hpp_session.geo_country])
                )
                or None
            )
            if hpp_session.display_payer_location
            else None
        ),
        has_invoice=_payment_request_has_invoice(pr.id, db),
    )


# ─── Submit ───────────────────────────────────────────────────────────────────

async def submit_hpp(
    db: Session,
    payload: Any,
    request: Request,
) -> HppSubmitResponseSchema:
    """
    Process an HPP payment submission.

    Steps:
      1. Re-validate token
      2. B-2 idempotency check — return the most recent non-FAILED transaction
         for this payment request if one exists
      3. B-1 OTP server-side re-validation (never trust payload.otp_verified)
      4. Validate authorization requirements
      5. B-2 pessimistic lock on the payment_request_links row
      6. Create Transaction (mock payment: a synthetic txn_id, status PAID)
      7. Create PaymentRequestAuthorization record (pending ones are marked
         SUPERSEDED first)
      8. Mark payment link USED
      9. Update PR status to PAID and commit
      10. Write an activity-log entry (best-effort)
      11. Emit events: transaction.completed OR transaction.scheduled,
          payment_request.authorized (best-effort)
      12. Return transaction summary

    Args:
        db: Active session; committed by this function.
        payload: Submit payload. Attributes read here: ``token``,
            ``agree_to_terms``, ``tip_amount``, ``saved_payment_method_id``.
        request: Incoming request, used for client IP and user agent.

    Returns:
        HppSubmitResponseSchema describing the new (or pre-existing) transaction.

    Raises:
        NotFoundError / APIException(410) / ConflictError: from link and
            payment request validation.
        APIException(400): OTP not verified, terms not agreed, or no customer
            on the payment request.
        APIException(403): saved payment method does not belong to this
            customer and merchant.
        ConflictError: the locked link is missing or no longer PENDING.
    """
    from src.apps.base.utils.functions import generate_secure_id
    from src.apps.transactions.services import generate_txn_literal

    link = _get_valid_link(db, payload.token)
    pr = _get_payment_request(db, link.payment_request_id)

    # ── B-2: Idempotency — return existing non-failed transaction ─────────────
    existing_txn_stmt = select(Transactions).where(
        Transactions.payment_request_id == pr.id,
        Transactions.txn_status != TransactionStatusTypes.FAILED,
    ).order_by(Transactions.id.desc()).limit(1)
    existing_txn = db.execute(existing_txn_stmt).scalar_one_or_none()
    if existing_txn:
        logger.info(
            "submit_hpp: idempotent return for payment_request_id=%s, "
            "existing transaction id=%s status=%s",
            pr.id,
            existing_txn.id,
            existing_txn.txn_status,
        )
        # Anything that is neither FAILED nor PAID is reported as "processing".
        status_text = (
            "paid"
            if existing_txn.txn_status == TransactionStatusTypes.PAID
            else "processing"
        )
        return HppSubmitResponseSchema(
            transaction_id=existing_txn.id,
            txn_id=existing_txn.txn_id,
            txn_literal=existing_txn.txn_literal,
            txn_amount=float(existing_txn.txn_amount or 0),
            txn_status=int(existing_txn.txn_status),
            status_text=status_text,
            payment_request_id=pr.id,
            is_scheduled=False,
            message="Payment already submitted — returning existing result",
        )

    # ── B-1: Server-side OTP re-validation — NEVER trust payload.otp_verified ─
    # SEC-001 fix: derive auth_type from the server-side PR record, never from client payload
    _pr_auth_type = (pr.authorization_type or "checkbox")
    # authorization_type may be an enum member; unwrap it to its value before lowercasing.
    if hasattr(_pr_auth_type, "value"):
        _pr_auth_type = _pr_auth_type.value
    auth_type = _pr_auth_type.lower()
    if auth_type == "sms_otp":
        # active_otp is bound only on this branch; it is reused further down
        # under the same auth_type == "sms_otp" condition.
        active_otp = hpp_crud.get_active_otp(db, pr.id)
        if not active_otp or not active_otp.is_verified:
            raise APIException(
                message="OTP verification is required before submitting payment. "
                        "Please verify your OTP and try again.",
                status_code=400,
            )

    # ── Authorization validation ───────────────────────────────────────────────
    if auth_type == "checkbox" and not payload.agree_to_terms:
        raise APIException(
            message="You must agree to the terms to proceed",
            status_code=400,
        )

    # ── B-2: Pessimistic lock — prevent concurrent duplicate submissions ───────
    # Re-fetch the link with FOR UPDATE to serialise concurrent requests.
    # NOTE(review): sa_text is imported here but never used in this function.
    from sqlalchemy import text as sa_text
    locked_link_stmt = (
        select(PaymentRequestLinks)
        .where(PaymentRequestLinks.id == link.id)
        .with_for_update()
    )
    locked_link = db.execute(locked_link_stmt).scalar_one_or_none()
    if not locked_link or locked_link.status != "PENDING":
        raise ConflictError(message="Payment has already been completed for this link")

    # Determine transaction status
    # For split/recurring with future dates, use PROCESSING; otherwise PAID
    # NOTE(review): is_scheduled is never set to True and txn_status is always
    # PAID in this function, so the scheduled/processing paths below are
    # currently not taken.
    is_scheduled = False
    # NOTE(review): payload.tip_amount is added as-is; this function does not
    # check pr.allow_tip or that the tip is non-negative — confirm the request
    # schema enforces that.
    total_amount = float(pr.amount or 0) + float(payload.tip_amount or 0)
    txn_status = TransactionStatusTypes.PAID

    # Resolve customer: first payment_request_customers entry with a customer attached
    customer_id = None
    _hpp_customer = None
    for pr_customer in getattr(pr, "payment_request_customers", []):
        cust = getattr(pr_customer, "customer", None)
        if cust:
            customer_id = cust.id
            _hpp_customer = cust
            break

    if not customer_id:
        raise APIException(
            message="No customer associated with this payment request",
            status_code=400,
        )

    # Resolve payer_id — FK to customer_contacts (NOT customers)
    # The query has no ORDER BY, so which contact is picked when several exist
    # is up to the database.
    from src.apps.customers.models.customer_contact import CustomerContact
    contact_stmt = select(CustomerContact).where(
        CustomerContact.customer_id == customer_id,
        CustomerContact.deleted_at.is_(None),
    ).limit(1)
    contact = db.execute(contact_stmt).scalar_one_or_none()
    payer_id = contact.id if contact else None

    # Mock payment: generate a synthetic transaction ID
    mock_txn_id = f"mock_{uuid.uuid4().hex[:12]}"
    txn_literal = generate_txn_literal(db)

    # Create Transaction
    transaction = Transactions(
        txn_amount=total_amount,
        txn_type="charge",
        txn_status=txn_status,
        txn_id=mock_txn_id,
        txn_literal=txn_literal,
        currency=pr.currency or "USD",
        description=f"HPP payment for {pr.payment_request_literal or pr.id}",
        category=TransactionCategories.CHARGE,
        transaction_type=TransactionTypes.HPP,
        txn_source=TransactionSourceTypes.HPP,
        payment_request_id=pr.id,
        merchant_id=pr.merchant_id,
        customer_id=customer_id,
    )

    # Wire payment method if saved PM selected
    card_last4: Optional[str] = None
    if payload.saved_payment_method_id:
        # SEC-003: verify the payment method belongs to this customer and merchant
        from src.apps.payment_methods.models.payment_methods import PaymentMethod
        from src.apps.payment_methods.models.payment_method_card_details import PaymentMethodCardDetails
        pm_stmt = select(PaymentMethod).where(
            PaymentMethod.id == payload.saved_payment_method_id,
            PaymentMethod.customer_id == customer_id,
            PaymentMethod.merchant_id == pr.merchant_id,
            PaymentMethod.deleted_at.is_(None),
        )
        pm = db.execute(pm_stmt).scalar_one_or_none()
        if not pm:
            raise APIException(
                message="Payment method not found or not authorized for this customer",
                status_code=403,
            )
        transaction.payment_method_id = pm.id
        # Fetch card last 4 for receipt display
        # PaymentMethodCardDetails is linked via PaymentMethod.card_details_id (not the other way)
        if pm.card_details_id:
            cd_stmt = select(PaymentMethodCardDetails).where(
                PaymentMethodCardDetails.id == pm.card_details_id
            )
            cd = db.execute(cd_stmt).scalar_one_or_none()
            if cd and cd.card_number:
                # NOTE(review): card_number is returned unmodified as card_last4 —
                # presumably the column already stores only the last four digits; confirm.
                card_last4 = cd.card_number

    db.add(transaction)
    # Flush so the transaction gets its primary key before the commit below.
    db.flush()

    # Find most recent HPP session for this PR
    from src.apps.hpp.models.hpp_session import HPPSession as HPPSessionModel
    stmt = (
        select(HPPSessionModel)
        .where(
            HPPSessionModel.payment_request_id == pr.id,
            HPPSessionModel.deleted_at.is_(None),
        )
        .order_by(HPPSessionModel.id.desc())
        .limit(1)
    )
    hpp_session = db.execute(stmt).scalar_one_or_none()
    if hpp_session:
        hpp_crud.stamp_hpp_session_submitted(db, hpp_session.id)

    # Determine authorization record type (anything other than signature / sms_otp
    # is recorded as CHECKBOX)
    auth_record_type = AuthorizationRecordTypes.CHECKBOX
    if auth_type == "signature":
        auth_record_type = AuthorizationRecordTypes.SIGNATURE
    elif auth_type == "sms_otp":
        auth_record_type = AuthorizationRecordTypes.SMS

    ip_address = _get_client_ip(request)

    # Get verified phone if OTP — reuse active_otp fetched and validated in B-1 above
    verified_phone_last4 = None
    _sms_authorization_value = None
    if auth_type == "sms_otp":
        if active_otp and active_otp.is_verified:
            verified_phone_last4 = active_otp.phone_last4
            # PRD-009-HWRPT: Encrypt the verified phone_last4 as the durable OTP
            # authorization proof.  The plaintext OTP itself is not available at
            # submit time (it is bcrypt-hashed in hpp_otp_tokens.hashed_otp and
            # cannot be recovered).  Storing the encrypted phone_last4 provides
            # a tamper-evident record of which phone number was used for OTP
            # verification.  When a full plaintext-OTP capture is required, a
            # migration should add hpp_otp_tokens.encrypted_otp and this code
            # should read from that field instead.
            try:
                from src.core.utils.crypto import encrypt_otp
                _sms_authorization_value = encrypt_otp(active_otp.phone_last4)
            except Exception as _enc_exc:
                # Encryption failure is tolerated: the authorization is still
                # recorded, just with authorization_value left as None.
                logger.warning(
                    "SMS OTP authorization_value encryption failed (key not configured?): %s",
                    _enc_exc,
                )

    # Mark any pre-existing PENDING authorizations for this PR as SUPERSEDED
    # (the HPP payment authorization replaces the merchant-initiated pending auth)
    supersede_stmt = select(PaymentRequestAuthorizations).where(
        PaymentRequestAuthorizations.payment_request_id == pr.id,
        PaymentRequestAuthorizations.status == AuthorizationStatus.PENDING.value,
        PaymentRequestAuthorizations.deleted_at.is_(None),
    )
    for existing_auth in db.execute(supersede_stmt).scalars().all():
        existing_auth.status = AuthorizationStatus.SUPERSEDED.value
        existing_auth.updated_at = datetime.now(timezone.utc)

    # Generate unique IDs for the HPP authorization record (mirrors non-HPP flow)
    hpp_auth_id = generate_secure_id(prepend="auth", length=20)
    hpp_auth_literal = f"AUTH-{generate_secure_id(length=8).upper()}"

    # Capture device/browser info and payer identity for the authorization audit trail
    from src.apps.payment_requests.utils.functions import get_device_details
    user_agent_string = request.headers.get("user-agent", "")
    device_details = get_device_details(user_agent_string)

    # Resolve the payer's display name from the contact record:
    # full name, else email, else the "HPP Customer" placeholder.
    payer_display_name = "HPP Customer"
    if contact:
        parts = [
            getattr(contact, "first_name", None) or "",
            getattr(contact, "last_name", None) or "",
        ]
        full = " ".join(p for p in parts if p).strip()
        if full:
            payer_display_name = full
        elif getattr(contact, "email", None):
            payer_display_name = contact.email

    hpp_auth_metadata = {
        "ip_addr": ip_address,
        "user_agent": user_agent_string,
        "current_user": payer_display_name,
        "current_user_id": None,
        "timestamp": datetime.now(timezone.utc).timestamp(),
        "os": device_details.get("os"),
        "device": device_details.get("device"),
        "platform": device_details.get("platform"),
        "browser": device_details.get("browser"),
        "origin": "",
        "txn_source": TransactionSourceTypes.HPP,
    }

    auth_record = PaymentRequestAuthorizations(
        authorization_id=hpp_auth_id,
        authorization_literal=hpp_auth_literal,
        authorization_type=auth_record_type,
        authorization_date=datetime.now(timezone.utc),
        is_verified=True,
        status=AuthorizationStatus.ACTIVE.value,
        auth_metadata=hpp_auth_metadata,
        ip_address=ip_address,
        payment_request_id=pr.id,
        merchant_id=pr.merchant_id,
        customer_id=customer_id,
        payer_id=payer_id,
        hpp_session_id=hpp_session.id if hpp_session else None,
        verified_phone_last4=verified_phone_last4,
        authorization_value=_sms_authorization_value,
    )
    db.add(auth_record)
    db.flush()

    # Mark link used (on the row fetched with FOR UPDATE above)
    hpp_crud.mark_link_used(db, locked_link)

    # Update PR status
    pr.status = PaymentRequestStatusTypes.PAID.value
    pr.updated_at = datetime.now(timezone.utc)

    # Single commit for transaction, authorization, link and PR status changes.
    db.commit()
    db.refresh(transaction)

    # Write activity log entry for this HPP transaction.
    # Best-effort: any failure in this block is logged at DEBUG and swallowed,
    # since the payment itself is already committed.
    try:
        from src.apps.reports.services.activity_log_service import write_activity
        from src.apps.settings.crud import get_or_create_preferences
        from src.apps.users.models.user import User
        _currency_code = (pr.currency or "USD").upper()
        # Amount stored in cents (PR created via VT frontend which sends Math.round(dollars * 100))
        # NOTE(review): the response below returns txn_amount without this
        # division — confirm which unit API consumers expect.
        _amount = float(transaction.txn_amount or 0) / 100
        # Try to resolve merchant user preferences for currency display
        _display = "code"
        try:
            _merchant_user_stmt = select(User).where(
                User.merchant_id == pr.merchant_id,
                User.deleted_at.is_(None),
            ).limit(1)
            _merchant_user = db.execute(_merchant_user_stmt).scalar_one_or_none()
            if _merchant_user:
                _prefs = get_or_create_preferences(_merchant_user.id, db)
                _display = _prefs.currency_display
        except Exception:
            # Preferences are optional; fall back to the "code" display style.
            pass
        _CURRENCY_SYMBOLS = {
            "USD": "$", "EUR": "€", "GBP": "£", "JPY": "¥",
            "CAD": "CA$", "AUD": "A$", "CHF": "Fr", "CNY": "¥", "MXN": "MX$",
        }
        _CURRENCY_NAMES = {
            "USD": "US Dollar", "EUR": "Euro", "GBP": "British Pound",
            "JPY": "Japanese Yen", "CAD": "Canadian Dollar",
            "AUD": "Australian Dollar", "CHF": "Swiss Franc",
            "CNY": "Chinese Yuan", "MXN": "Mexican Peso",
        }
        # Unknown currency codes fall back to the code itself in both lookups.
        if _display == "symbol":
            _sym = _CURRENCY_SYMBOLS.get(_currency_code, _currency_code)
            _currency_label = f"{_sym}{_amount:,.2f}"
        elif _display == "name":
            _name = _CURRENCY_NAMES.get(_currency_code, _currency_code)
            _currency_label = f"{_name} {_amount:,.2f}"
        else:
            _currency_label = f"{_currency_code} {_amount:,.2f}"
        _customer_label = ""
        if _hpp_customer:
            # With an empty name the rstrip calls reduce " — " to "", so the
            # equality check that follows cannot match.
            _customer_label = (
                f" — {_hpp_customer.business_legal_name or _hpp_customer.first_name or ''}"
            ).rstrip(" —").rstrip()
            if _customer_label == " —":
                _customer_label = ""
        await write_activity(
            db=db,
            merchant_id=pr.merchant_id,
            event_type="transaction.paid",
            event_category="transactions",
            description=f"Payment of {_currency_label} processed{_customer_label}",
            actor_type="customer",
            reference_type="transaction",
            reference_id=str(transaction.txn_literal or transaction.txn_id or ""),
        )
        db.commit()
    except Exception as _act_err:
        logger.debug(f"write_activity (hpp transaction.paid) swallowed: {_act_err}")

    # Dispatch events.
    # Best-effort: a dispatch failure is logged and does not affect the response.
    # If the first dispatch raises, the second event is not sent.
    try:
        event_type = "transaction.scheduled" if is_scheduled else "transaction.completed"
        await EventDispatcher.dispatch(
            BaseEvent(
                event_type=event_type,
                data={
                    "transaction_id": transaction.id,
                    "txn_id": transaction.txn_id,
                    "txn_amount": transaction.txn_amount,
                    "payment_request_id": pr.id,
                    "merchant_id": pr.merchant_id,
                    "customer_id": customer_id,
                },
            )
        )
        await EventDispatcher.dispatch(
            BaseEvent(
                event_type="payment_request.authorized",
                data={
                    "payment_request_id": pr.id,
                    "merchant_id": pr.merchant_id,
                    "authorization_id": auth_record.id,
                    "transaction_id": transaction.id,
                },
            )
        )
    except Exception as exc:
        logger.error("Event dispatch failed after HPP submit: %s", exc)

    status_text = "paid" if txn_status == TransactionStatusTypes.PAID else "processing"
    return HppSubmitResponseSchema(
        transaction_id=transaction.id,
        txn_id=transaction.txn_id,
        txn_literal=transaction.txn_literal,
        txn_amount=transaction.txn_amount,
        txn_status=int(transaction.txn_status),
        status_text=status_text,
        payment_request_id=pr.id,
        is_scheduled=is_scheduled,
        card_last4=card_last4,
        message="Payment submitted successfully",
    )


# ─── Retry Initiate ───────────────────────────────────────────────────────────

async def initiate_retry(
    db: Session,
    token: str,
    request: Request,
) -> HppRetryInitiateResponseSchema:
    """
    Resolve a retry token into the data the retry page needs to render.

    The token must exist, must not be past its ``expires_at`` and must not
    have been used yet. ``request`` is part of the signature but is not read
    by this function.

    Raises:
        NotFoundError: unknown token, or the referenced transaction is missing.
        APIException: the token has expired (HTTP 410).
        ConflictError: the token has already been used.
    """
    token_row = hpp_crud.get_retry_token(db, token)
    if not token_row:
        raise NotFoundError(message="Retry link not found")

    current_time = datetime.now(timezone.utc)
    deadline = token_row.expires_at
    if deadline.tzinfo is None:
        # Naive timestamps are interpreted as UTC before comparing.
        deadline = deadline.replace(tzinfo=timezone.utc)

    if deadline < current_time:
        raise APIException(message="This retry link has expired", status_code=410)
    if token_row.used_at is not None:
        raise ConflictError(message="This retry link has already been used")

    pr = _get_payment_request(db, token_row.payment_request_id)

    # The failed transaction this token was issued for.
    failed_txn = db.execute(
        select(Transactions).where(Transactions.id == token_row.transaction_id)
    ).scalar_one_or_none()
    if not failed_txn:
        raise NotFoundError(message="Transaction not found")

    # First customer attached to the payment request, if any.
    attached_customers = (
        getattr(entry, "customer", None)
        for entry in getattr(pr, "payment_request_customers", [])
    )
    first_customer = next((c for c in attached_customers if c), None)
    customer_id = first_customer.id if first_customer else None

    saved_methods = _build_saved_payment_methods(db, customer_id, pr.merchant_id)

    merchant = getattr(pr, "merchant", None)
    return HppRetryInitiateResponseSchema(
        retry_token_id=token_row.id,
        payment_request_id=pr.id,
        transaction_id=token_row.transaction_id,
        amount=float(failed_txn.txn_amount or 0),
        currency=failed_txn.currency or "USD",
        merchant_name=getattr(merchant, "name", ""),
        branding=_build_branding(pr, db),
        saved_payment_methods=saved_methods,
    )


# ─── Retry Submit ─────────────────────────────────────────────────────────────

async def submit_retry(
    db: Session,
    payload: Any,
    request: Request,
) -> HppRetrySubmitResponseSchema:
    """
    Process a retry payment for a failed installment.

    C-10: Creates a NEW Transaction record for the retry instead of overwriting
    the original transaction's txn_id, preserving the full audit trail.
    The original FAILED transaction is updated to reference the new retry
    transaction via its metadata field (``retry_transaction_id``).

    Args:
        db: Active SQLAlchemy session; committed by this function.
        payload: Reads ``payload.token`` and ``payload.saved_payment_method_id``.
        request: Part of the signature; not read by this function.

    Raises:
        NotFoundError: unknown retry token or missing original transaction.
        APIException: expired token (HTTP 410), or a saved payment method that
            does not belong to the transaction's customer/merchant (HTTP 403).
        ConflictError: the retry token has already been used.
    """
    from src.apps.transactions.services import generate_txn_literal

    retry_token = hpp_crud.get_retry_token(db, payload.token)
    if not retry_token:
        raise NotFoundError(message="Retry link not found")

    now = datetime.now(timezone.utc)
    expires_at = retry_token.expires_at
    if expires_at.tzinfo is None:
        # Naive timestamps are interpreted as UTC before comparing.
        expires_at = expires_at.replace(tzinfo=timezone.utc)
    if expires_at < now:
        raise APIException(message="This retry link has expired", status_code=410)
    if retry_token.used_at is not None:
        raise ConflictError(message="This retry link has already been used")

    # Fetch the original failed transaction
    stmt = select(Transactions).where(Transactions.id == retry_token.transaction_id)
    original_txn = db.execute(stmt).scalar_one_or_none()
    if not original_txn:
        raise NotFoundError(message="Original transaction not found")

    # SEC-003: verify saved payment method ownership before use in retry.
    # Defaults to the payment method of the original transaction.
    retry_payment_method_id = original_txn.payment_method_id
    if payload.saved_payment_method_id:
        retry_pm_stmt = select(PaymentMethod).where(
            PaymentMethod.id == payload.saved_payment_method_id,
            PaymentMethod.customer_id == original_txn.customer_id,
            PaymentMethod.merchant_id == original_txn.merchant_id,
            PaymentMethod.deleted_at.is_(None),
        )
        retry_pm = db.execute(retry_pm_stmt).scalar_one_or_none()
        if not retry_pm:
            raise APIException(
                message="Payment method not found or not authorized for this customer",
                status_code=403,
            )
        retry_payment_method_id = retry_pm.id

    # C-10: Create a NEW transaction for the retry instead of overwriting original.
    # The retry is recorded as PAID with a generated "mock_" transaction id.
    new_mock_txn_id = f"mock_{uuid.uuid4().hex[:12]}"
    new_txn_literal = generate_txn_literal(db)

    new_txn = Transactions(
        txn_amount=original_txn.txn_amount,
        txn_type=original_txn.txn_type,
        txn_status=TransactionStatusTypes.PAID,
        txn_id=new_mock_txn_id,
        txn_literal=new_txn_literal,
        currency=original_txn.currency,
        description=f"Retry for transaction {original_txn.txn_literal or original_txn.id}",
        category=original_txn.category,
        transaction_type=original_txn.transaction_type,
        txn_source=original_txn.txn_source,
        payment_request_id=original_txn.payment_request_id,
        merchant_id=original_txn.merchant_id,
        customer_id=original_txn.customer_id,
        payment_method_id=retry_payment_method_id,
        occurred_at=datetime.now(timezone.utc),
    )
    db.add(new_txn)
    db.flush()  # assigns new_txn.id so it can be referenced below

    # Update original FAILED transaction: keep it FAILED and reference the retry.
    original_txn.txn_status = TransactionStatusTypes.FAILED
    # Build a NEW dict rather than mutating the loaded one: a JSON column
    # without mutation tracking does not notice in-place changes, and
    # re-assigning the same (mutated) object compares equal to the "old"
    # value, so the reference would not be written.
    prior_metadata = original_txn.txn_metadata
    retry_metadata = dict(prior_metadata) if isinstance(prior_metadata, dict) else {}
    retry_metadata["retry_transaction_id"] = new_txn.id
    original_txn.txn_metadata = retry_metadata

    # Mark retry token used
    hpp_crud.mark_retry_token_used(db, retry_token)

    db.commit()
    db.refresh(new_txn)

    # Dispatch event; a dispatch failure is logged and does not fail the retry.
    try:
        await EventDispatcher.dispatch(
            BaseEvent(
                event_type="transaction.completed",
                data={
                    "transaction_id": new_txn.id,
                    "txn_id": new_txn.txn_id,
                    "txn_amount": new_txn.txn_amount,
                    "payment_request_id": retry_token.payment_request_id,
                    "merchant_id": new_txn.merchant_id,
                    "is_retry": True,
                    "original_transaction_id": original_txn.id,
                },
            )
        )
    except Exception as exc:
        logger.error("Event dispatch failed after retry submit: %s", exc)

    return HppRetrySubmitResponseSchema(
        transaction_id=new_txn.id,
        txn_id=new_txn.txn_id,
        txn_amount=float(new_txn.txn_amount or 0),
        status_text="paid",
        message="Retry payment processed successfully",
    )


# ─── OTP ──────────────────────────────────────────────────────────────────────

async def send_otp(db: Session, token: str) -> OtpSendResponseSchema:
    """
    Generate an OTP, store its hash in an OTP token row, and send the code
    to the customer's phone via ``_send_sms``.

    Enforces:
      - at most ``OTP_MAX_SENDS`` sends per payment request;
      - SEC-005: a 60 second cooldown after the currently active OTP was created.

    Args:
        db: Active SQLAlchemy session; committed by this function.
        token: Payment link token identifying the payment request.

    Raises:
        APIException: send limit reached or cooldown active (HTTP 429), or no
            phone number of at least 4 characters on record (HTTP 400).
    """
    import math

    link = _get_valid_link(db, token)
    pr = _get_payment_request(db, link.payment_request_id)

    # Enforce send limit
    send_count = hpp_crud.get_send_count_for_payment_request(db, pr.id)
    if send_count >= OTP_MAX_SENDS:
        raise APIException(
            message=f"Maximum OTP sends ({OTP_MAX_SENDS}) reached for this payment request",
            status_code=429,
        )

    # SEC-005: enforce 60s cooldown between OTP sends
    cooldown_seconds = 60
    latest_otp = hpp_crud.get_active_otp(db, pr.id)
    if latest_otp:
        created = latest_otp.created_at
        if created.tzinfo is None:
            # Naive timestamps are interpreted as UTC before comparing.
            created = created.replace(tzinfo=timezone.utc)
        seconds_since = (datetime.now(timezone.utc) - created).total_seconds()
        if seconds_since < cooldown_seconds:
            # Round up so the message never says "wait 0 seconds" while the
            # request is still being rejected.
            wait_seconds = math.ceil(cooldown_seconds - seconds_since)
            raise APIException(
                message=f"Please wait {wait_seconds} seconds before requesting a new OTP",
                status_code=429,
            )

    # Resolve customer phone from the first customer attached to the request.
    phone = None
    for pr_customer in getattr(pr, "payment_request_customers", []):
        cust = getattr(pr_customer, "customer", None)
        if cust:
            phone = getattr(cust, "phone", None) or getattr(cust, "phone_number", None)
            break

    if not phone or len(phone) < 4:
        raise APIException(
            message="No valid phone number on record for this customer",
            status_code=400,
        )

    phone_last4 = phone[-4:]

    # Generate and hash OTP; only the hash is stored.
    otp = generate_otp()
    hashed = hash_otp(otp)
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=OTP_EXPIRY_MINUTES)

    hpp_crud.create_otp_token(
        db=db,
        payment_request_id=pr.id,
        hashed_otp=hashed,
        phone_last4=phone_last4,
        expires_at=expires_at,
    )
    db.commit()

    # Send SMS (stub: log if Twilio not configured)
    await _send_sms(phone, f"Your HubWallet verification code is: {otp}")

    return OtpSendResponseSchema(
        phone_last4=phone_last4,
        send_count=send_count + 1,
        expires_in_seconds=OTP_EXPIRY_MINUTES * 60,
        message="OTP sent successfully",
    )


async def verify_otp_code(
    db: Session, token: str, otp: str
) -> OtpVerifyResponseSchema:
    """
    Check a customer-entered OTP against the active OTP of the payment request.

    A wrong code increments the stored attempt counter; once the counter has
    reached ``OTP_MAX_ATTEMPTS`` every further attempt is rejected.

    Raises:
        APIException: no active OTP (HTTP 404), attempt limit reached
            (HTTP 429), or wrong code (HTTP 400).
    """
    link = _get_valid_link(db, token)
    pr = _get_payment_request(db, link.payment_request_id)

    pending_otp = hpp_crud.get_active_otp(db, pr.id)
    if not pending_otp:
        raise APIException(
            message="No active OTP found. Please request a new code.",
            status_code=404,
        )

    # Lockout is evaluated before the submitted code is looked at.
    attempts_so_far = pending_otp.attempt_count or 0
    if attempts_so_far >= OTP_MAX_ATTEMPTS:
        raise APIException(
            message="Too many incorrect attempts. Please request a new OTP.",
            status_code=429,
        )

    if verify_otp(otp, pending_otp.hashed_otp):
        # Correct code: persist the verification and report success.
        hpp_crud.mark_otp_verified(db, pending_otp)
        db.commit()
        return OtpVerifyResponseSchema(
            verified=True,
            attempt_count=int(pending_otp.attempt_count or 0),
            message="OTP verified successfully",
        )

    # Wrong code: record the failed attempt before rejecting.
    hpp_crud.increment_otp_attempt(db, pending_otp)
    db.commit()
    raise APIException(
        message="Incorrect OTP code",
        status_code=400,
    )


# ─── Customer account ─────────────────────────────────────────────────────────

async def register_customer(
    db: Session, token: str, email: str, password: str
) -> CustomerRegisterResponseSchema:
    """
    Register a customer portal user under the merchant that owns the payment
    request behind ``token``.

    The password is stored as a bcrypt hash and the account starts out
    unverified. Dispatches ``customer.account_created``; a dispatch failure is
    logged and does not fail the registration.

    Raises:
        ConflictError: a non-deleted customer user with this email already
            exists for the merchant.
    """
    link = _get_valid_link(db, token)
    pr = _get_payment_request(db, link.payment_request_id)
    owning_merchant_id = pr.merchant_id

    # Reject duplicates: same email, same merchant, customer type, not deleted.
    duplicate_filters = (
        User.email == email,
        User.merchant_id == owning_merchant_id,
        User.user_type == "customer",
        User.deleted_at.is_(None),
    )
    already_registered = db.execute(
        select(User).where(*duplicate_filters)
    ).scalar_one_or_none()
    if already_registered:
        raise ConflictError(
            message="An account with this email already exists for this merchant"
        )

    password_bytes = password.encode("utf-8")
    password_hash = bcrypt.hashpw(password_bytes, bcrypt.gensalt()).decode("utf-8")

    new_user = User(
        email=email,
        username=email,
        hashed_password=password_hash,
        user_type="customer",
        merchant_id=owning_merchant_id,
        is_active=True,
        is_verified=False,
        is_superuser=False,
    )
    db.add(new_user)
    db.flush()

    db.commit()
    db.refresh(new_user)

    # Announce the new account to listeners.
    account_created = {
        "user_id": new_user.id,
        "email": new_user.email,
        "merchant_id": pr.merchant_id,
    }
    try:
        await EventDispatcher.dispatch(
            BaseEvent(event_type="customer.account_created", data=account_created)
        )
    except Exception as exc:
        logger.error("Event dispatch failed after customer register: %s", exc)

    return CustomerRegisterResponseSchema(
        user_id=new_user.id,
        email=new_user.email,
        is_verified=False,
        message="Account created. Please verify your email.",
    )


async def login_customer(
    db: Session,
    token: str,
    email: str,
    password: str,
    request: Optional[Request] = None,
) -> CustomerLoginResponseSchema:
    """
    Authenticate a customer portal user and return a short-lived HPP session token.

    The user is looked up by email within the merchant that owns the payment
    request behind ``token``. On success a JWT valid for 30 minutes is issued.

    C-6: Creates an AuthSession record to enable session revocation and audit
    logging, consistent with the merchant/admin auth flow. Failure to store
    that record is logged and does not fail the login.

    Args:
        db: Active SQLAlchemy session.
        token: Payment link token identifying the payment request / merchant.
        email: Login email.
        password: Plain-text password to verify.
        request: Optional request; when given, client IP and user agent are
            recorded on the AuthSession.

    Raises:
        UnauthorizedError: unknown user, user without a stored password hash,
            wrong password, or inactive account.
    """
    import jwt as pyjwt
    from src.apps.auth.models.auth_session import AuthSession

    link = _get_valid_link(db, token)
    pr = _get_payment_request(db, link.payment_request_id)

    stmt = select(User).where(
        User.email == email,
        User.merchant_id == pr.merchant_id,
        User.user_type == "customer",
        User.deleted_at.is_(None),
    )
    user = db.execute(stmt).scalar_one_or_none()

    # A row without a stored hash cannot be authenticated; treat it exactly
    # like an unknown user instead of failing on ``None.encode``.
    if not user or not user.hashed_password:
        # SEC-007: constant-time dummy check to prevent user enumeration via timing
        bcrypt.checkpw(password.encode("utf-8"), DUMMY_HASH.encode("utf-8"))
        raise UnauthorizedError(message="Invalid email or password")

    if not bcrypt.checkpw(password.encode("utf-8"), user.hashed_password.encode("utf-8")):
        raise UnauthorizedError(message="Invalid email or password")

    if not user.is_active:
        raise UnauthorizedError(message="Account is inactive")

    expires_in = 1800  # 30 minutes
    expires_at = datetime.now(timezone.utc) + timedelta(seconds=expires_in)
    jwt_payload = {
        "sub": str(user.id),
        "email": user.email,
        "user_type": "customer",
        "merchant_id": pr.merchant_id,
        "exp": expires_at,
        "hpp_session": True,
    }
    access_token = pyjwt.encode(
        jwt_payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM
    )

    # C-6: Track session in auth_sessions for revocation and audit consistency.
    ip_address = None
    user_agent_str = None
    if request is not None:
        ip_address = _get_client_ip(request)
        user_agent_str = request.headers.get("user-agent")

    try:
        auth_session = AuthSession(
            user_id=user.id,
            access_token=access_token,
            refresh_token=None,  # HPP sessions are non-renewable
            device_info=None,
            ip_address=ip_address,
            user_agent=user_agent_str,
            access_token_expires_at=expires_at,
            refresh_token_expires_at=None,
            is_active=True,
            is_revoked=False,
        )
        db.add(auth_session)
        db.commit()
    except Exception as exc:
        # Log but do not fail login if session tracking fails
        logger.error("Failed to create AuthSession for HPP customer login: %s", exc)
        db.rollback()

    return CustomerLoginResponseSchema(
        access_token=access_token,
        token_type="bearer",
        expires_in=expires_in,
        message="Login successful",
    )


async def verify_customer_email(db: Session, verify_token: str) -> Dict[str, Any]:
    """
    Verify a customer's email using a time-limited signed token.

    For now this is a simplified implementation: decode the JWT, find the
    non-deleted user named by its ``sub`` claim, and set ``is_verified = True``.

    Args:
        db: Active SQLAlchemy session; committed by this function.
        verify_token: JWT signed with ``settings.JWT_SECRET_KEY``.

    Returns:
        ``{"verified": True, "message": "Email verified successfully"}``.

    Raises:
        APIException: the token cannot be decoded, is an HPP session token,
            or has a missing / non-integer ``sub`` claim (HTTP 400).
        NotFoundError: no non-deleted user with that id exists.
    """
    import jwt as pyjwt

    try:
        payload = pyjwt.decode(
            verify_token,
            settings.JWT_SECRET_KEY,
            algorithms=[settings.JWT_ALGORITHM],
        )
    except Exception as exc:
        raise APIException(
            message="Invalid or expired verification link", status_code=400
        ) from exc

    # HPP login tokens are signed with the same key and also carry ``sub``;
    # they are marked with ``hpp_session`` and must not double as proof of
    # mailbox ownership.
    # NOTE(review): a dedicated "purpose" claim on verification tokens would
    # allow a strict allow-list here — confirm against the token issuer.
    if payload.get("hpp_session"):
        raise APIException(message="Invalid verification token", status_code=400)

    user_id = payload.get("sub")
    if not user_id:
        raise APIException(message="Invalid verification token", status_code=400)

    # ``sub`` comes from the token; a non-numeric value is a client error,
    # not a server error.
    try:
        user_pk = int(user_id)
    except (TypeError, ValueError):
        raise APIException(
            message="Invalid verification token", status_code=400
        ) from None

    stmt = select(User).where(
        User.id == user_pk,
        User.deleted_at.is_(None),
    )
    user = db.execute(stmt).scalar_one_or_none()
    if not user:
        raise NotFoundError(message="User not found")

    user.is_verified = True
    db.commit()

    return {"verified": True, "message": "Email verified successfully"}


# ─── Merchant link management ─────────────────────────────────────────────────

async def list_payment_links(
    db: Session, payment_request_id: int, merchant_id: int
) -> PaymentLinksListResponseSchema:
    """
    List every payment link of a payment request with its token masked.

    Merchant scoping: a payment request owned by a different merchant is
    reported as not found.

    Raises:
        NotFoundError: the payment request does not belong to ``merchant_id``.
    """
    pr = _get_payment_request(db, payment_request_id)
    if pr.merchant_id != merchant_id:
        raise NotFoundError(message="Payment request not found")

    # One display row per stored link; missing values fall back to defaults.
    items = [
        PaymentLinkSchema(
            id=entry.id,
            masked_token=mask_token(entry.token or ""),
            status=entry.status or "PENDING",
            click_count=entry.click_count or 0,
            created_at=None,
            used_at=entry.used_at,
            revoked_at=entry.revoked_at,
            last_clicked_at=entry.last_clicked_at,
            end_date=entry.end_date,
        )
        for entry in hpp_crud.get_links_for_payment_request(db, payment_request_id)
    ]
    return PaymentLinksListResponseSchema(links=items, total=len(items))


def _is_resendable_link(link: Any, now: datetime) -> bool:
    """Return True when *link* is PENDING, not flagged expired, and not past its end_date."""
    if link.status != "PENDING" or link.is_expired:
        return False
    end_date = link.end_date
    if end_date is None:
        return True
    if end_date.tzinfo is None:
        # Naive timestamps are interpreted as UTC before comparing.
        end_date = end_date.replace(tzinfo=timezone.utc)
    return end_date >= now


async def resend_payment_link(
    db: Session, payment_request_id: int, merchant_id: int
) -> ResendLinkResponseSchema:
    """
    Resend notification for the active link, or generate a new token if expired.

    A link counts as active only when it is PENDING, not flagged
    ``is_expired`` and its ``end_date`` has not passed. A PENDING link that
    has expired is therefore replaced rather than re-sent.

    C-4: Before creating a new link, all existing PENDING links are revoked so
    that only one active link can exist at a time.

    Dispatches ``payment_link.created`` with ``is_resend=True``; a dispatch
    failure is logged and does not fail the call.

    Raises:
        NotFoundError: the payment request does not belong to ``merchant_id``.
    """
    pr = _get_payment_request(db, payment_request_id)
    if pr.merchant_id != merchant_id:
        raise NotFoundError(message="Payment request not found")

    links = hpp_crud.get_links_for_payment_request(db, payment_request_id)
    now = datetime.now(timezone.utc)

    # Find the active link: PENDING and still within its validity window.
    active_link = next(
        (lnk for lnk in links if _is_resendable_link(lnk, now)), None
    )

    if not active_link:
        # All links are expired/used/revoked.
        # C-4: Revoke all remaining PENDING links (e.g. PENDING but expired)
        # before creating a new one.
        hpp_crud.revoke_all_pending_links(db, payment_request_id)

        # Create a new link token.
        new_token = secrets.token_urlsafe(32)
        new_link = PaymentRequestLinks(
            token=new_token,
            status="PENDING",
            payment_request_id=payment_request_id,
            invoice_id=links[0].invoice_id if links else None,
            start_date=now,
            end_date=now + timedelta(hours=settings.HPP_LINK_EXPIRY_HOURS),
            click_count=0,
            is_expired=False,
        )
        db.add(new_link)
        db.flush()  # assigns new_link.id for the event and the response
        active_link = new_link
    else:
        # A usable PENDING link exists — revoke any *other* PENDING links
        # (duplicates or stale ones) before re-sending this one.
        for lnk in links:
            if lnk.status == "PENDING" and lnk.id != active_link.id:
                lnk.status = "REVOKED"
                lnk.revoked_at = datetime.now(timezone.utc)
                lnk.is_expired = True
        db.flush()

    db.commit()

    # Dispatch event to trigger notification
    try:
        await EventDispatcher.dispatch(
            BaseEvent(
                event_type="payment_link.created",
                data={
                    "payment_request_id": payment_request_id,
                    "merchant_id": merchant_id,
                    "link_id": active_link.id,
                    # SEC-009: raw token removed from event payload — events carry only
                    # link_id (DB PK). The token value is a secret credential.
                    "is_resend": True,
                },
            )
        )
    except Exception as exc:
        logger.error("Event dispatch failed after resend: %s", exc)

    return ResendLinkResponseSchema(
        link_id=active_link.id,
        masked_token=mask_token(active_link.token or ""),
        token=active_link.token or "",
        message="Notification re-sent successfully",
    )


async def revoke_payment_link(
    db: Session, payment_request_id: int, link_id: int, merchant_id: int
) -> RevokeLinkResponseSchema:
    """
    Set a single payment link of a payment request to REVOKED.

    Raises:
        NotFoundError: the payment request belongs to another merchant, or the
            link does not exist / belongs to a different payment request.
        ConflictError: the link is already USED or REVOKED.
    """
    pr = _get_payment_request(db, payment_request_id)
    if pr.merchant_id != merchant_id:
        raise NotFoundError(message="Payment request not found")

    target = hpp_crud.get_link_by_id(db, link_id)
    if not target:
        raise NotFoundError(message="Payment link not found")
    if target.payment_request_id != payment_request_id:
        # The link exists but under another payment request: same answer.
        raise NotFoundError(message="Payment link not found")

    non_revocable = ("USED", "REVOKED")
    if target.status in non_revocable:
        raise ConflictError(message=f"Link cannot be revoked (current status: {target.status})")

    hpp_crud.mark_link_revoked(db, target)
    db.commit()

    return RevokeLinkResponseSchema(
        link_id=link_id,
        status="REVOKED",
        message="Link revoked successfully",
    )


# ─── Merchant-triggered transaction retry ─────────────────────────────────────

async def create_transaction_retry(
    db: Session, transaction_id: int, merchant_id: int
) -> TransactionRetryResponseSchema:
    """
    Create a retry token for a failed transaction and dispatch a notification.

    The transaction must belong to ``merchant_id`` and be in FAILED status,
    and must have fewer than 3 active retry tokens. The new token expires
    after ``RETRY_TOKEN_EXPIRY_HOURS`` hours. A ``transaction.failed`` event
    flagged ``is_merchant_triggered`` is dispatched; a dispatch failure is
    logged and does not fail the call.

    Raises:
        NotFoundError: no such transaction for this merchant.
        APIException: the transaction is not FAILED (HTTP 400), or already
            has 3 active retry tokens (HTTP 429).
    """
    stmt = select(Transactions).where(
        Transactions.id == transaction_id,
        Transactions.merchant_id == merchant_id,
    )
    txn = db.execute(stmt).scalar_one_or_none()
    if not txn:
        raise NotFoundError(message="Transaction not found")

    if txn.txn_status != TransactionStatusTypes.FAILED:
        raise APIException(
            message="Only FAILED transactions can be retried",
            status_code=400,
        )

    # Enforce max 3 active retry tokens per transaction
    active_tokens = hpp_crud.get_active_retry_tokens_for_transaction(db, transaction_id)
    if len(active_tokens) >= 3:
        raise APIException(
            message="Maximum retry attempts (3) reached for this transaction",
            status_code=429,
        )

    token = generate_retry_token()
    expires_at = datetime.now(timezone.utc) + timedelta(hours=RETRY_TOKEN_EXPIRY_HOURS)

    retry_token = hpp_crud.create_retry_token(
        db=db,
        transaction_id=transaction_id,
        payment_request_id=txn.payment_request_id,
        token=token,
        expires_at=expires_at,
    )
    db.commit()

    # Dispatch notification event
    try:
        await EventDispatcher.dispatch(
            BaseEvent(
                event_type="transaction.failed",
                data={
                    "transaction_id": transaction_id,
                    "payment_request_id": txn.payment_request_id,
                    "merchant_id": merchant_id,
                    # SEC-009: raw retry token removed from event payload — events carry only
                    # retry_token_id (DB PK). The token value is a secret credential.
                    # The notification listener reconstructs the retry URL from the DB record.
                    "retry_token_id": retry_token.id,
                    "is_merchant_triggered": True,
                },
            )
        )
    except Exception as exc:
        logger.error("Event dispatch failed after transaction retry create: %s", exc)

    return TransactionRetryResponseSchema(
        retry_token_id=retry_token.id,
        token=token,
        expires_at=expires_at,
        message="Retry link created and notification sent",
    )


# ─── Saved payment methods (customers router) ─────────────────────────────────

async def get_customer_payment_methods(
    db: Session, customer_id: int, merchant_id: int
) -> List[SavedPaymentMethodSchema]:
    """
    List a customer's saved payment methods for one merchant.

    Delegates entirely to ``_build_saved_payment_methods``.
    """
    saved_methods = _build_saved_payment_methods(db, customer_id, merchant_id)
    return saved_methods


# ─── Notification stub ────────────────────────────────────────────────────────

async def _send_sms(phone: str, message: str) -> None:
    """
    Deliver ``message`` to ``phone`` through Twilio.

    When the Twilio account SID or auth token is not configured, nothing is
    sent and the message is written to the log instead. Twilio errors are
    logged and never raised. Phone numbers are logged with everything but the
    last four characters replaced by ``*``.
    """

    def _masked_phone() -> str:
        # Keep the last four characters, pad the rest with asterisks.
        return phone[-4:].rjust(len(phone), "*")

    twilio_ready = bool(settings.TWILIO_ACCOUNT_SID and settings.TWILIO_AUTH_TOKEN)

    if twilio_ready:
        try:
            from twilio.rest import Client  # type: ignore

            twilio = Client(settings.TWILIO_ACCOUNT_SID, settings.TWILIO_AUTH_TOKEN)
            twilio.messages.create(
                body=message,
                from_=settings.TWILIO_FROM_NUMBER,
                to=phone,
            )
            logger.info("SMS sent to %s", _masked_phone())
        except Exception as exc:
            logger.error("Twilio SMS failed: %s", exc)
        return

    # Stub mode: no credentials, so only log what would have been sent.
    logger.info(
        "[SMS STUB] Would send to %s: %s",
        _masked_phone(),
        message,
    )
