"""
Subscription business logic services.

All functions accept a SQLAlchemy Session but do NOT commit — callers
(routers or Celery tasks) own the transaction boundary.
"""

from __future__ import annotations

import logging
import math
from datetime import datetime, timezone
from typing import List, Optional

from sqlalchemy import update
from sqlalchemy.orm import Session

from src.apps.subscriptions.models.subscription import Subscription
from src.apps.subscriptions.enums import SubscriptionStatus, SubscriptionActivityTypes
from src.apps.subscriptions import crud as sub_crud
from src.apps.subscriptions.helpers.billing_date import (
    compute_next_billing_date,
    compute_projected_dates,
    get_interval_label,
)
from src.apps.subscriptions.helpers.mrr import normalize_to_mrr
from src.apps.subscriptions.schemas.subscription_schemas import (
    SubscriptionInvoiceItem,
    SubscriptionInvoiceListResponse,
    SubscriptionSummaryResponse,
    EditSubscriptionInvoiceRequest,
    EditSubscriptionInvoiceResponse,
    status_to_text,
)
from src.core.exceptions import NotFoundError, APIException

logger = logging.getLogger(__name__)

# Maximum dunning retries before exhausting
MAX_DUNNING_RETRIES = 3
# Days between dunning retries — retry 1 at T+3, retry 2 at T+7 (PRD HWSUB-114)
DUNNING_RETRY_DAYS = [3, 7]


# ─── Lookup helpers ──────────────────────────────────────────────────────────


def get_subscription_or_404(
    db: Session,
    sub_id: str,
    merchant_id: int,
) -> Subscription:
    """Fetch subscription by opaque ID or literal, raise 404 if missing."""
    subscription = sub_crud.get_subscription_by_id_or_literal(db, sub_id, merchant_id)
    if not subscription:
        raise NotFoundError(message=f"Subscription '{sub_id}' not found")
    return subscription


# ─── Creation ────────────────────────────────────────────────────────────────


async def create_subscription_from_payment_request(
    db: Session,
    payment_request,
    recurring_config,
    subscription_name: Optional[str] = None,
) -> Subscription:
    """
    Create a Subscription record when a recurring PaymentRequest is saved.

    Called from payment_requests/services.py → save_recurring_config().
    The caller owns the commit.

    Args:
        db: Active SQLAlchemy session.
        payment_request: PaymentRequest ORM instance (already flushed, has .id).
        recurring_config: RecurringPaymentRequests ORM instance (already flushed).
        subscription_name: Optional display name for the subscription.

    Returns:
        The newly created Subscription ORM instance (flushed, not committed).
    """
    from sqlalchemy.exc import IntegrityError

    for attempt in range(5):
        try:
            savepoint = db.begin_nested()
            sub_literal = sub_crud.generate_subscription_literal(db)
            sub_id = sub_crud.generate_subscription_id()

            # Determine start / next_billing_date
            start_date = getattr(recurring_config, "start_date", None)
            prorate_first = getattr(recurring_config, "prorate_first_payment", False)
            prorate_date = getattr(recurring_config, "prorate_date", None)

            if prorate_first and prorate_date:
                first_billing = prorate_date
            elif start_date:
                first_billing = start_date
            else:
                first_billing = datetime.now(timezone.utc)

            # Ensure timezone-aware
            if first_billing and first_billing.tzinfo is None:
                first_billing = first_billing.replace(tzinfo=timezone.utc)

            # Determine initial status based on authorization type
            auth_type = getattr(payment_request, "authorization_type", None)
            initial_status = (
                SubscriptionStatus.ACTIVE
                if auth_type == "pre_auth"
                else SubscriptionStatus.INITIALIZING
            )

            # Resolve customer_id from payment_request_customers (M2M relationship).
            # PaymentRequest has no direct customer_id column.
            resolved_customer_id: Optional[int] = None
            for prc in getattr(payment_request, "payment_request_customers", []):
                cust = getattr(prc, "customer", None)
                if cust:
                    resolved_customer_id = cust.id
                    break

            subscription = Subscription(
                subscription_id=sub_id,
                subscription_literal=sub_literal,
                name=subscription_name,
                status=initial_status,
                merchant_id=payment_request.merchant_id,
                customer_id=resolved_customer_id,
                payment_request_id=payment_request.id,
                next_billing_date=first_billing,
                total_billed=0.0,
                total_paid=0.0,
                invoices_generated=0,
                invoices_paid=0,
                dunning_retry_count=0,
            )
            db.add(subscription)
            db.flush()

            # Write creation activity
            sub_crud.write_activity(
                db=db,
                subscription_id=subscription.id,
                activity_type=SubscriptionActivityTypes.CREATED,
                description=f"Subscription {sub_literal} created from payment request",
                actor_type="system",
                actor_id=None,
                metadata={
                    "payment_request_id": payment_request.id,
                    "interval": getattr(recurring_config, "interval", None),
                    "interval_value": getattr(recurring_config, "interval_value", 1),
                },
            )

            savepoint.commit()
            db.commit()
            logger.info("Created subscription %s for payment_request_id=%d", sub_literal, payment_request.id)

            # Pre-auth: if first billing date is today or in the past, create
            # invoice #1 so the charge block in payment_requests/services.py can
            # find it and immediately mark it PAID after the provider charge.
            # We deliberately do NOT call _generate_invoice_for_subscription here
            # because that function also creates a PENDING transaction and
            # schedules a Celery task — both of which would duplicate the
            # immediate charge the caller is about to perform.
            if auth_type == "pre_auth" and first_billing:
                billing_tz = first_billing.replace(tzinfo=timezone.utc) if first_billing.tzinfo is None else first_billing
                if billing_tz.date() <= datetime.now(timezone.utc).date():
                    try:
                        from src.apps.invoices.services.invoice_services import create_invoice
                        from src.apps.subscriptions.helpers.billing_date import compute_next_billing_date
                        from src.core.utils.enums import InvoiceStatusTypes
                        _now = datetime.now(timezone.utc)
                        first_inv = create_invoice(
                            db=db,
                            payment_request=payment_request,
                            merchant_id=subscription.merchant_id,
                            customer_id=subscription.customer_id,
                            amount=float(payment_request.amount),
                            status=InvoiceStatusTypes.PENDING,
                            due_date=billing_tz,
                            billing_date=_now,
                            sequence_id=1,
                        )
                        first_inv.subscription_id = subscription.id
                        # Advance next_billing_date and update counters so the
                        # Celery scheduler knows when cycle 2 should fire.
                        _interval = getattr(recurring_config, "interval", "month")
                        _interval_val = getattr(recurring_config, "interval_value", 1)
                        subscription.invoices_generated = 1
                        subscription.total_billed = float(payment_request.amount)
                        subscription.next_billing_date = compute_next_billing_date(
                            _interval, _interval_val, billing_tz
                        )
                        sub_crud.write_activity(
                            db=db,
                            subscription_id=subscription.id,
                            activity_type=SubscriptionActivityTypes.INVOICE_GENERATED,
                            description=f"Invoice {first_inv.invoice_literal} generated for cycle 1",
                            actor_type="system",
                            actor_id=None,
                            metadata={
                                "invoice_literal": first_inv.invoice_literal,
                                "sequence": 1,
                                "amount": float(payment_request.amount),
                            },
                        )
                        db.flush()
                        logger.info(
                            "Pre-auth: created invoice %s (seq 1) for subscription %s",
                            first_inv.invoice_literal, sub_literal,
                        )
                    except Exception as exc:
                        logger.warning(
                            "Pre-auth invoice pre-creation failed for subscription %s: %s",
                            sub_literal, exc, exc_info=True,
                        )

            return subscription

        except IntegrityError:
            savepoint.rollback()
            if attempt == 4:
                raise
        except Exception:
            # Non-integrity errors (e.g. constraint violations, lazy-load
            # failures, missing columns) should also roll back the savepoint
            # so the outer transaction remains usable.
            savepoint.rollback()
            raise

    raise RuntimeError("Failed to generate unique subscription_literal after 5 attempts")


# ─── Invoice projections ─────────────────────────────────────────────────────


