---
name: Certyo + PostgreSQL Integration
version: 1.0.0
description: Generate PostgreSQL → Certyo integration code with LISTEN/NOTIFY, Debezium CDC, polling, field mappings, auth, and working examples
api_base: https://www.certyos.com
auth: X-API-Key header
last_updated: 2026-04-14
---

# Certyo + PostgreSQL Integration Skill

This skill generates production-ready Python and Node.js code to integrate PostgreSQL with Certyo's blockchain-backed authenticity platform. It covers LISTEN/NOTIFY for lightweight real-time CDC, Debezium for production workloads, and timestamp-based polling as a simple fallback. All approaches detect row-level changes and ingest them into Certyo for immutable anchoring on Polygon.

## Certyo API Reference

All requests require the `X-API-Key` header for authentication.

### Endpoints

| Method | Path | Description | Response |
|--------|------|-------------|----------|
| `POST` | `/api/v1/records` | Ingest a single record | `202 Accepted` |
| `POST` | `/api/v1/records/bulk` | Ingest up to 1000 records | `202 Accepted` |
| `POST` | `/api/v1/verify/record` | Verify blockchain anchoring | `200 OK` |
| `GET` | `/api/v1/records` | Query records | `200 OK` |

### Record Payload

```json
{
  "tenantId": "string (required)",
  "database": "string (required)",
  "collection": "string (required)",
  "recordId": "string (required)",
  "recordPayload": { "...any JSON (required)" },
  "clientId": "string (optional)",
  "recordVersion": "string (optional, default '1')",
  "operationType": "upsert|insert|update|delete (optional)",
  "sourceTimestamp": "ISO 8601 (optional)",
  "idempotencyKey": "string (optional)"
}
```
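
A filled-in example for a row in `public.products` (values are illustrative):

```json
{
  "tenantId": "acme-corp",
  "database": "productdb",
  "collection": "public.products",
  "recordId": "42",
  "recordPayload": { "id": 42, "sku": "SKU-001", "name": "Widget", "updated_at": "2026-04-14T10:29:12Z" },
  "recordVersion": "1",
  "operationType": "update",
  "sourceTimestamp": "2026-04-14T10:29:12Z",
  "idempotencyKey": "pg-notify-products-42-2026-04-14T10"
}
```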

### Ingestion Response (202 Accepted)

```json
{
  "recordId": "42",
  "recordHash": "sha256:ab12cd34...",
  "tenantId": "acme-corp",
  "acceptedAt": "2026-04-14T10:30:00Z",
  "idempotencyReplayed": false
}
```

### Pipeline Timing

Records flow through: Kafka -> Accumulate (1000 records or 60s) -> Merkle tree -> IPFS pin -> Polygon anchor. Total anchoring latency is approximately 60-90 seconds after accumulation flush.

## Integration Patterns

### Pattern 1: Debezium CDC (Production)

```
PostgreSQL WAL (wal_level=logical)
  -> Debezium PostgreSQL connector reads via pgoutput plugin
    -> Publishes to Kafka topic (certyo.public.products)
      -> Consumer reads Kafka, maps to Certyo payload
        -> POST /api/v1/records (or /bulk)
          -> Verification worker updates source row with anchor status
```

### Pattern 2: LISTEN/NOTIFY (Lightweight)

```
PostgreSQL table with trigger
  -> INSERT/UPDATE/DELETE fires certyo_notify_change() trigger
    -> pg_notify('certyo_changes', payload) sent to channel
      -> Python/Node.js listener receives notification
        -> Maps payload to Certyo record
          -> POST /api/v1/records
```

### Pattern 3: Timestamp-Based Polling

```
PostgreSQL table with updated_at column
  -> Python script polls every N seconds with WHERE updated_at >= last_sync
    -> Maps each row to Certyo record payload
      -> POST /api/v1/records (or /bulk if > 10 rows)
        -> Persists last_sync_at in _certyo_sync_cursors table
```

## Authentication

### PostgreSQL Connection

```python
# Python (psycopg2)
PG_DSN = "host=localhost port=5432 dbname=productdb user=certyo_cdc password=secret"

# Python (SQLAlchemy)
PG_URL = "postgresql://certyo_cdc:secret@localhost:5432/productdb"
```

```javascript
// Node.js (pg)
const PG_CONFIG = {
  host: "localhost",
  port: 5432,
  database: "productdb",
  user: "certyo_cdc",
  password: "secret",
};
```

For managed services (AWS RDS, Azure Database for PostgreSQL):

```
postgresql://certyo_cdc:PASSWORD@mydb.abc123.us-east-1.rds.amazonaws.com:5432/productdb?sslmode=require
```

### Certyo API

```json
{
  "CERTYO_BASE_URL": "https://www.certyos.com",
  "CERTYO_API_KEY": "cty_xxxxxxxxxxxxxxxxxxxx",
  "CERTYO_TENANT_ID": "acme-corp"
}
```

Store the API key in environment variables or a secrets manager. Never commit credentials to source control.
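
A minimal startup check makes missing credentials fail fast instead of surfacing as 401s later. This is a sketch; the variable names follow this document's conventions:

```python
import os

REQUIRED_VARS = ("CERTYO_API_KEY", "CERTYO_TENANT_ID")


def load_certyo_config(env=os.environ):
    """Read Certyo settings, raising at startup if a required variable is absent."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing required environment variables: " + ", ".join(missing))
    return {
        "base_url": env.get("CERTYO_BASE_URL", "https://www.certyos.com"),
        "api_key": env["CERTYO_API_KEY"],
        "tenant_id": env["CERTYO_TENANT_ID"],
    }
```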

## Field Mapping

| PostgreSQL Field | Certyo Field | Notes |
|-----------------|-------------|-------|
| Primary key (`id`) | `recordId` | Cast to string |
| Table name | `collection` | Fully qualified: `"public.products"` |
| Database name | `database` | e.g. `"productdb"` |
| Full row (JSON) | `recordPayload` | `row_to_json()` or dict serialization |
| `updated_at` | `sourceTimestamp` | ISO 8601 format |
| `TG_OP` / operation | `operationType` | INSERT->insert, UPDATE->update, DELETE->delete |
| `id` + timestamp | `idempotencyKey` | Ensures deduplication on retry |

### Operation Mapping

| PostgreSQL `TG_OP` | Certyo `operationType` |
|-------------------|----------------------|
| `INSERT` | `insert` |
| `UPDATE` | `update` |
| `DELETE` | `delete` |

## Database Setup

### LISTEN/NOTIFY Trigger

```sql
-- Create the notification trigger function
CREATE OR REPLACE FUNCTION certyo_notify_change()
RETURNS trigger AS $$
DECLARE
    payload JSON;
    record_row RECORD;
BEGIN
    IF TG_OP = 'DELETE' THEN
        record_row := OLD;
    ELSE
        record_row := NEW;
    END IF;

    payload := json_build_object(
        'operation', TG_OP,
        'schema', TG_TABLE_SCHEMA,
        'table', TG_TABLE_NAME,
        'record', row_to_json(record_row)
    );

    PERFORM pg_notify('certyo_changes', payload::text);
    RETURN record_row;
END;
$$ LANGUAGE plpgsql;

-- Attach to the products table
CREATE TRIGGER certyo_change_trigger
AFTER INSERT OR UPDATE OR DELETE ON public.products
FOR EACH ROW EXECUTE FUNCTION certyo_notify_change();
```

### Certyo Tracking Columns

```sql
-- Add tracking columns for verification write-back
ALTER TABLE public.products ADD COLUMN IF NOT EXISTS certyo_record_hash TEXT;
ALTER TABLE public.products ADD COLUMN IF NOT EXISTS certyo_anchor_status TEXT DEFAULT 'pending';
ALTER TABLE public.products ADD COLUMN IF NOT EXISTS certyo_verified_at TIMESTAMPTZ;
```

### Debezium Prerequisites

```sql
-- wal_level=logical must be set in postgresql.conf (requires restart)
-- SHOW wal_level;

-- Create a publication for the tracked tables
CREATE PUBLICATION certyo_publication FOR TABLE public.products;

-- Grant replication privilege to the CDC user
ALTER ROLE certyo_cdc WITH REPLICATION;
```

### Polling Prerequisites

```sql
-- Ensure the table has an updated_at column with a trigger
ALTER TABLE public.products
ADD COLUMN IF NOT EXISTS updated_at TIMESTAMPTZ DEFAULT NOW();

CREATE OR REPLACE FUNCTION update_updated_at()
RETURNS trigger AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER set_updated_at
BEFORE UPDATE ON public.products
FOR EACH ROW EXECUTE FUNCTION update_updated_at();

-- Create the sync tracking table
CREATE TABLE IF NOT EXISTS _certyo_sync_cursors (
    table_name TEXT PRIMARY KEY,
    last_sync_at TIMESTAMPTZ NOT NULL DEFAULT '2000-01-01T00:00:00Z',
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
```

## Code Examples

### 1. Python LISTEN/NOTIFY Listener

```python
"""
PostgreSQL LISTEN/NOTIFY listener for Certyo.
Creates a trigger that fires pg_notify, then listens and ingests.

Install: pip install psycopg2-binary requests
Usage:   CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy python certyo_pg_listener.py
"""

import os
import json
import select
import psycopg2
import psycopg2.extensions
import requests
from datetime import datetime, timezone

PG_DSN = os.environ.get(
    "PG_DSN",
    "host=localhost port=5432 dbname=productdb user=certyo_cdc password=secret"
)
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = os.environ.get("CERTYO_BASE_URL", "https://www.certyos.com")
CHANNEL = "certyo_changes"
SCHEMA = "public"
TABLE = "products"
DATABASE = "productdb"


def map_to_record(notification_payload):
    data = json.loads(notification_payload)
    record_data = data["record"]
    op_map = {"insert": "insert", "update": "update", "delete": "delete"}
    certyo_op = op_map.get(data["operation"].lower(), "upsert")
    record_id = str(record_data.get("id", record_data.get("product_id", "unknown")))

    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": DATABASE,
        "collection": f"{SCHEMA}.{TABLE}",
        "recordId": record_id,
        "recordVersion": "1",
        "operationType": certyo_op,
        "recordPayload": record_data,
        "sourceTimestamp": datetime.now(timezone.utc).isoformat(),
        "idempotencyKey": f"pg-notify-{TABLE}-{record_id}-"
            f"{datetime.now(timezone.utc).strftime('%Y-%m-%dT%H')}",
    }


def ingest_record(record):
    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
    resp = requests.post(
        f"{CERTYO_BASE_URL}/api/v1/records",
        headers=headers,
        json=record,
        timeout=30,  # requests has no default timeout; avoid hanging the listener
    )
    if resp.ok:
        result = resp.json()
        print(f"  Ingested {record['recordId']}: hash={result['recordHash']}")
        return result
    else:
        print(f"  Failed {record['recordId']}: {resp.status_code} {resp.text}")
        return None


def listen():
    conn = psycopg2.connect(PG_DSN)
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

    with conn.cursor() as cur:
        cur.execute(f"LISTEN {CHANNEL};")
    print(f"Listening on channel '{CHANNEL}'...")

    while True:
        if select.select([conn], [], [], 30.0) == ([], [], []):
            continue

        conn.poll()
        while conn.notifies:
            notify = conn.notifies.pop(0)
            try:
                record = map_to_record(notify.payload)
                ingest_record(record)
            except Exception as e:
                print(f"Error processing notification: {e}")


if __name__ == "__main__":
    listen()
```

### 2. Node.js LISTEN/NOTIFY Listener

```javascript
/**
 * PostgreSQL LISTEN/NOTIFY listener for Certyo.
 *
 * Install: npm install pg  (Node.js 18+ for the built-in fetch API)
 * Usage:   CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy node certyo-pg-listener.js
 */

const { Client } = require("pg");

const PG_CONFIG = {
  host: process.env.PG_HOST || "localhost",
  port: parseInt(process.env.PG_PORT || "5432"),
  database: process.env.PG_DATABASE || "productdb",
  user: process.env.PG_USER || "certyo_cdc",
  password: process.env.PG_PASSWORD || "secret",
};

const CERTYO_API_KEY = process.env.CERTYO_API_KEY;
const CERTYO_TENANT_ID = process.env.CERTYO_TENANT_ID;
const CERTYO_BASE_URL = process.env.CERTYO_BASE_URL || "https://www.certyos.com";
const CHANNEL = "certyo_changes";
const SCHEMA = "public";
const TABLE = "products";

function mapToRecord(payload) {
  const data = JSON.parse(payload);
  const recordData = data.record;
  const opMap = { INSERT: "insert", UPDATE: "update", DELETE: "delete" };
  const operation = opMap[data.operation] || "upsert";
  const recordId = String(recordData.id || recordData.product_id || "unknown");

  return {
    tenantId: CERTYO_TENANT_ID,
    database: PG_CONFIG.database,
    collection: `${SCHEMA}.${TABLE}`,
    recordId,
    recordVersion: "1",
    operationType: operation,
    recordPayload: recordData,
    sourceTimestamp: new Date().toISOString(),
    idempotencyKey: `pg-notify-${TABLE}-${recordId}-${new Date().toISOString().substring(0, 13)}`,
  };
}

async function ingestRecord(record) {
  const response = await fetch(`${CERTYO_BASE_URL}/api/v1/records`, {
    method: "POST",
    headers: { "Content-Type": "application/json", "X-API-Key": CERTYO_API_KEY },
    body: JSON.stringify(record),
  });

  if (response.ok) {
    const result = await response.json();
    console.log(`  Ingested ${record.recordId}: hash=${result.recordHash}`);
    return result;
  } else {
    console.error(`  Failed ${record.recordId}: ${response.status}`);
    return null;
  }
}

async function main() {
  const client = new Client(PG_CONFIG);
  await client.connect();
  await client.query(`LISTEN ${CHANNEL}`);
  console.log(`Listening on '${CHANNEL}'...`);

  client.on("notification", async (msg) => {
    try {
      const record = mapToRecord(msg.payload);
      await ingestRecord(record);
    } catch (err) {
      console.error("Error:", err.message);
    }
  });

  process.on("SIGINT", async () => {
    await client.end();
    process.exit(0);
  });
}

main().catch(console.error);
```

### 3. Debezium PostgreSQL Connector

```yaml
# docker-compose.yml for Debezium PostgreSQL CDC
version: "3.9"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  debezium:
    image: debezium/connect:2.5
    depends_on: [kafka]
    ports: ["8083:8083"]
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: certyo-pg-cdc
      CONFIG_STORAGE_TOPIC: _debezium_configs
      OFFSET_STORAGE_TOPIC: _debezium_offsets
      STATUS_STORAGE_TOPIC: _debezium_status
```

Register the connector:

```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "certyo-pg-source",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "host.docker.internal",
      "database.port": "5432",
      "database.user": "certyo_cdc",
      "database.password": "secret",
      "database.dbname": "productdb",
      "topic.prefix": "certyo",
      "table.include.list": "public.products",
      "plugin.name": "pgoutput",
      "slot.name": "certyo_slot",
      "publication.name": "certyo_publication",
      "snapshot.mode": "initial"
    }
  }'
```

### 4. Python Batch Polling with SQLAlchemy

```python
"""
PostgreSQL polling worker for Certyo using SQLAlchemy.

Install: pip install sqlalchemy psycopg2-binary requests
Usage:   CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy python certyo_pg_poll.py
"""

import os
import time
import requests
from datetime import datetime, timezone
from sqlalchemy import create_engine, text

PG_URL = os.environ.get("PG_URL", "postgresql://certyo_cdc:secret@localhost:5432/productdb")
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = os.environ.get("CERTYO_BASE_URL", "https://www.certyos.com")
SCHEMA = "public"
TABLE = "products"
DATABASE = "productdb"
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30"))
BULK_THRESHOLD = 10

engine = create_engine(PG_URL)


def ensure_tracking_table():
    with engine.begin() as conn:
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS _certyo_sync_cursors (
                table_name TEXT PRIMARY KEY,
                last_sync_at TIMESTAMPTZ NOT NULL DEFAULT '2000-01-01T00:00:00Z',
                updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
            );
        """))


def get_last_sync():
    with engine.connect() as conn:
        result = conn.execute(text(
            "SELECT last_sync_at FROM _certyo_sync_cursors WHERE table_name = :table"
        ), {"table": f"{SCHEMA}.{TABLE}"})
        row = result.fetchone()
        return row[0] if row else datetime(2000, 1, 1, tzinfo=timezone.utc)


def save_last_sync(sync_time):
    with engine.begin() as conn:
        conn.execute(text("""
            INSERT INTO _certyo_sync_cursors (table_name, last_sync_at, updated_at)
            VALUES (:table, :sync_at, NOW())
            ON CONFLICT (table_name) DO UPDATE SET
                last_sync_at = :sync_at, updated_at = NOW();
        """), {"table": f"{SCHEMA}.{TABLE}", "sync_at": sync_time})


def map_row(row_dict):
    record_id = str(row_dict.get("id", row_dict.get("product_id", "unknown")))
    payload = {}
    for key, value in row_dict.items():
        if isinstance(value, datetime):
            payload[key] = value.isoformat()
        else:
            payload[key] = value

    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": DATABASE,
        "collection": f"{SCHEMA}.{TABLE}",
        "recordId": record_id,
        "recordVersion": "1",
        "operationType": "upsert",
        "recordPayload": payload,
        "sourceTimestamp": row_dict.get("updated_at", datetime.now(timezone.utc)).isoformat()
            if isinstance(row_dict.get("updated_at"), datetime)
            else datetime.now(timezone.utc).isoformat(),
        "idempotencyKey": f"pg-poll-{TABLE}-{record_id}-"
            f"{datetime.now(timezone.utc).strftime('%Y-%m-%d')}",
    }


def ingest_records(records):
    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}

    if len(records) > BULK_THRESHOLD:
        for i in range(0, len(records), 1000):
            batch = records[i : i + 1000]
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records/bulk",
                headers=headers,
                json={"tenantId": CERTYO_TENANT_ID, "records": batch},
                timeout=30,
            )
            if resp.ok:
                print(f"  Bulk ingested {len(batch)} records")
            else:
                print(f"  Bulk failed: {resp.status_code} {resp.text}")
    else:
        for record in records:
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records",
                headers=headers,
                json=record,
                timeout=30,
            )
            if resp.ok:
                result = resp.json()
                print(f"  Ingested {record['recordId']}: {result['recordHash']}")
            else:
                print(f"  Failed {record['recordId']}: {resp.status_code}")


def poll_cycle():
    last_sync = get_last_sync()
    now = datetime.now(timezone.utc)

    with engine.connect() as conn:
        result = conn.execute(text(f"""
            SELECT * FROM {SCHEMA}.{TABLE}
            WHERE updated_at >= :last_sync
            ORDER BY updated_at ASC LIMIT 5000
        """), {"last_sync": last_sync})
        columns = list(result.keys())
        rows = [dict(zip(columns, row)) for row in result.fetchall()]

    if not rows:
        return

    print(f"Found {len(rows)} modified rows since {last_sync.isoformat()}")
    records = [map_row(row) for row in rows]
    ingest_records(records)
    save_last_sync(now)


if __name__ == "__main__":
    ensure_tracking_table()
    print(f"PostgreSQL polling worker started (interval: {POLL_INTERVAL}s)")
    while True:
        try:
            poll_cycle()
        except Exception as e:
            print(f"Poll cycle error: {e}")
        time.sleep(POLL_INTERVAL)
```

### 5. Verification Worker

```python
"""
Verify anchored records and write back status to PostgreSQL.

Install: pip install psycopg2-binary requests
Usage:   CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy python verify_pg.py
"""

import os
import psycopg2
import requests
from datetime import datetime, timezone

PG_DSN = os.environ.get(
    "PG_DSN",
    "host=localhost port=5432 dbname=productdb user=certyo_cdc password=secret"
)
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = os.environ.get("CERTYO_BASE_URL", "https://www.certyos.com")
SCHEMA = "public"
TABLE = "products"
DATABASE = "productdb"


def verify_and_update():
    conn = psycopg2.connect(PG_DSN)
    cur = conn.cursor()

    cur.execute(f"""
        SELECT id FROM {SCHEMA}.{TABLE}
        WHERE certyo_anchor_status != 'anchored'
           OR certyo_anchor_status IS NULL
        LIMIT 100
    """)
    rows = cur.fetchall()
    print(f"Verifying {len(rows)} records...")

    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}

    for (record_id,) in rows:
        resp = requests.post(
            f"{CERTYO_BASE_URL}/api/v1/verify/record",
            headers=headers,
            json={
                "tenantId": CERTYO_TENANT_ID,
                "database": DATABASE,
                "collection": f"{SCHEMA}.{TABLE}",
                "recordId": str(record_id),
            },
            timeout=30,
        )

        if not resp.ok:
            print(f"  {record_id}: verification failed ({resp.status_code})")
            continue

        result = resp.json()
        if result.get("verified") and result.get("anchoredOnChain"):
            cur.execute(f"""
                UPDATE {SCHEMA}.{TABLE} SET
                    certyo_record_hash = %s,
                    certyo_anchor_status = 'anchored',
                    certyo_verified_at = %s
                WHERE id = %s
            """, (result["recordHash"], datetime.now(timezone.utc), record_id))
            conn.commit()
            print(f"  {record_id}: anchored (hash: {result['recordHash']})")
        else:
            print(f"  {record_id}: not yet anchored")

    cur.close()
    conn.close()


if __name__ == "__main__":
    verify_and_update()
```

## Verification & Write-back

### Verification Flow

1. A verification script runs periodically (e.g., every 2 minutes via cron)
2. Queries rows where `certyo_anchor_status` is not `'anchored'`
3. Calls `POST /api/v1/verify/record` with `tenantId`, `database`, `collection`, and `recordId`
4. On success, updates the source row:
   - `certyo_record_hash` = hash from verification response
   - `certyo_anchor_status` = `'anchored'`
   - `certyo_verified_at` = current timestamp

### Write-back SQL

```sql
UPDATE public.products SET
    certyo_record_hash   = 'sha256:ab12cd34...',
    certyo_anchor_status = 'anchored',
    certyo_verified_at   = NOW()
WHERE id = 42;
```

### Trigger and Write-back Interaction

When the verification worker updates `certyo_*` columns, the LISTEN/NOTIFY trigger will fire. To avoid an infinite loop, modify the trigger function to skip changes that only affect Certyo tracking columns:

```sql
CREATE OR REPLACE FUNCTION certyo_notify_change()
RETURNS trigger AS $$
DECLARE
    payload JSON;
    record_row RECORD;
BEGIN
    -- Skip when only certyo_* columns changed: compare the rows with the
    -- tracking columns stripped out. (Comparing the full rows can never detect
    -- this case, because the certyo_* values themselves differ. The jsonb '-'
    -- operator requires PostgreSQL 9.5+.)
    IF TG_OP = 'UPDATE' THEN
        IF (to_jsonb(OLD) - 'certyo_record_hash' - 'certyo_anchor_status' - 'certyo_verified_at')
         = (to_jsonb(NEW) - 'certyo_record_hash' - 'certyo_anchor_status' - 'certyo_verified_at')
        THEN
            RETURN NEW; -- only Certyo tracking columns changed; skip notification
        END IF;
    END IF;

    IF TG_OP = 'DELETE' THEN
        record_row := OLD;
    ELSE
        record_row := NEW;
    END IF;

    payload := json_build_object(
        'operation', TG_OP,
        'schema', TG_TABLE_SCHEMA,
        'table', TG_TABLE_NAME,
        'record', row_to_json(record_row)
    );

    PERFORM pg_notify('certyo_changes', payload::text);
    RETURN record_row;
END;
$$ LANGUAGE plpgsql;
```

## Code Generation Rules

1. **Setting `wal_level=logical` requires a PostgreSQL restart.** This is required only for Debezium. LISTEN/NOTIFY and polling do not need it. Plan the change during a maintenance window. On managed services (AWS RDS, Azure), update the parameter group.

2. **Use the bulk endpoint (`POST /api/v1/records/bulk`) when the batch contains more than 10 records.** The bulk endpoint accepts up to 1000 records per call. For batches exceeding 1000, split into multiple bulk calls.

3. **LISTEN/NOTIFY payloads are limited to 8000 bytes.** For tables with wide rows, send only the primary key in the notification and have the listener fetch the full row before mapping to a Certyo record.

4. **LISTEN/NOTIFY notifications are lost if no client is listening.** Pair LISTEN/NOTIFY with periodic polling for gap recovery. Notifications are delivered only to connected clients that have executed `LISTEN channel`.

5. **For Debezium, use the `pgoutput` plugin.** It is included with PostgreSQL 10+ and does not require external extensions. The older `decoderbufs` and `wal2json` plugins work but are not recommended for new deployments.

6. **Monitor replication slots when using Debezium.** If the consumer is offline, the replication slot retains WAL segments, which can fill up disk. Query `pg_replication_slots` and drop stale slots to prevent disk exhaustion.

7. **Implement exponential backoff on Certyo HTTP 429 (Too Many Requests) responses.** Start with a 1-second delay, doubling on each retry up to 3 retries. Never retry immediately on 429.

8. **Exclude `certyo_*` columns from the `recordPayload`.** Columns like `certyo_record_hash`, `certyo_anchor_status`, and `certyo_verified_at` are metadata. Including them causes hash mismatches when these columns are updated by the verification worker.

9. **Construct idempotency keys from `recordId` + operation context.** For LISTEN/NOTIFY, use `pg-notify-{table}-{id}-{hour}`. For polling, use `pg-poll-{table}-{id}-{date}`. This ensures deduplication while allowing the same record to be re-ingested on different days.

10. **Polling cannot detect hard deletes.** Use soft deletes (`deleted_at` column) with polling, or switch to LISTEN/NOTIFY or Debezium for full delete coverage. The trigger-based approaches fire on DELETE and capture the old row.
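
Rule 3's key-only pattern can be sketched as below. Hedged assumption: a trigger variant that notifies with `json_build_object('operation', TG_OP, 'table', TG_TABLE_NAME, 'id', NEW.id)::text` instead of the full row; the listener then re-reads the row before mapping.

```python
import json


def parse_pk_notification(payload):
    """Extract (operation, table, id) from a key-only notification payload."""
    data = json.loads(payload)
    return data["operation"], data["table"], str(data["id"])


def fetch_row(conn, table, record_id):
    """Re-read the full row by primary key (conn: a live psycopg2 connection)."""
    with conn.cursor() as cur:
        cur.execute(f"SELECT row_to_json(t) FROM public.{table} t WHERE id = %s",
                    (record_id,))
        row = cur.fetchone()
    return row[0] if row else None  # None if the row was deleted meanwhile
```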
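
Rule 6's slot check, as a minimal sketch. The catalog view `pg_replication_slots` and the functions `pg_current_wal_lsn()` / `pg_wal_lsn_diff()` are standard in PostgreSQL 10+; the 1 GiB threshold is an illustrative default.

```python
# Columns: slot_name, active, bytes of WAL retained since the slot's restart_lsn.
SLOT_QUERY = """
    SELECT slot_name, active,
           pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_bytes
    FROM pg_replication_slots
"""


def slots_over_threshold(rows, limit_bytes=1 << 30):
    """Given (slot_name, active, retained_bytes) rows, return slots holding too much WAL."""
    return [name for name, active, retained in rows
            if retained is not None and retained > limit_bytes]


def check_slots(conn, limit_bytes=1 << 30):
    """Run the catalog query on a live psycopg2 connection and flag oversized slots."""
    with conn.cursor() as cur:
        cur.execute(SLOT_QUERY)
        return slots_over_threshold(cur.fetchall(), limit_bytes)
```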
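
Rule 7's backoff policy, sketched with an injectable `post` callable (pass `requests.post` in real code) so the retry logic stays testable:

```python
import time


def post_with_backoff(post, url, headers, body,
                      max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Call `post` (e.g. requests.post), retrying on HTTP 429 with exponential backoff."""
    resp = post(url, headers=headers, json=body, timeout=30)
    for attempt in range(max_retries):
        if resp.status_code != 429:
            return resp
        sleep(base_delay * (2 ** attempt))  # 1s, then 2s, then 4s
        resp = post(url, headers=headers, json=body, timeout=30)
    return resp
```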
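
Rule 8 can be enforced with a one-line filter applied to every row before it becomes `recordPayload`:

```python
def clean_payload(row_dict):
    """Drop certyo_* tracking columns from a row before it becomes recordPayload."""
    return {k: v for k, v in row_dict.items() if not k.startswith("certyo_")}
```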
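
For rule 10, a minimal sketch of mapping soft deletes during polling (assumes a `deleted_at TIMESTAMPTZ` column that is NULL for live rows):

```python
def operation_for_row(row_dict):
    """Certyo operationType for a polled row: 'delete' when soft-deleted, else 'upsert'."""
    return "delete" if row_dict.get("deleted_at") is not None else "upsert"
```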
