"""add_processed_events_and_webhook_events_tables

Creates the processed_events table (idempotency guard for Kafka/internal event
handlers) and the webhook_events table (inbound webhook payload store for
deduplication and replay).

Revision ID: b9c0d1e2f3a4
Revises: 8836302f3563
Create Date: 2026-03-20 10:00:00.000000

"""
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op
from sqlalchemy import inspect as sa_inspect
from sqlalchemy.dialects.postgresql import JSONB, UUID

# Revision identifiers, used by Alembic.
# `revision` is this migration's own ID; `down_revision` is the ID of the
# migration it revises (the "Revises:" entry in the module docstring).
revision: str = 'b9c0d1e2f3a4'
down_revision: Union[str, Sequence[str], None] = '8836302f3563'
# No branch label and no additional dependencies are declared here.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create the ``processed_events`` and ``webhook_events`` tables.

    Each table, together with its index, is created only when a table of that
    name is not already present in the connected database, so the migration
    can be applied to a database where the tables already exist.
    """
    conn = op.get_bind()
    existing_tables = sa_inspect(conn).get_table_names()

    # ── processed_events ───────────────────────────────────────────────────────
    if 'processed_events' not in existing_tables:
        op.create_table(
            'processed_events',
            sa.Column(
                'id',
                UUID(as_uuid=True),
                primary_key=True,
                server_default=sa.text('gen_random_uuid()'),
                nullable=False,
            ),
            sa.Column('event_id', sa.String(255), nullable=False),
            sa.Column('event_type', sa.String(255), nullable=False),
            sa.Column('handler_name', sa.String(255), nullable=False),
            sa.Column(
                'processed_at',
                sa.DateTime(timezone=True),
                server_default=sa.text('now()'),
                nullable=False,
            ),
            # One row per (event, handler) pair: the same event may be handled
            # by several handlers, but only once by each of them.
            sa.UniqueConstraint(
                'event_id',
                'handler_name',
                name='uq_processed_events_event_handler',
            ),
        )
        op.create_index(
            'ix_processed_events_event_id',
            'processed_events',
            ['event_id'],
            unique=False,
        )

    # ── webhook_events ─────────────────────────────────────────────────────────
    if 'webhook_events' not in existing_tables:
        op.create_table(
            'webhook_events',
            sa.Column(
                'id',
                UUID(as_uuid=True),
                primary_key=True,
                server_default=sa.text('gen_random_uuid()'),
                nullable=False,
            ),
            sa.Column('provider_slug', sa.String(50), nullable=False),
            # Uniqueness of provider_event_id is enforced by the named unique
            # index created below. Declaring unique=True on the column as well
            # would emit an additional, unnamed UNIQUE constraint backed by a
            # second identical index that every insert would have to maintain.
            sa.Column(
                'provider_event_id',
                sa.String(255),
                nullable=False,
            ),
            sa.Column('event_type', sa.String(255), nullable=False),
            sa.Column('raw_payload', JSONB, nullable=False),
            sa.Column('processed', sa.Boolean(), nullable=False, server_default='false'),
            sa.Column(
                'received_at',
                sa.DateTime(timezone=True),
                server_default=sa.text('now()'),
                nullable=False,
            ),
            sa.Column('deleted_at', sa.DateTime(timezone=True), nullable=True),
        )
        op.create_index(
            'ix_webhook_events_provider_event_id',
            'webhook_events',
            ['provider_event_id'],
            unique=True,
        )


def downgrade() -> None:
    """Drop the ``webhook_events`` and ``processed_events`` tables.

    Mirrors the existence checks in ``upgrade()``: a table is dropped only if
    it is present, and its index is dropped first only if it is present. This
    keeps the downgrade from failing when ``upgrade()`` skipped a pre-existing
    table (which may lack the index) or when a table has already been removed.
    """
    inspector = sa_inspect(op.get_bind())
    existing_tables = inspector.get_table_names()

    # Same order as before: webhook_events first, then processed_events.
    targets = (
        ('webhook_events', 'ix_webhook_events_provider_event_id'),
        ('processed_events', 'ix_processed_events_event_id'),
    )
    for table_name, index_name in targets:
        if table_name not in existing_tables:
            continue

        present_indexes = {ix['name'] for ix in inspector.get_indexes(table_name)}
        if index_name in present_indexes:
            op.drop_index(index_name, table_name=table_name)
        op.drop_table(table_name)