def get_subscription_invoices_with_projections(
    db: Session,
    subscription: Subscription,
    max_items: int = 12,
) -> SubscriptionInvoiceListResponse:
    """
    Return real (generated) invoices plus virtual projected future cycles.

    Real invoices come from the DB; virtual ones are computed from
    the recurring_config attached to the payment_request.

    The combined list is capped at max_items entries.
    """
    from src.core.utils.enums import InvoiceStatusTypes

    real_invoices = sub_crud.get_subscription_invoices(db, subscription.id)

    def _invoice_status_text(inv) -> str:
        status_map = {
            InvoiceStatusTypes.PAID: "PAID",
            InvoiceStatusTypes.PARTIALLY_PAID: "PAID",
            InvoiceStatusTypes.OVERDUE: "OVERDUE",
            InvoiceStatusTypes.CANCELLED: "CANCELLED",
            InvoiceStatusTypes.PENDING: "PENDING",
            InvoiceStatusTypes.WAITING: "PENDING",
            InvoiceStatusTypes.CREATED: "PENDING",
            InvoiceStatusTypes.UPDATED: "PENDING",
            InvoiceStatusTypes.FAILED: "FAILED",
            InvoiceStatusTypes.CAPTURED: "PROCESSING",
        }
        return status_map.get(inv.status, "PENDING")

    items: List[SubscriptionInvoiceItem] = []
    for inv in real_invoices:
        status_text = _invoice_status_text(inv)
        items.append(
            SubscriptionInvoiceItem(
                sequence=inv.sequence_id or 0,
                amount=inv.amount,
                due_date=inv.due_date,
                payment_date=inv.paid_date,
                status=status_text,
                is_generated=True,
                invoice_id=inv.invoice_id,
                invoice_literal=inv.invoice_literal,
                can_edit=status_text in ("PENDING", "OVERDUE"),
                can_charge_now=status_text in ("PENDING", "OVERDUE") and subscription.status in (
                    SubscriptionStatus.ACTIVE, SubscriptionStatus.PAST_DUE
                ),
                can_retry=status_text == "FAILED",
            )
        )

    total_generated = len(items)
    remaining_slots = max_items - total_generated

    # Build virtual future projections if subscription is not terminated
    virtual_items: List[SubscriptionInvoiceItem] = []
    terminal_statuses = {SubscriptionStatus.CANCELLED, SubscriptionStatus.COMPLETED, SubscriptionStatus.DUNNING_EXHAUSTED}

    if remaining_slots > 0 and subscription.status not in terminal_statuses:
        rec = _get_recurring_config(subscription)
        if rec and subscription.next_billing_date:
            future_dates = compute_projected_dates(
                interval=getattr(rec, "interval", "month"),
                interval_value=getattr(rec, "interval_value", 1),
                from_date=subscription.next_billing_date,
                count=remaining_slots,
                end_type=getattr(rec, "end_type", None),
                end_date=getattr(rec, "end_date", None),
                pay_until_count=getattr(rec, "pay_until_count", None),
                already_generated=subscription.invoices_generated,
            )
            next_seq = subscription.invoices_generated + 1
            for i, date in enumerate(future_dates):
                # Ensure aware
                if date.tzinfo is None:
                    date = date.replace(tzinfo=timezone.utc)
                amount = getattr(subscription, "_projected_amount", None)
                if amount is None:
                    pr = subscription.payment_request
                    amount = pr.amount if pr else 0.0
                virtual_items.append(
                    SubscriptionInvoiceItem(
                        sequence=next_seq + i,
                        amount=amount,
                        due_date=date,
                        payment_date=None,
                        status="FUTURE",
                        is_generated=False,
                        invoice_id=None,
                        invoice_literal=None,
                        can_edit=True,
                        can_charge_now=False,
                        can_retry=False,
                    )
                )

    all_items = sorted(items + virtual_items, key=lambda x: x.sequence)

    # Determine has_more: probe one extra cycle beyond the last virtual item
    has_more = False
    terminal_statuses_for_more = {
        SubscriptionStatus.CANCELLED,
        SubscriptionStatus.COMPLETED,
        SubscriptionStatus.EXPIRED if hasattr(SubscriptionStatus, "EXPIRED") else None,
    } - {None}
    if subscription.status not in (SubscriptionStatus.CANCELLED, SubscriptionStatus.COMPLETED, SubscriptionStatus.DUNNING_EXHAUSTED):
        rec_for_probe = _get_recurring_config(subscription)
        if rec_for_probe and virtual_items:
            last_virtual_date = virtual_items[-1].due_date
            if last_virtual_date:
                probe_interval = getattr(rec_for_probe, "interval", "month")
                probe_interval_value = getattr(rec_for_probe, "interval_value", 1)
                probe_date = compute_next_billing_date(probe_interval, probe_interval_value, last_virtual_date)
                if probe_date is not None:
                    end_type = getattr(rec_for_probe, "end_type", None)
                    end_date = getattr(rec_for_probe, "end_date", None)
                    pay_until_count = getattr(rec_for_probe, "pay_until_count", None)
                    total_so_far = len(generated_invoices if False else items) + len(virtual_items) + 1
                    if end_type == "date" and end_date:
                        if end_date.tzinfo is None:
                            end_date = end_date.replace(tzinfo=timezone.utc)
                        has_more = probe_date <= end_date
                    elif end_type == "until_count" and pay_until_count:
                        has_more = (subscription.invoices_generated + len(virtual_items) + 1) < pay_until_count
                    else:
                        # until_cancelled — always more unless terminated
                        has_more = True

    return SubscriptionInvoiceListResponse(
        items=all_items[:max_items],
        total_generated=total_generated,
        total_virtual=len(virtual_items),
        has_more=has_more,
    )


def _get_recurring_config(subscription: Subscription):
    """Safely retrieve the recurring_config from the payment_request relationship."""
    try:
        pr = subscription.payment_request
        if pr:
            rec = getattr(pr, "recurring_config", None)
            if isinstance(rec, list):
                return rec[0] if rec else None
            return rec
    except Exception as e:
        logger.warning(
            "Failed to load recurring config for subscription %s: %s",
            getattr(subscription, "id", "unknown"),
            e,
        )
    return None


# ─── Edit invoice ─────────────────────────────────────────────────────────────


def edit_subscription_invoice(
    db: Session,
    subscription: Subscription,
    sequence: int,
    payload: EditSubscriptionInvoiceRequest,
    actor_id: int,
) -> EditSubscriptionInvoiceResponse:
    """
    Update a generated invoice (single or all remaining) on a subscription.

    For scope="single": update the specific invoice with the given sequence_id.
      - If a real DB invoice exists: update it (unless paid or cancelled).
      - If the item is virtual (no DB invoice): create a new PENDING invoice with
        the edited amount/due_date so the scheduler picks it up correctly.
    For scope="all_remaining": update all non-paid DB invoices >= the given sequence.
    """
    from src.apps.invoices.models.invoice import Invoice
    from sqlalchemy import select, update
    from src.core.utils.enums import InvoiceStatusTypes

    paid_statuses = {InvoiceStatusTypes.PAID, InvoiceStatusTypes.PARTIALLY_PAID, InvoiceStatusTypes.CAPTURED}
    blocked_statuses = paid_statuses | {InvoiceStatusTypes.CANCELLED}

    if payload.scope == "single":
        stmt = select(Invoice).where(
            Invoice.subscription_id == subscription.id,
            Invoice.sequence_id == sequence,
            Invoice.deleted_at == None,
        )
        invoices = list(db.execute(stmt).scalars().all())

        # Guard: if found but in a non-editable state, raise immediately
        if invoices:
            inv = invoices[0]
            if inv.status in blocked_statuses:
                raise APIException(
                    message=f"Invoice at sequence {sequence} cannot be edited (status: {inv.status})",
                    status_code=409,
                )
    else:
        stmt = select(Invoice).where(
            Invoice.subscription_id == subscription.id,
            Invoice.sequence_id >= sequence,
            Invoice.deleted_at == None,
            Invoice.status.not_in([s.value for s in blocked_statuses]),
        )
        invoices = list(db.execute(stmt).scalars().all())

    updated_count = 0

    if not invoices and payload.scope == "single":
        # Virtual item — no DB invoice exists yet. Create a PENDING invoice so
        # the scheduler picks up the overridden amount/due_date.
        pr = subscription.payment_request
        if not pr:
            raise APIException(
                message="Subscription has no associated payment request",
                status_code=422,
            )
        # Resolve due date and amount
        new_amount = payload.amount if payload.amount is not None else pr.amount
        new_due_date = payload.due_date

        # Try to compute the projected date for this sequence if not provided
        if new_due_date is None:
            rec = _get_recurring_config(subscription)
            if rec and subscription.next_billing_date:
                try:
                    from src.apps.subscriptions.helpers.billing_date import compute_projected_dates
                    from sqlalchemy import select as _select

                    existing_seqs_stmt = _select(Invoice.sequence_id).where(
                        Invoice.subscription_id == subscription.id,
                        Invoice.deleted_at == None,
                    )
                    existing_seqs = {r[0] for r in db.execute(existing_seqs_stmt).all() if r[0] is not None}
                    max_existing = max(existing_seqs, default=0)
                    effective_gen = max(subscription.invoices_generated or 0, max_existing)
                    future_dates = compute_projected_dates(
                        interval=getattr(rec, "interval", "month"),
                        interval_value=getattr(rec, "interval_value", 1),
                        from_date=subscription.next_billing_date,
                        count=sequence - effective_gen + 1,
                        end_type=getattr(rec, "end_type", None),
                        end_date=getattr(rec, "end_date", None),
                        pay_until_count=getattr(rec, "pay_until_count", None),
                        already_generated=effective_gen,
                    )
                    idx = sequence - effective_gen - 1
                    if 0 <= idx < len(future_dates):
                        new_due_date = future_dates[idx]
                except Exception:
                    pass

        # Resolve customer / payer IDs
        customer_id_val = subscription.customer_id or (pr.customer_id if hasattr(pr, "customer_id") else None)
        payer_id = None
        try:
            from src.apps.payment_requests.models.payment_request_customer import PaymentRequestCustomer
            prc = db.execute(
                select(PaymentRequestCustomer).where(
                    PaymentRequestCustomer.payment_request_id == pr.id
                ).limit(1)
            ).scalar_one_or_none()
            if prc:
                payer_id = prc.payer_id
                if customer_id_val is None:
                    customer_id_val = prc.customer_id
        except Exception:
            pass

        from src.apps.invoices.services.invoice_services import create_invoice
        created_invoice = create_invoice(
            db=db,
            payment_request=pr,
            merchant_id=subscription.merchant_id,
            customer_id=customer_id_val or 0,
            amount=new_amount,
            status=InvoiceStatusTypes.PENDING,
            payer_id=payer_id,
            due_date=new_due_date,
            billing_date=new_due_date,
            sequence_id=sequence,
        )
        created_invoice.subscription_id = subscription.id
        db.flush()
        # Write invoice activity for the newly created invoice
        from src.apps.invoices import crud as invoice_crud
        from src.core.utils.enums import InvoiceActivityTypes
        invoice_crud.write_activity(
            db=db,
            invoice_id=created_invoice.id,
            activity_type=InvoiceActivityTypes.INVOICE_CREATED,
            description=f"Invoice created for subscription schedule item #{sequence}",
            actor_type="merchant",
            actor_id=actor_id,
        )
        # Add line items for the newly created virtual invoice
        if payload.line_items is not None:
            from src.apps.invoices.models.invoice_line_items import InvoiceLineItems
            line_items_total = 0.0
            for li in payload.line_items:
                unit_price = float(li.unit_price or 0)
                qty = int(li.quantity or 1)
                line_items_total += unit_price * qty
                db.add(InvoiceLineItems(
                    invoice_id=created_invoice.id,
                    title=li.title or "",
                    description=li.description or "",
                    unit_price=unit_price,
                    quantity=qty,
                    tax=float(li.tax or 0),
                    cost=float(li.cost or 0),
                    upcharge=float(li.upcharge or 0),
                ))
            # Auto-compute amount in cents from line items if not explicitly set
            if payload.amount is None and line_items_total > 0:
                created_invoice.amount = line_items_total * 100
            db.flush()
        updated_count = 1
    elif not invoices:
        raise NotFoundError(message=f"No editable invoice found at sequence {sequence}")
    else:
        for inv in invoices:
            if inv.status in [s.value for s in paid_statuses]:
                continue
            if payload.amount is not None:
                inv.amount = payload.amount
            if payload.due_date is not None:
                inv.due_date = payload.due_date
            # Replace line items when provided
            if payload.line_items is not None:
                from sqlalchemy import delete as _del
                from src.apps.invoices.models.invoice_line_items import InvoiceLineItems
                db.execute(_del(InvoiceLineItems).where(InvoiceLineItems.invoice_id == inv.id))
                line_items_total = 0.0
                for li in payload.line_items:
                    unit_price = float(li.unit_price or 0)
                    qty = int(li.quantity or 1)
                    line_items_total += unit_price * qty
                    db.add(InvoiceLineItems(
                        invoice_id=inv.id,
                        title=li.title or "",
                        description=li.description or "",
                        unit_price=unit_price,
                        quantity=qty,
                        tax=float(li.tax or 0),
                        cost=float(li.cost or 0),
                        upcharge=float(li.upcharge or 0),
                    ))
                # Auto-compute amount in cents from line items if not explicitly set
                if payload.amount is None and line_items_total > 0:
                    inv.amount = line_items_total * 100
            db.flush()
            updated_count += 1

    # Count virtual cycles affected (all_remaining scope)
    virtual_cycles_affected = 0
    if payload.scope == "all_remaining":
        generated_seqs = {inv.sequence_id for inv in invoices}
        if subscription.invoices_generated:
            for s in range(sequence, subscription.invoices_generated + 10):
                if s not in generated_seqs:
                    virtual_cycles_affected += 1
                    if virtual_cycles_affected >= 10:
                        break

    sub_crud.write_activity(
        db=db,
        subscription_id=subscription.id,
        activity_type=SubscriptionActivityTypes.STATUS_CHANGED,
        description=f"Invoice(s) edited: scope={payload.scope}, sequence={sequence}",
        actor_type="merchant",
        actor_id=actor_id,
        metadata={"updated_count": updated_count, "scope": payload.scope},
    )

    return EditSubscriptionInvoiceResponse(
        updated_invoices=updated_count,
        virtual_cycles_affected=virtual_cycles_affected,
    )


# ─── Lifecycle actions ────────────────────────────────────────────────────────


def _revoke_pending_payment_tasks(db: Session, subscription_id: int) -> None:
    """
    Revoke any pending Celery process_scheduled_payment tasks for all future
    invoices belonging to this subscription.

    Task IDs are stored in Transactions.txn_metadata["celery_task_id"] when
    the generate_invoices Celery task schedules a charge.  Revoking prevents
    a charge from firing after a subscription is cancelled or paused.
    """
    from sqlalchemy import select as _sel
    from src.apps.invoices.models.invoice import Invoice
    from src.apps.transactions.models.transactions import Transactions, transactions_invoices_map
    from src.core.utils.enums import TransactionStatusTypes
    from src.worker.celery_app import celery_app

    stmt = (
        _sel(Transactions)
        .join(transactions_invoices_map, transactions_invoices_map.c.transaction_id == Transactions.id)
        .join(Invoice, Invoice.id == transactions_invoices_map.c.invoice_id)
        .where(
            Invoice.subscription_id == subscription_id,
            Transactions.txn_status == TransactionStatusTypes.PENDING,
            Invoice.deleted_at == None,
        )
    )
    pending_txns = db.execute(stmt).scalars().all()

    for txn in pending_txns:
        task_id = (txn.txn_metadata or {}).get("celery_task_id")
        if task_id:
            try:
                celery_app.control.revoke(task_id)
                logger.debug("Revoked Celery task %s for subscription id=%d", task_id, subscription_id)
            except Exception as revoke_err:
                logger.warning(
                    "Could not revoke Celery task %s for subscription id=%d: %s",
                    task_id, subscription_id, revoke_err,
                )


def _cancel_pending_subscription_invoices(
    db: Session, subscription_id: int, actor_id: int
) -> None:
    """Cancel all pending/unpaid invoices linked to this subscription (HPMNTP-946).

    When a subscription is cancelled or expired, future scheduled invoice records
    should be marked CANCELLED so they no longer appear as pending obligations.
    """
    from sqlalchemy import select as _sel
    from src.apps.invoices.models.invoice import Invoice
    from src.core.utils.enums import InvoiceStatusTypes, InvoiceActivityTypes
    from src.apps.invoices import crud as invoice_crud

    cancellable_statuses = [
        InvoiceStatusTypes.PENDING,
        InvoiceStatusTypes.WAITING,
        InvoiceStatusTypes.CREATED,
        InvoiceStatusTypes.UPDATED,
        InvoiceStatusTypes.AWAITING_APPROVAL,
        InvoiceStatusTypes.OVERDUE,   # past-due invoices also cancelled
        InvoiceStatusTypes.FAILED,    # failed charge invoices also cancelled
    ]

    stmt = _sel(Invoice).where(
        Invoice.subscription_id == subscription_id,
        Invoice.status.in_(cancellable_statuses),
        Invoice.deleted_at == None,
    )
    invoices = db.execute(stmt).scalars().all()

    now = datetime.now(timezone.utc)
    for inv in invoices:
        inv.status = InvoiceStatusTypes.CANCELLED
        inv.updated_at = now
        try:
            invoice_crud.write_activity(
                db=db,
                invoice_id=inv.id,
                activity_type=InvoiceActivityTypes.INVOICE_CANCELLED,
                description="Invoice cancelled because subscription was cancelled",
                actor_type="merchant" if actor_id else "system",
                actor_id=actor_id,
            )
        except Exception:
            pass  # Activity logging is non-critical


def cancel_subscription(
    db: Session,
    subscription: Subscription,
    actor_id: int,
    hard_delete: bool = False,
) -> Subscription:
    """Cancel a subscription atomically. Sets status = CANCELLED.

    Uses an atomic UPDATE with a status pre-condition (VULN-007) to prevent
    race conditions where two concurrent requests could both succeed.
    """
    now = datetime.now(timezone.utc)

    non_cancellable = [
        SubscriptionStatus.CANCELLED,
        SubscriptionStatus.COMPLETED,
        SubscriptionStatus.DUNNING_EXHAUSTED,
    ]

    update_values = {"status": SubscriptionStatus.CANCELLED, "updated_at": now}
    if hard_delete:
        update_values["deleted_at"] = now

    result = db.execute(
        update(Subscription)
        .where(
            Subscription.id == subscription.id,
            Subscription.status.not_in(non_cancellable),
        )
        .values(**update_values)
        .returning(Subscription.id)
    )
    if result.first() is None:
        raise APIException(
            message="Subscription cannot be cancelled in its current status",
            status_code=409,
        )

    db.refresh(subscription)

    _revoke_pending_payment_tasks(db, subscription.id)

    # Cancel all pending/unpaid invoices linked to this subscription (HPMNTP-946)
    _cancel_pending_subscription_invoices(db, subscription.id, actor_id)

    sub_crud.write_activity(
        db=db,
        subscription_id=subscription.id,
        activity_type=SubscriptionActivityTypes.CANCELLED,
        description="Subscription cancelled",
        actor_type="merchant" if actor_id else "system",
        actor_id=actor_id,
    )
    return subscription


def pause_subscription(
    db: Session,
    subscription: Subscription,
    actor_id: int,
) -> Subscription:
    """Pause an active or past-due subscription atomically (VULN-007)."""
    now = datetime.now(timezone.utc)

    pausable_statuses = [SubscriptionStatus.ACTIVE, SubscriptionStatus.PAST_DUE]

    result = db.execute(
        update(Subscription)
        .where(
            Subscription.id == subscription.id,
            Subscription.status.in_(pausable_statuses),
        )
        .values(status=SubscriptionStatus.PAUSED, updated_at=now)
        .returning(Subscription.id)
    )
    if result.first() is None:
        raise APIException(
            message="Only active or past-due subscriptions can be paused",
            status_code=409,
        )

    db.refresh(subscription)

    _revoke_pending_payment_tasks(db, subscription.id)

    sub_crud.write_activity(
        db=db,
        subscription_id=subscription.id,
        activity_type=SubscriptionActivityTypes.PAUSED,
        description="Subscription paused by merchant",
        actor_type="merchant",
        actor_id=actor_id,
    )
    return subscription


def resume_subscription(
    db: Session,
    subscription: Subscription,
    actor_id: int,
) -> Subscription:
    """Resume a paused subscription. Only PAUSED subscriptions can be resumed."""
    if subscription.status != SubscriptionStatus.PAUSED:
        raise APIException(
            message=f"Cannot resume subscription in status {status_to_text(subscription.status)}",
            status_code=409,
        )

    subscription.status = SubscriptionStatus.ACTIVE
    subscription.updated_at = datetime.now(timezone.utc)
    db.flush()

    sub_crud.write_activity(
        db=db,
        subscription_id=subscription.id,
        activity_type=SubscriptionActivityTypes.RESUMED,
        description="Subscription resumed by merchant",
        actor_type="merchant",
        actor_id=actor_id,
    )
    return subscription


# ─── Charge / retry ──────────────────────────────────────────────────────────


async def charge_now(
    db: Session,
    subscription: Subscription,
    invoice,
    merchant_id: int,
    actor_id: Optional[int] = None,
) -> dict:
    """
    Immediately attempt to charge the given invoice.

    Delegates to the payment provider via the payment_request's active provider.
    Returns a dict consumed by ChargeNowResponse.
    """
    from src.core.utils.enums import InvoiceStatusTypes
    from src.core.exceptions import APIException

    # VULN-001: Guard — only chargeable statuses may proceed
    CHARGEABLE_STATUSES = {
        InvoiceStatusTypes.PENDING,
        InvoiceStatusTypes.OVERDUE,
        InvoiceStatusTypes.FAILED,
    }
    if invoice.status not in CHARGEABLE_STATUSES:
        raise APIException(
            message="Invoice cannot be charged in its current status",
            status_code=409,
        )

    logger.info(
        "charge_now: subscription=%s invoice=%s",
        subscription.subscription_literal,
        invoice.invoice_literal,
    )

    # Find the pending transaction for this invoice so we can submit the charge
    from sqlalchemy import select
    from src.apps.transactions.models.transactions import Transactions, transactions_invoices_map
    from src.core.utils.enums import TransactionStatusTypes

    txn_stmt = (
        select(Transactions)
        .join(transactions_invoices_map, transactions_invoices_map.c.transaction_id == Transactions.id)
        .where(
            transactions_invoices_map.c.invoice_id == invoice.id,
            Transactions.txn_status == TransactionStatusTypes.PENDING,
        )
        .order_by(Transactions.id.desc())
        .limit(1)
    )
    pending_txn = db.execute(txn_stmt).scalar_one_or_none()

    if pending_txn is None:
        raise APIException(
            message="No pending transaction found for this invoice — cannot charge",
            status_code=422,
        )

    if pending_txn.payment_method_id is None:
        raise APIException(
            message="No payment method associated with this invoice — cannot charge",
            status_code=422,
        )

    sub_crud.write_activity(
        db=db,
        subscription_id=subscription.id,
        activity_type=SubscriptionActivityTypes.CHARGE_NOW_INITIATED,
        description=f"Charge-now initiated for invoice {invoice.invoice_literal}",
        actor_type="merchant",
        actor_id=actor_id,
        metadata={"invoice_id": invoice.invoice_id, "amount": invoice.amount},
    )

    # Queue the charge immediately (no eta = execute as soon as a worker is free).
    # process_scheduled_payment updates the transaction status and emits
    # transaction.completed / transaction.failed; the subscription listener
    # then sets the invoice to PAID or FAILED accordingly.
    from src.worker.hpp_tasks import process_scheduled_payment
    process_scheduled_payment.apply_async(
        args=[pending_txn.id, pending_txn.payment_method_id],
    )

    return {
        "status": "charge_submitted",
        "invoice_id": invoice.invoice_id,
        "invoice_literal": invoice.invoice_literal,
    }


async def manual_retry(
    db: Session,
    subscription: Subscription,
    invoice,
    actor_id: int,
) -> dict:
    """
    Re-attempt a failed invoice payment.

    If the subscription's payment_request uses pre_auth, submit directly.
    If it uses request_auth, generate an HPP re-auth link.
    """
    from src.core.utils.enums import InvoiceStatusTypes
    from src.core.exceptions import APIException
    from src.apps.invoices.models.invoice import Invoice
    from sqlalchemy import select

    # VULN-002: Guard — only retryable statuses may proceed
    RETRYABLE_STATUSES = {InvoiceStatusTypes.FAILED, InvoiceStatusTypes.OVERDUE}
    if invoice.status not in RETRYABLE_STATUSES:
        raise APIException(
            message="Invoice must be in FAILED or OVERDUE status to retry",
            status_code=409,
        )

    # VULN-002: Use SELECT FOR UPDATE to prevent concurrent double-charges
    locked_invoice = db.execute(
        select(Invoice)
        .where(Invoice.id == invoice.id, Invoice.deleted_at == None)
        .with_for_update()
    ).scalar_one_or_none()

    if locked_invoice is None:
        raise APIException(message="Invoice not found", status_code=404)

    # Re-check status after acquiring lock
    if locked_invoice.status not in RETRYABLE_STATUSES:
        raise APIException(
            message="Invoice must be in FAILED or OVERDUE status to retry",
            status_code=409,
        )

    # VULN-015: Raise error if payment_request is None
    pr = subscription.payment_request
    if pr is None:
        raise APIException(
            message="Subscription has no associated payment request",
            status_code=422,
        )
    auth_type = pr.authorization_type

    if auth_type == "request_auth":
        return await generate_reauth_link(db, subscription, locked_invoice)

    # pre_auth: mark for immediate retry
    locked_invoice.status = InvoiceStatusTypes.CAPTURED
    locked_invoice.updated_at = datetime.now(timezone.utc)
    db.flush()

    sub_crud.write_activity(
        db=db,
        subscription_id=subscription.id,
        activity_type=SubscriptionActivityTypes.DUNNING_RETRY_ATTEMPTED,
        description=f"Manual retry submitted for invoice {locked_invoice.invoice_literal}",
        actor_type="merchant",
        actor_id=actor_id,
        metadata={"invoice_id": locked_invoice.invoice_id},
    )

    return {
        "auth_type": "pre_auth",
        "status": "retry_submitted",
        "hpp_link": None,
        "expires_at": None,
    }


async def generate_reauth_link(
    db: Session,
    subscription: Subscription,
    invoice,
) -> dict:
    """
    Generate an HPP re-authorisation link for the customer to update
    their payment method.

    Stores a PaymentRequestLinks record with is_reauth=True and expires_at
    so that expiry is enforced server-side (VULN-004).
    """
    from datetime import timedelta
    from src.apps.payment_requests.models.payment_request_links import PaymentRequestLinks
    from src.apps.base.utils.functions import generate_secure_id

    expires_at = datetime.now(timezone.utc) + timedelta(days=7)

    # Generate a cryptographically secure token for the re-auth link
    reauth_token = generate_secure_id(prepend="rat", length=32)

    # Build the link path
    link_path = f"/hpp/reauth/{subscription.subscription_id}"

    # Store a PaymentRequestLinks record with is_reauth=True so expiry is
    # enforced server-side when the HPP validates the token.
    pr = subscription.payment_request
    if pr is not None:
        payment_link = PaymentRequestLinks(
            link=link_path,
            token=reauth_token,
            is_expired=False,
            status="PENDING",
            start_date=datetime.now(timezone.utc),
            end_date=expires_at,
            is_reauth=True,
            payment_request_id=pr.id,
            invoice_id=invoice.id,
        )
        db.add(payment_link)
        db.flush()

    hpp_link = f"{link_path}?token={reauth_token}"

    sub_crud.write_activity(
        db=db,
        subscription_id=subscription.id,
        activity_type=SubscriptionActivityTypes.REAUTH_REQUESTED,
        description=f"Re-auth link generated for invoice {invoice.invoice_literal}",
        actor_type="system",
        actor_id=None,
        metadata={"invoice_id": invoice.invoice_id, "expires_at": expires_at.isoformat()},
    )

    return {
        "auth_type": "request_auth",
        "status": "hpp_link_generated",
        "hpp_link": hpp_link,
        "expires_at": expires_at,
    }


# ─── Summary ─────────────────────────────────────────────────────────────────


def get_subscription_summary(
    db: Session,
    merchant_id: int,
    customer_id: Optional[int] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
) -> SubscriptionSummaryResponse:
    """Return aggregate subscription stats for a merchant (or customer), including MRR."""
    from src.apps.subscriptions.models.subscription import Subscription as SubModel
    from sqlalchemy import select

    raw = sub_crud.get_subscription_summary(db, merchant_id, customer_id=customer_id, date_from=date_from, date_to=date_to)

    # Compute MRR: sum normalised MRR for ACTIVE and INITIALIZING subscriptions.
    # HPMNTP-948: INITIALIZING subscriptions (request_auth flow) represent committed
    # recurring revenue and must be included. They only stay INITIALIZING until the
    # nightly generate_invoices task runs — excluding them causes newly created
    # subscriptions to be invisible in MRR for up to 24 hours.
    from sqlalchemy import or_
    where_clauses = [
        SubModel.merchant_id == merchant_id,
        or_(
            SubModel.status == SubscriptionStatus.ACTIVE,
            SubModel.status == SubscriptionStatus.INITIALIZING,
        ),
        SubModel.deleted_at == None,
    ]
    if customer_id is not None:
        where_clauses.append(SubModel.customer_id == customer_id)
    if date_from:
        from datetime import datetime, timezone
        where_clauses.append(SubModel.created_at >= datetime.strptime(date_from, "%Y-%m-%d").replace(tzinfo=timezone.utc))
    if date_to:
        from datetime import datetime, timezone
        where_clauses.append(SubModel.created_at <= datetime.strptime(date_to, "%Y-%m-%d").replace(hour=23, minute=59, second=59, tzinfo=timezone.utc))

    # HPMNTP-965: eagerly load payment_request AND its recurring_config so
    # _get_recurring_config() never falls back to lazy loading (which can
    # silently fail and return None, causing all active subscriptions to
    # contribute $0 to MRR).
    from sqlalchemy.orm import selectinload as _sil
    from src.apps.payment_requests.models.payment_request import PaymentRequest as _PR
    stmt = (
        select(SubModel)
        .where(*where_clauses)
        .options(
            _sil(SubModel.payment_request).selectinload(_PR.recurring_config)
        )
    )
    billable_subs = db.execute(stmt).scalars().all()

    mrr = 0.0
    for sub in billable_subs:
        rec = _get_recurring_config(sub)
        if rec:
            pr = sub.payment_request
            amount = pr.amount if pr else 0.0
            interval = getattr(rec, "interval", "month")
            # interval_value is only a billing-cycle multiplier for
            # custom_interval repeat type (e.g. "every 3 months").
            # For day_of_month / time_of_month / interval / on_date, the
            # subscription bills once per interval period and interval_value
            # stores context like the day-of-month (e.g. 15), NOT a cycle
            # count — dividing by it would massively understate MRR.
            repeat_type = getattr(rec, "repeat_type", None)
            if repeat_type == "custom_interval":
                interval_value = getattr(rec, "interval_value", 1) or 1
            else:
                interval_value = 1
            mrr += normalize_to_mrr(amount, interval, interval_value)

    active_count = raw["active_count"]
    avg_val = raw["avg_subscription_value"]

    return SubscriptionSummaryResponse(
        active_count=active_count,
        initializing_count=raw["initializing_count"],
        past_due_count=raw["past_due_count"],
        failed_count=raw["failed_count"],
        cancelled_count=raw["cancelled_count"],
        completed_count=raw["completed_count"],
        dunning_exhausted_count=raw["dunning_exhausted_count"],
        paused_count=raw["paused_count"],
        total_billed=raw["total_billed"],
        total_collected=raw["total_collected"],
        avg_subscription_value=avg_val,
        mrr=round(mrr, 2),
    )


# ─── Response builders ────────────────────────────────────────────────────────


def build_list_item(subscription: Subscription) -> dict:
    """Build SubscriptionListItemResponse dict from a Subscription ORM instance."""
    rec = _get_recurring_config(subscription)
    interval = getattr(rec, "interval", "month") if rec else "month"
    interval_value = getattr(rec, "interval_value", 1) if rec else 1

    customer = subscription.customer
    customer_name = None
    if customer:
        customer_name = (
            getattr(customer, "business_legal_name", None)
            or f"{getattr(customer, 'first_name', '') or ''} {getattr(customer, 'last_name', '') or ''}".strip()
            or None
        )

    return {
        "id": subscription.id,
        "subscription_id": subscription.subscription_id,
        "subscription_literal": subscription.subscription_literal,
        "name": subscription.name,
        "status": subscription.status,
        "status_text": status_to_text(subscription.status),
        "customer_id": subscription.customer_id,
        "customer_name": customer_name,
        "amount": subscription.payment_request.amount if subscription.payment_request else 0.0,
        "interval_label": get_interval_label(interval, interval_value),
        "next_billing_date": subscription.next_billing_date,
        "total_paid": subscription.total_paid,
        "invoices_paid": subscription.invoices_paid,
        "invoices_generated": subscription.invoices_generated,
        "created_at": subscription.created_at,
    }


def _get_pr_auth_literal(pr, db: Session) -> Optional[str]:
    """Return the authorization_literal from the PaymentRequestAuthorizations record for this PR."""
    if not pr:
        return None
    from src.apps.payment_requests.models.payment_request_authorizations import PaymentRequestAuthorizations
    from sqlalchemy import select
    auth = db.execute(
        select(PaymentRequestAuthorizations.authorization_literal)
        .where(
            PaymentRequestAuthorizations.payment_request_id == pr.id,
            PaymentRequestAuthorizations.deleted_at == None,
        )
        .limit(1)
    ).scalar_one_or_none()
    return auth


def build_detail(subscription: Subscription, db: Session) -> dict:
    """Build SubscriptionDetailResponse dict, including recent activities."""
    from src.core.utils.enums import InvoiceStatusTypes
    from src.apps.invoices.models.invoice import Invoice
    from src.apps.payment_requests.models.payment_request_customer import PaymentRequestCustomer
    from src.apps.payment_methods.models.payment_methods import PaymentMethod
    from src.apps.customers.models.customer_contact import CustomerContact
    from sqlalchemy import select, func
    from sqlalchemy.orm import joinedload, selectinload

    base = build_list_item(subscription)
    rec = _get_recurring_config(subscription)

    # Count invoice statuses
    inv_base = [
        Invoice.subscription_id == subscription.id,
        Invoice.deleted_at == None,
    ]

    def _count_inv(status_val):
        stmt = select(func.count(Invoice.id)).where(*inv_base, Invoice.status == status_val)
        return db.execute(stmt).scalar_one()

    recent_activities, _ = sub_crud.get_activities(db, subscription.id, page=1, per_page=10)

    # ── Payer info (from PaymentRequestCustomer → CustomerContact) ─────────────
    payer_info = None
    pr = subscription.payment_request
    if pr:
        prc_stmt = (
            select(PaymentRequestCustomer)
            .where(PaymentRequestCustomer.payment_request_id == pr.id)
            .options(
                joinedload(PaymentRequestCustomer.payer),
                joinedload(PaymentRequestCustomer.customer),
            )
            .limit(1)
        )
        prc = db.execute(prc_stmt).unique().scalar_one_or_none()
        if prc:
            payer = prc.payer
            if payer:
                payer_info = {
                    "payer_name": f"{payer.first_name or ''} {payer.last_name or ''}".strip() or None,
                    "payer_email": payer.email,
                    "payer_phone": payer.phone or payer.account_phone or payer.office_phone,
                    "payer_title": payer.title,
                    "payer_designation": payer.designation,
                }

    # ── Payment method (card details) ─────────────────────────────────────────
    payment_method_info = None
    if pr:
        pm_stmt = (
            select(PaymentMethod)
            .where(
                PaymentMethod.payment_request_id == pr.id,
                PaymentMethod.deleted_at == None,
            )
            .limit(1)
        )
        pm = db.execute(pm_stmt).scalar_one_or_none()
        if pm:
            cd = pm.card_details
            if cd:
                last4 = cd.card_number[-4:] if cd.card_number and len(cd.card_number) >= 4 else cd.card_number
                payment_method_info = {
                    "brand": cd.brand,
                    "last4": last4,
                    "expire_month": cd.expire_month,
                    "expire_year": cd.expire_year,
                    "is_default": cd.is_default,
                    "tender_summary": f"{cd.brand or ''} ****{last4}".strip() if last4 else cd.brand,
                }
            else:
                # ACH fallback
                ach = pm.ach_details
                if ach:
                    payment_method_info = {
                        "brand": "ACH",
                        "last4": getattr(ach, "account_number", None),
                        "expire_month": None,
                        "expire_year": None,
                        "is_default": False,
                        "tender_summary": f"ACH ****{getattr(ach, 'account_number', '')[-4:] if getattr(ach, 'account_number', None) else ''}".strip(),
                    }

    # ── Customer contacts (for tender update modal) ────────────────────────────
    customer_contacts = []
    if subscription.customer_id:
        contacts_stmt = (
            select(CustomerContact)
            .where(
                CustomerContact.customer_id == subscription.customer_id,
                CustomerContact.deleted_at == None,
            )
        )
        contacts = db.execute(contacts_stmt).scalars().all()
        for c in contacts:
            customer_contacts.append({
                "id": c.id,
                "contact_id": c.contact_id,
                "name": f"{c.first_name or ''} {c.last_name or ''}".strip() or None,
                "email": c.email,
                "phone": c.phone or c.account_phone or c.office_phone,
                "title": c.title,
                "designation": c.designation,
            })

    # ── Terms authorized ──────────────────────────────────────────────────────
    terms_authorized = pr.terms if pr else None

    base.update({
        "total_billed": subscription.total_billed,
        "invoices_overdue": _count_inv(InvoiceStatusTypes.OVERDUE),
        "invoices_pending": _count_inv(InvoiceStatusTypes.PENDING),
        "invoices_cancelled": _count_inv(InvoiceStatusTypes.CANCELLED),
        "past_due_since": subscription.past_due_since,
        "authorization_type": pr.authorization_type if pr else "",
        "authorization_literal": _get_pr_auth_literal(pr, db),
        "payer_info": payer_info,
        "payment_method_info": payment_method_info,
        "customer_contacts": customer_contacts,
        "terms_authorized": terms_authorized,
        "interval": getattr(rec, "interval", "month") if rec else "month",
        "interval_value": getattr(rec, "interval_value", 1) if rec else 1,
        "end_type": getattr(rec, "end_type", "") if rec else "",
        "end_date": getattr(rec, "end_date", None) if rec else None,
        "pay_until_count": getattr(rec, "pay_until_count", None) if rec else None,
        "start_date": getattr(rec, "start_date", None) if rec else None,
        "dunning_retry_count": subscription.dunning_retry_count,
        "dunning_next_retry_at": subscription.dunning_next_retry_at,
        "dunning_exhausted_at": subscription.dunning_exhausted_at,
        "recent_activities": [
            {
                "id": a.id,
                "activity_type": a.activity_type,
                "description": a.description,
                "actor_type": a.actor_type,
                "actor_id": a.actor_id,
                "created_at": a.created_at,
                "metadata_": a.metadata_,
            }
            for a in recent_activities
        ],
    })
    return base


# ─── Schedule ─────────────────────────────────────────────────────────────────


def get_subscription_schedule(
    db: Session,
    subscription: Subscription,
) -> "SubscriptionScheduleResponse":
    """
    Return the payment schedule for a subscription.

    Combines real (DB) invoices with virtual projected future cycles.
    Status is computed dynamically from invoice status and due date.
    All items are always returned — no filtering by paid/cancelled status.
    """
    from src.apps.subscriptions.schemas.subscription_schemas import (
        ScheduleItem,
        SubscriptionScheduleResponse,
    )
    from src.apps.invoices.models.invoice import Invoice
    from src.core.utils.enums import InvoiceStatusTypes
    from sqlalchemy import select
    from sqlalchemy.orm import selectinload

    stmt = (
        select(Invoice)
        .where(
            Invoice.subscription_id == subscription.id,
            Invoice.deleted_at == None,
        )
        .options(selectinload(Invoice.transactions))
        .order_by(Invoice.sequence_id.asc())
    )
    real_invoices = list(db.execute(stmt).scalars().all())

    now = datetime.now(timezone.utc)

    def _compute_status(inv) -> str:
        paid_statuses = {InvoiceStatusTypes.PAID, InvoiceStatusTypes.PARTIALLY_PAID}
        if inv.status in paid_statuses:
            return "paid"
        if inv.status == InvoiceStatusTypes.CANCELLED:
            return "cancelled"
        # CAPTURED (203): charge is in-flight, waiting for provider result
        if inv.status == InvoiceStatusTypes.CAPTURED:
            return "processing"
        # FAILED (400): charge was attempted and failed
        if inv.status == InvoiceStatusTypes.FAILED:
            return "failed"
        # HPMNTP-967: when subscription is paused, all non-settled items are paused
        if subscription.status == SubscriptionStatus.PAUSED:
            return "paused"
        if inv.due_date:
            due = inv.due_date
            if due.tzinfo is None:
                due = due.replace(tzinfo=timezone.utc)
            if due < now:
                return "overdue"
        return "scheduled"

    items: list = []
    for inv in real_invoices:
        status = _compute_status(inv)
        txn = inv.transactions[0] if inv.transactions else None
        can_act = status in ("scheduled", "overdue", "paused")
        # Failed invoices can be retried via Pay Now but not edited/cancelled
        can_pay_now = (
            (can_act or status == "failed")
            and subscription.status in (SubscriptionStatus.ACTIVE, SubscriptionStatus.PAST_DUE)
        )
        items.append(
            ScheduleItem(
                sequence=inv.sequence_id or 0,
                sequence_label=f"#{inv.sequence_id or 0}",
                transaction_id=txn.txn_literal if txn and txn.txn_literal else None,
                schedule_date=inv.due_date,
                auth_amount=inv.amount,
                paid_date=inv.paid_date,
                paid_amount=inv.paid_amount or 0.0,
                status=status,
                can_edit=can_act,
                can_cancel=can_act,
                can_pay_now=can_pay_now,
                invoice_id=inv.invoice_id,
                invoice_literal=inv.invoice_literal,
            )
        )

    # Track sequences already covered by real invoices to avoid duplicates
    real_sequences = {inv.sequence_id for inv in real_invoices if inv.sequence_id is not None}

    # Add virtual projected future cycles
    terminal_statuses = {
        SubscriptionStatus.CANCELLED,
        SubscriptionStatus.COMPLETED,
        SubscriptionStatus.EXPIRED,
        SubscriptionStatus.DUNNING_EXHAUSTED,
    }
    is_terminated = subscription.status in terminal_statuses
    rec = _get_recurring_config(subscription)
    if rec and subscription.next_billing_date:
        from src.apps.subscriptions.helpers.billing_date import compute_projected_dates

        # Use the highest existing sequence as the base — not invoices_generated —
        # to avoid collisions when the counter is stale (e.g. test data, cancelled placeholders).
        max_real_seq = max(real_sequences, default=0)
        effective_generated = max(subscription.invoices_generated or 0, max_real_seq)

        future_dates = compute_projected_dates(
            interval=getattr(rec, "interval", "month"),
            interval_value=getattr(rec, "interval_value", 1),
            from_date=subscription.next_billing_date,
            count=24,
            end_type=getattr(rec, "end_type", None),
            end_date=getattr(rec, "end_date", None),
            pay_until_count=getattr(rec, "pay_until_count", None),
            already_generated=effective_generated,
        )
        pr = subscription.payment_request
        base_amount = pr.amount if pr else 0.0
        next_seq = effective_generated + 1
        for i, dt in enumerate(future_dates):
            seq = next_seq + i
            # Skip if a real invoice already exists for this sequence
            if seq in real_sequences:
                continue
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            items.append(
                ScheduleItem(
                    sequence=seq,
                    sequence_label=f"#{seq}",
                    transaction_id=None,
                    schedule_date=dt,
                    auth_amount=base_amount,
                    paid_date=None,
                    paid_amount=0.0,
                    status="cancelled" if is_terminated else (
                        "paused" if subscription.status == SubscriptionStatus.PAUSED else "scheduled"
                    ),
                    can_edit=False if is_terminated else True,
                    can_cancel=False if is_terminated else True,
                    # Virtual items have no DB invoice yet — Pay Now is not
                    # possible until the invoice is generated by the scheduler.
                    can_pay_now=False,
                    invoice_id=None,
                    invoice_literal=None,
                )
            )

    return SubscriptionScheduleResponse(items=items, total=len(items))


def cancel_schedule_item(
    db: Session,
    subscription: Subscription,
    sequence: int,
    actor_id: int,
) -> dict:
    """
    Cancel a scheduled invoice by sequence number.

    If the invoice already exists in the DB, mark it CANCELLED.
    If the sequence is a virtual (projected) item not yet generated, create the
    invoice in CANCELLED state immediately so the scheduler will skip it.
    """
    from src.apps.invoices.models.invoice import Invoice
    from src.core.utils.enums import InvoiceStatusTypes
    from sqlalchemy import select

    stmt = select(Invoice).where(
        Invoice.subscription_id == subscription.id,
        Invoice.sequence_id == sequence,
        Invoice.deleted_at == None,
    )
    invoice = db.execute(stmt).scalar_one_or_none()

    if invoice:
        # Real invoice found — validate and cancel
        non_cancellable = {
            InvoiceStatusTypes.PAID,
            InvoiceStatusTypes.PARTIALLY_PAID,
            InvoiceStatusTypes.CANCELLED,
        }
        if invoice.status in non_cancellable:
            raise APIException(
                message="Cannot cancel a paid or already cancelled invoice",
                status_code=409,
            )
        invoice.status = InvoiceStatusTypes.CANCELLED
        invoice.updated_at = datetime.now(timezone.utc)
        db.flush()
        invoice_id = invoice.invoice_id
    else:
        # Virtual (projected) item — create a cancelled placeholder so the
        # scheduler and schedule endpoint both recognise it as cancelled.
        pr = subscription.payment_request
        if not pr:
            raise APIException(
                message="Subscription has no associated payment request",
                status_code=422,
            )

        # Compute the projected schedule date for this sequence
        rec = _get_recurring_config(subscription)
        schedule_date = None
        if rec and subscription.next_billing_date:
            try:
                from src.apps.subscriptions.helpers.billing_date import compute_projected_dates
                future_dates = compute_projected_dates(
                    interval=getattr(rec, "interval", "month"),
                    interval_value=getattr(rec, "interval_value", 1),
                    from_date=subscription.next_billing_date,
                    count=sequence - subscription.invoices_generated + 1,
                    end_type=getattr(rec, "end_type", None),
                    end_date=getattr(rec, "end_date", None),
                    pay_until_count=getattr(rec, "pay_until_count", None),
                    already_generated=subscription.invoices_generated,
                )
                idx = sequence - subscription.invoices_generated - 1
                if 0 <= idx < len(future_dates):
                    schedule_date = future_dates[idx]
            except Exception:
                pass

        from src.apps.invoices.services.invoice_services import create_invoice
        # Get payer info from payment_request_customers
        payer_id = None
        customer_id_val = subscription.customer_id or (pr.customer_id if hasattr(pr, "customer_id") else None)
        try:
            from src.apps.payment_requests.models.payment_request_customer import PaymentRequestCustomer
            prc = db.execute(
                select(PaymentRequestCustomer).where(
                    PaymentRequestCustomer.payment_request_id == pr.id
                ).limit(1)
            ).scalar_one_or_none()
            if prc:
                payer_id = prc.payer_id
                if customer_id_val is None:
                    customer_id_val = prc.customer_id
        except Exception:
            pass

        created_invoice = create_invoice(
            db=db,
            payment_request=pr,
            merchant_id=subscription.merchant_id,
            customer_id=customer_id_val or 0,
            amount=pr.amount,
            status=InvoiceStatusTypes.CANCELLED,
            payer_id=payer_id,
            due_date=schedule_date,
            billing_date=schedule_date,
            sequence_id=sequence,
        )
        # Link to subscription
        created_invoice.subscription_id = subscription.id
        db.flush()
        invoice_id = created_invoice.invoice_id
        # Write invoice activity for the cancelled invoice
        from src.apps.invoices import crud as invoice_crud
        from src.core.utils.enums import InvoiceActivityTypes
        invoice_crud.write_activity(
            db=db,
            invoice_id=created_invoice.id,
            activity_type=InvoiceActivityTypes.INVOICE_CANCELLED,
            description=f"Invoice cancelled for subscription schedule item #{sequence}",
            actor_type="merchant",
            actor_id=actor_id,
        )

    sub_crud.write_activity(
        db=db,
        subscription_id=subscription.id,
        activity_type=SubscriptionActivityTypes.STATUS_CHANGED,
        description=f"Schedule item #{sequence} cancelled by merchant",
        actor_type="merchant",
        actor_id=actor_id,
        metadata={"sequence": sequence, "invoice_id": invoice_id},
    )

    return {"cancelled": True, "sequence": sequence, "invoice_id": invoice_id}


def get_subscription_associations(
    db: Session,
    subscription: Subscription,
) -> "SubscriptionAssociationsResponse":
    """Return all transactions and invoices linked to this subscription."""
    from src.apps.subscriptions.schemas.subscription_schemas import (
        AssociationInvoice,
        AssociationTransaction,
        SubscriptionAssociationsResponse,
    )
    from src.apps.invoices.models.invoice import Invoice
    from src.apps.transactions.models.transactions import Transactions as _Txn
    from src.core.utils.enums import InvoiceStatusTypes
    from sqlalchemy import select
    from sqlalchemy.orm import selectinload, joinedload

    inv_stmt = (
        select(Invoice)
        .where(
            Invoice.subscription_id == subscription.id,
            Invoice.deleted_at == None,
        )
        .options(
            selectinload(Invoice.transactions).selectinload(_Txn.payment_method),
            joinedload(Invoice.customer),
        )
        .order_by(Invoice.sequence_id.asc())
    )
    invoices = list(db.execute(inv_stmt).unique().scalars().all())

    inv_status_map = {
        InvoiceStatusTypes.PAID: "paid",
        InvoiceStatusTypes.PARTIALLY_PAID: "partially_paid",
        InvoiceStatusTypes.OVERDUE: "overdue",
        InvoiceStatusTypes.CANCELLED: "cancelled",
        InvoiceStatusTypes.PENDING: "pending",
        InvoiceStatusTypes.WAITING: "pending",
        InvoiceStatusTypes.CREATED: "pending",
        InvoiceStatusTypes.UPDATED: "pending",
        InvoiceStatusTypes.FAILED: "failed",
        InvoiceStatusTypes.CAPTURED: "processing",
    }

    def _customer_name(customer) -> Optional[str]:
        if not customer:
            return None
        return (
            getattr(customer, "business_legal_name", None)
            or f"{getattr(customer, 'first_name', '') or ''} {getattr(customer, 'last_name', '') or ''}".strip()
            or None
        )

    assoc_invoices = []
    for inv in invoices:
        assoc_invoices.append(
            AssociationInvoice(
                invoice_id=inv.invoice_id,
                invoice_literal=inv.invoice_literal,
                customer_name=_customer_name(inv.customer),
                reference=inv.reference,
                issue_date=inv.billing_date or inv.created_at,
                due_date=inv.due_date,
                amount_due=inv.amount,
                amount_paid=inv.paid_amount or 0.0,
                status=inv_status_map.get(inv.status, "pending"),
            )
        )

    seen_txn_ids: set = set()
    assoc_transactions = []
    txn_status_map = {
        100: "pending",
        200: "paid",
        300: "failed",
        400: "refunded",
        500: "voided",
    }

    for inv in invoices:
        for txn in inv.transactions or []:
            if txn.id in seen_txn_ids:
                continue
            seen_txn_ids.add(txn.id)

            pm = getattr(txn, "payment_method", None)
            tender_summary = None
            if pm:
                cd = getattr(pm, "card_details", None)
                if cd:
                    brand = getattr(cd, "brand", None) or ""
                    card_num = getattr(cd, "card_number", None) or ""
                    last4 = card_num[-4:] if len(card_num) >= 4 else card_num
                    tender_summary = f"{brand} ****{last4}".strip() if last4 else brand or None
                else:
                    ach = getattr(pm, "ach_details", None)
                    if ach:
                        acct_num = getattr(ach, "account_number", None) or ""
                        last4 = acct_num[-4:] if len(acct_num) >= 4 else acct_num
                        tender_summary = f"ACH ****{last4}".strip() if last4 else "ACH"

            txn_dt = getattr(txn, "occurred_at", None) or getattr(txn, "ocurred_at", None)
            txn_date = txn_dt if txn_dt else None
            txn_time = txn_dt.strftime("%I:%M %p") if txn_dt and hasattr(txn_dt, "strftime") else None

            assoc_transactions.append(
                AssociationTransaction(
                    transaction_id=txn.txn_id,
                    txn_literal=txn.txn_literal,
                    customer_name=_customer_name(getattr(txn, "customer", None)),
                    tender_summary=tender_summary,
                    txn_type=getattr(txn, "txn_type", None),
                    txn_date=txn_date,
                    txn_time=txn_time,
                    amount=txn.txn_amount,
                    status=txn_status_map.get(txn.txn_status, "pending"),
                )
            )

    return SubscriptionAssociationsResponse(
        transactions=assoc_transactions,
        invoices=assoc_invoices,
    )


def get_subscription_transactions(
    db: Session,
    subscription: Subscription,
) -> list:
    """
    Return all transactions linked to this subscription.

    Collects transactions from two sources:
    1. Transactions linked via subscription invoices (invoice → M2M → transaction).
    2. Transactions linked directly to the subscription's payment_request, for
       pre-auth subscriptions that were charged before any invoice was generated.
    """
    from src.apps.subscriptions.schemas.subscription_schemas import AssociationTransaction
    from src.apps.invoices.models.invoice import Invoice
    from sqlalchemy import select
    from sqlalchemy.orm import selectinload

    txn_status_map = {100: "pending", 200: "paid", 300: "failed", 400: "refunded", 500: "voided"}

    def _customer_name(customer) -> Optional[str]:
        if not customer:
            return None
        return (
            getattr(customer, "business_legal_name", None)
            or f"{getattr(customer, 'first_name', '') or ''} {getattr(customer, 'last_name', '') or ''}".strip()
            or None
        )

    seen_ids: set = set()
    result: list = []

    def _append_txn(txn) -> None:
        if txn.id in seen_ids:
            return
        seen_ids.add(txn.id)
        pm = getattr(txn, "payment_method", None)
        tender_summary = None
        if pm:
            cd = getattr(pm, "card_details", None)
            if cd:
                brand = getattr(cd, "brand", None) or ""
                card_num = getattr(cd, "card_number", None) or ""
                last4 = card_num[-4:] if len(card_num) >= 4 else card_num
                tender_summary = f"{brand} ****{last4}".strip() if last4 else brand or None
            else:
                ach = getattr(pm, "ach_details", None)
                if ach:
                    acct_num = getattr(ach, "account_number", None) or ""
                    last4 = acct_num[-4:] if len(acct_num) >= 4 else acct_num
                    tender_summary = f"ACH ****{last4}".strip() if last4 else "ACH"
        txn_dt = getattr(txn, "occurred_at", None) or getattr(txn, "ocurred_at", None)
        result.append(
            AssociationTransaction(
                transaction_id=txn.txn_id,
                txn_literal=txn.txn_literal,
                customer_name=_customer_name(getattr(txn, "customer", None)),
                tender_summary=tender_summary,
                txn_type=getattr(txn, "txn_type", None),
                txn_date=txn_dt,
                txn_time=txn_dt.strftime("%I:%M %p") if txn_dt and hasattr(txn_dt, "strftime") else None,
                amount=txn.txn_amount,
                status=txn_status_map.get(txn.txn_status, "pending"),
            )
        )

    # Source 1: transactions via invoice linkage
    inv_stmt = (
        select(Invoice)
        .where(Invoice.subscription_id == subscription.id, Invoice.deleted_at == None)
        .options(selectinload(Invoice.transactions))
    )
    for inv in db.execute(inv_stmt).unique().scalars().all():
        for txn in inv.transactions or []:
            _append_txn(txn)

    # Source 2: transactions directly on the payment_request (catches pre-auth charges
    # that were made before any subscription invoice was generated)
    if subscription.payment_request_id:
        from src.apps.transactions.models.transactions import Transactions as Transaction
        from sqlalchemy.orm import joinedload as _jl
        pr_txn_stmt = (
            select(Transaction)
            .where(Transaction.payment_request_id == subscription.payment_request_id)
            .options(_jl(Transaction.payment_method), _jl(Transaction.customer))
        )
        for txn in db.execute(pr_txn_stmt).unique().scalars().all():
            _append_txn(txn)

    return result


def get_subscription_invoices_list(
    db: Session,
    subscription: Subscription,
) -> list:
    """Return actual DB invoices linked to this subscription (no projections)."""
    from src.apps.subscriptions.schemas.subscription_schemas import AssociationInvoice
    from src.apps.invoices.models.invoice import Invoice
    from src.core.utils.enums import InvoiceStatusTypes
    from sqlalchemy import select
    from sqlalchemy.orm import joinedload

    inv_status_map = {
        InvoiceStatusTypes.PAID: "paid",
        InvoiceStatusTypes.PARTIALLY_PAID: "partially_paid",
        InvoiceStatusTypes.OVERDUE: "overdue",
        InvoiceStatusTypes.CANCELLED: "cancelled",
        InvoiceStatusTypes.PENDING: "pending",
        InvoiceStatusTypes.WAITING: "pending",
        InvoiceStatusTypes.CREATED: "pending",
        InvoiceStatusTypes.UPDATED: "pending",
        InvoiceStatusTypes.FAILED: "failed",
        InvoiceStatusTypes.CAPTURED: "processing",
    }

    def _customer_name(customer) -> Optional[str]:
        if not customer:
            return None
        return (
            getattr(customer, "business_legal_name", None)
            or f"{getattr(customer, 'first_name', '') or ''} {getattr(customer, 'last_name', '') or ''}".strip()
            or None
        )

    inv_stmt = (
        select(Invoice)
        .where(Invoice.subscription_id == subscription.id, Invoice.deleted_at == None)
        .options(joinedload(Invoice.customer))
        .order_by(Invoice.sequence_id.asc())
    )
    invoices = list(db.execute(inv_stmt).unique().scalars().all())

    return [
        AssociationInvoice(
            invoice_id=inv.invoice_id,
            invoice_literal=inv.invoice_literal,
            customer_name=_customer_name(inv.customer),
            reference=inv.reference,
            issue_date=inv.billing_date or inv.created_at,
            due_date=inv.due_date,
            amount_due=inv.amount,
            amount_paid=inv.paid_amount or 0.0,
            status=inv_status_map.get(inv.status, "pending"),
        )
        for inv in invoices
    ]


def request_tender_update(
    db: Session,
    subscription: Subscription,
    payload: dict,
    actor_id: int,
) -> dict:
    """
    Record a tender-update request for a subscription.

    Logs an activity entry. Real email/SMS delivery would be triggered here
    once the notifications module is implemented.
    """
    contact_ids = payload.get("contact_ids", [])
    send_email = payload.get("send_email", True)
    send_sms = payload.get("send_sms", False)

    sub_crud.write_activity(
        db=db,
        subscription_id=subscription.id,
        activity_type=SubscriptionActivityTypes.STATUS_CHANGED,
        description="Tender update request sent to contacts",
        actor_type="merchant",
        actor_id=actor_id,
        metadata={
            "contact_ids": contact_ids,
            "send_email": send_email,
            "send_sms": send_sms,
        },
    )

    return {
        "subscription_id": subscription.subscription_id,
        "contact_ids": contact_ids,
        "send_email": send_email,
        "send_sms": send_sms,
        "status": "requested",
    }
