Certyo Developer Portal

PostgreSQL Integration

Use PostgreSQL CDC via logical replication and Debezium, LISTEN/NOTIFY for lightweight real-time events, or timestamp-based polling. This guide covers all three approaches for sending row changes to Certyo for blockchain anchoring.

wal_level=logical requires a restart
Setting wal_level=logical requires a PostgreSQL restart. Plan this change for a maintenance window. On managed services such as AWS RDS or Azure Database for PostgreSQL, you change the setting through a parameter group or server parameter rather than by editing postgresql.conf; the instance still has to be rebooted for the new value to take effect.

Overview

PostgreSQL offers multiple change data capture strategies. For production workloads, Debezium with logical replication provides reliable, at-least-once delivery with full transaction ordering. For lightweight use cases, LISTEN/NOTIFY provides real-time push notifications using trigger functions. Timestamp-based polling is the simplest approach when neither Debezium nor triggers are practical.

Architecture

Data flow (Debezium)
┌──────────────────────┐       WAL logical decoding          ┌───────────────────────┐
│  PostgreSQL          │ ──── pgoutput plugin ──────────────> │  Debezium Connector   │
│  products table      │                                      │  (Kafka Connect)      │
│  (INSERT/UPDATE/DEL) │                                      └───────────┬───────────┘
└──────────────────────┘                                                  │
                                                               publishes to Kafka topic
                                                               certyo.public.products
                                                                          │
                                                                          v
                                                          ┌────────────────────────────┐
                                                          │  Consumer Process           │
                                                          │  (Python / Node.js)         │
                                                          │  reads Kafka, transforms,   │
                                                          │  POSTs to Certyo            │
                                                          └─────────────┬──────────────┘
                                                                        │
                                                          POST /api/v1/records (or /bulk)
                                                                        │
                                                                        v
                                                               ┌─────────────────┐
                                                               │  Certyo API     │
                                                               │  (202 Accepted) │
                                                               └────────┬────────┘
                                                                        │
                                                              ~60-90s anchoring
                                                                        │
                                                                        v
                                                               ┌─────────────────┐
                                                               │  Polygon        │
                                                               │  (on-chain)     │
                                                               └─────────────────┘
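
The "transforms" step of the consumer in the diagram can be sketched as a pure function. This is a minimal sketch, not Certyo's reference consumer: it assumes the standard Debezium change-event envelope (op, before, after, source), an "id" primary key column, and the same record fields used by the listener script later in this guide. The function name and the "pg-debezium-" key prefix are hypothetical.

```python
import json
from datetime import datetime, timezone

# Debezium "op" codes: c = create, r = snapshot read, u = update, d = delete.
OP_MAP = {"c": "insert", "r": "insert", "u": "update", "d": "delete"}


def debezium_to_record(raw_value, tenant_id):
    """Map one Kafka message value from Debezium to a Certyo record dict."""
    if raw_value is None:
        return None  # tombstone message that follows a delete; nothing to send
    event = json.loads(raw_value)
    # With schemas.enable=true the JSON converter wraps the event in "payload".
    change = event.get("payload", event)
    source = change["source"]
    # Deletes carry the old row in "before"; other operations use "after".
    row = change["before"] if change["op"] == "d" else change["after"]
    record_id = str(row["id"])  # assumption: primary key column is named "id"
    source_ts = datetime.fromtimestamp(source["ts_ms"] / 1000, tz=timezone.utc)
    return {
        "tenantId": tenant_id,
        "database": source["db"],
        "collection": f"{source['schema']}.{source['table']}",
        "recordId": record_id,
        "recordVersion": "1",
        "operationType": OP_MAP.get(change["op"], "upsert"),
        "recordPayload": row,
        "sourceTimestamp": source_ts.isoformat(),
        # The LSN differs for every change, so each operation gets its own key
        # while a redelivered Kafka message reuses the same one.
        "idempotencyKey": f"pg-debezium-{source['table']}-{record_id}-{source['lsn']}",
    }
```

With the default replica identity, the "before" image of a delete contains only the primary key columns, so the payload of a delete record is just the key unless the table uses REPLICA IDENTITY FULL.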

Prerequisites

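  • PostgreSQL 10+ with wal_level=logical (required for Debezium; not needed for LISTEN/NOTIFY or polling). The trigger examples in this guide use EXECUTE FUNCTION, which requires PostgreSQL 11+; on version 10, write EXECUTE PROCEDURE instead.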
  • A user with REPLICATION privilege (for Debezium) or table TRIGGER privilege (for LISTEN/NOTIFY)
  • Python 3.9+ with psycopg2, or Node.js 18+ with the pg package
  • A Certyo API key (obtain from your Certyo dashboard under Settings → API Keys)
  • For the Debezium approach: Docker and Docker Compose

LISTEN/NOTIFY approach (lightweight)

PostgreSQL's built-in LISTEN/NOTIFY mechanism lets a trigger function push notifications to connected clients in real time. No external infrastructure is required beyond a long-lived database connection.

certyo_pg_listener.py: Trigger + LISTEN/NOTIFY + Certyo ingestion (Python)
"""
PostgreSQL LISTEN/NOTIFY listener for Certyo.
Creates a trigger function that fires pg_notify on row changes,
then listens for notifications and ingests into Certyo.

Install: pip install psycopg2-binary requests
Usage:   CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy python certyo_pg_listener.py
"""

import os
import json
import select
import psycopg2
import psycopg2.extensions
import requests
from datetime import datetime, timezone

PG_DSN = os.environ.get(
    "PG_DSN",
    "host=localhost port=5432 dbname=productdb user=certyo_cdc password=secret"
)
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = os.environ.get("CERTYO_BASE_URL", "https://www.certyos.com")

CHANNEL = "certyo_changes"
SCHEMA = "public"
TABLE = "products"
DATABASE = "productdb"


def setup_trigger(conn):
    """Create the trigger function and trigger if they don't exist."""
    with conn.cursor() as cur:
        cur.execute(f"""
            CREATE OR REPLACE FUNCTION certyo_notify_change()
            RETURNS trigger AS $$
            DECLARE
                payload JSON;
                record_row RECORD;
            BEGIN
                IF TG_OP = 'DELETE' THEN
                    record_row := OLD;
                ELSE
                    record_row := NEW;
                END IF;

                payload := json_build_object(
                    'operation', TG_OP,
                    'schema', TG_TABLE_SCHEMA,
                    'table', TG_TABLE_NAME,
                    'record', row_to_json(record_row)
                );

                PERFORM pg_notify('{CHANNEL}', payload::text);
                RETURN record_row;
            END;
            $$ LANGUAGE plpgsql;
        """)

        cur.execute(f"""
            DO $$
            BEGIN
                IF NOT EXISTS (
                    SELECT 1 FROM pg_trigger
                    WHERE tgname = 'certyo_change_trigger'
                    AND tgrelid = '{SCHEMA}.{TABLE}'::regclass
                ) THEN
                    CREATE TRIGGER certyo_change_trigger
                    AFTER INSERT OR UPDATE OR DELETE ON {SCHEMA}.{TABLE}
                    FOR EACH ROW EXECUTE FUNCTION certyo_notify_change();
                END IF;
            END $$;
        """)
        conn.commit()
        print(f"Trigger installed on {SCHEMA}.{TABLE}")


def map_to_record(notification_payload):
    """Map a pg_notify payload to a Certyo record."""
    data = json.loads(notification_payload)
    record_data = data["record"]
    operation = data["operation"].lower()

    # Map PostgreSQL operation names to Certyo
    op_map = {"insert": "insert", "update": "update", "delete": "delete"}
    certyo_op = op_map.get(operation, "upsert")

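    record_id = str(record_data.get("id", record_data.get("product_id", "unknown")))

    # Prefer the row's own updated_at (assumes the table has such a column) and
    # fall back to the current time. The key includes the operation and the
    # timestamp so separate changes to the same row are not deduplicated as one.
    source_ts = record_data.get("updated_at") or datetime.now(timezone.utc).isoformat()

    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": DATABASE,
        "collection": f"{SCHEMA}.{TABLE}",
        "recordId": record_id,
        "recordVersion": "1",
        "operationType": certyo_op,
        "recordPayload": record_data,
        "sourceTimestamp": source_ts,
        "idempotencyKey": f"pg-notify-{TABLE}-{record_id}-{operation}-{source_ts}",
    }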


def ingest_record(record):
    """Send a single record to Certyo."""
    headers = {
        "X-API-Key": CERTYO_API_KEY,
        "Content-Type": "application/json",
    }
    resp = requests.post(
        f"{CERTYO_BASE_URL}/api/v1/records",
        headers=headers,
        json=record,
        timeout=10,  # do not let a stalled request block the listener loop
    )
    if resp.ok:
        result = resp.json()
        print(f"  Ingested {record['recordId']}: hash={result['recordHash']}")
        return result
    else:
        print(f"  Failed {record['recordId']}: {resp.status_code} {resp.text}")
        return None


def listen():
    """Main listener loop."""
    conn = psycopg2.connect(PG_DSN)
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

    setup_trigger(conn)

    with conn.cursor() as cur:
        cur.execute(f"LISTEN {CHANNEL};")
    print(f"Listening on channel '{CHANNEL}'...")

    while True:
        # Wait for notification with 30s timeout
        if select.select([conn], [], [], 30.0) == ([], [], []):
            continue

        conn.poll()
        while conn.notifies:
            notify = conn.notifies.pop(0)
            try:
                record = map_to_record(notify.payload)
                ingest_record(record)
            except Exception as e:
                print(f"Error processing notification: {e}")


if __name__ == "__main__":
    print("PostgreSQL LISTEN/NOTIFY worker starting...")
    listen()

Database setup

Before using the LISTEN/NOTIFY approach, make sure the trigger function is installed. The listener script above creates the trigger automatically on startup, but you can also install it manually:

SQL: Trigger function and Certyo tracking columns
-- Create the notification trigger function
CREATE OR REPLACE FUNCTION certyo_notify_change()
RETURNS trigger AS $$
DECLARE
    payload JSON;
    record_row RECORD;
BEGIN
    IF TG_OP = 'DELETE' THEN
        record_row := OLD;
    ELSE
        record_row := NEW;
    END IF;

    payload := json_build_object(
        'operation', TG_OP,
        'schema', TG_TABLE_SCHEMA,
        'table', TG_TABLE_NAME,
        'record', row_to_json(record_row)
    );

    PERFORM pg_notify('certyo_changes', payload::text);
    RETURN record_row;
END;
$$ LANGUAGE plpgsql;

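-- Attach to the products table (drop first so this script can be re-run)
DROP TRIGGER IF EXISTS certyo_change_trigger ON public.products;
CREATE TRIGGER certyo_change_trigger
AFTER INSERT OR UPDATE OR DELETE ON public.products
FOR EACH ROW EXECUTE FUNCTION certyo_notify_change();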

-- Add Certyo tracking columns (for verification write-back)
ALTER TABLE public.products ADD COLUMN IF NOT EXISTS certyo_record_hash TEXT;
ALTER TABLE public.products ADD COLUMN IF NOT EXISTS certyo_anchor_status TEXT DEFAULT 'pending';
ALTER TABLE public.products ADD COLUMN IF NOT EXISTS certyo_verified_at TIMESTAMPTZ;

-- For Debezium: enable logical replication
-- (requires wal_level=logical in postgresql.conf and a restart)
CREATE PUBLICATION certyo_publication FOR TABLE public.products;

Field mapping

Map PostgreSQL columns to the Certyo record schema:

PostgreSQL → Certyo field mapping
PostgreSQL Field         Certyo Field       Notes
──────────────────────   ─────────────────  ────────────────────────────────────
Primary key (id)         recordId           Cast to string
Table name               collection         e.g. "public.products" (schema.table)
Database name            database           e.g. "productdb"
Full row (JSON)          recordPayload      row_to_json() or dict serialization
updated_at               sourceTimestamp    ISO 8601 format
TG_OP / operation        operationType      INSERT→insert, UPDATE→update, DELETE→delete
id + timestamp           idempotencyKey     Ensures deduplication on retry
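
The last row of the mapping only helps if a retry resends the record with the same idempotencyKey. The following is a minimal sketch of such a retry loop, assuming that 429 and 5xx responses are worth retrying (this guide does not list Certyo's error codes). The send_with_retry helper and its parameters are hypothetical; `send` stands for any callable that posts the record and returns the HTTP status code.

```python
import time

# Assumption: these statuses indicate a transient failure worth retrying.
RETRYABLE = {429, 500, 502, 503, 504}


def send_with_retry(send, record, attempts=4, base_delay=0.5, sleep=time.sleep):
    """Call send(record) until it returns a non-retryable status.

    The record dict is passed through unchanged on every attempt, so its
    idempotencyKey stays the same and the server can deduplicate.
    """
    status = None
    for attempt in range(attempts):
        status = send(record)
        if status not in RETRYABLE:
            return status
        if attempt < attempts - 1:
            sleep(base_delay * 2 ** attempt)  # exponential backoff: 0.5s, 1s, 2s
    return status
```

A thin wrapper such as `lambda rec: requests.post(url, headers=headers, json=rec, timeout=10).status_code` would fit the `send` parameter.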

Verification

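After anchoring completes (~60-90 seconds), verify the record and write the status back to the source row with a SQL UPDATE. This creates an audit trail directly in PostgreSQL. Keep in mind that this UPDATE is itself a row change: certyo_change_trigger (and Debezium) will emit an event for it, so filter out changes that touch only the certyo_* columns if you do not want them re-ingested.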

verify_and_update.py: Verify anchoring and update source row (Python)
"""
Verify anchored records and write back status to PostgreSQL.

Install: pip install psycopg2-binary requests
Usage:   CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy python verify_and_update.py
"""

import os
import psycopg2
import requests
from datetime import datetime, timezone

PG_DSN = os.environ.get(
    "PG_DSN",
    "host=localhost port=5432 dbname=productdb user=certyo_cdc password=secret"
)
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = os.environ.get("CERTYO_BASE_URL", "https://www.certyos.com")

SCHEMA = "public"
TABLE = "products"
DATABASE = "productdb"


def verify_and_update():
    conn = psycopg2.connect(PG_DSN)
    cur = conn.cursor()

    # Find rows that need verification
    cur.execute(f"""
        SELECT id FROM {SCHEMA}.{TABLE}
        WHERE certyo_anchor_status != 'anchored'
           OR certyo_anchor_status IS NULL
        LIMIT 100
    """)
    rows = cur.fetchall()
    print(f"Verifying {len(rows)} records...")

    headers = {
        "X-API-Key": CERTYO_API_KEY,
        "Content-Type": "application/json",
    }

    for (record_id,) in rows:
        resp = requests.post(
            f"{CERTYO_BASE_URL}/api/v1/verify/record",
            headers=headers,
            json={
                "tenantId": CERTYO_TENANT_ID,
                "database": DATABASE,
                "collection": f"{SCHEMA}.{TABLE}",
                "recordId": str(record_id),
            },
            timeout=10,  # fail fast instead of hanging on a stalled request
        )

        if not resp.ok:
            print(f"  {record_id}: verification request failed ({resp.status_code})")
            continue

        result = resp.json()

        if result.get("verified") and result.get("anchoredOnChain"):
            cur.execute(f"""
                UPDATE {SCHEMA}.{TABLE} SET
                    certyo_record_hash = %s,
                    certyo_anchor_status = 'anchored',
                    certyo_verified_at = %s
                WHERE id = %s
            """, (result["recordHash"], datetime.now(timezone.utc), record_id))
            conn.commit()
            print(f"  {record_id}: anchored (hash: {result['recordHash']})")
        else:
            print(f"  {record_id}: not yet anchored")

    cur.close()
    conn.close()


if __name__ == "__main__":
    verify_and_update()

Choosing an approach

  • Debezium (production) — Best for high-throughput production workloads. Provides reliable, ordered delivery with automatic offset tracking. Requires Kafka infrastructure.
  • LISTEN/NOTIFY (lightweight) — Best for low-to-medium throughput when you want real-time push without external infrastructure. Notifications are lost if no client is listening, so pair with periodic polling for gap recovery.
  • Polling (simplest) — Best for getting started quickly or when CDC infrastructure is not available. Requires an updated_at column on tracked tables. Cannot detect hard deletes.
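
The polling approach is described above but not shown in code, so here is a minimal sketch of one polling cycle. It assumes the tracked table has "id" and "updated_at" columns, and it uses a keyset watermark of (updated_at, id) so that rows sharing a timestamp are not skipped at a batch boundary. The names POLL_SQL, row_to_record, and poll_once, and the "pg-poll-" key prefix, are hypothetical; "upsert" is used because polling cannot tell an insert from an update.

```python
from datetime import datetime, timezone

# Keyset pagination: resume strictly after the last (updated_at, id) seen.
POLL_SQL = """
    SELECT * FROM public.products
    WHERE (updated_at, id) > (%s, %s)
    ORDER BY updated_at, id
    LIMIT 1000
"""


def _jsonable(value):
    """Convert datetimes to ISO 8601 strings so the payload can be JSON-encoded."""
    return value.isoformat() if isinstance(value, datetime) else value


def row_to_record(row, tenant_id):
    """Map one row (a dict keyed by column name) to a Certyo record."""
    ts = row["updated_at"].isoformat()
    record_id = str(row["id"])
    return {
        "tenantId": tenant_id,
        "database": "productdb",
        "collection": "public.products",
        "recordId": record_id,
        "recordVersion": "1",
        "operationType": "upsert",
        "recordPayload": {k: _jsonable(v) for k, v in row.items()},
        "sourceTimestamp": ts,
        "idempotencyKey": f"pg-poll-products-{record_id}-{ts}",
    }


def poll_once(fetch_rows, watermark, tenant_id):
    """Run one cycle: fetch rows past the watermark and advance it.

    fetch_rows(watermark) must return rows ordered by (updated_at, id).
    Returns (records, new_watermark).
    """
    rows = fetch_rows(watermark)
    records = [row_to_record(row, tenant_id) for row in rows]
    if rows:
        watermark = (rows[-1]["updated_at"], rows[-1]["id"])
    return records, watermark
```

With psycopg2, fetch_rows could execute POLL_SQL through a psycopg2.extras.RealDictCursor so that rows arrive as dicts. Persist the new watermark only after Certyo has accepted the batch, so a failed cycle is retried from the same position.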

LISTEN/NOTIFY payload limit
PostgreSQL NOTIFY payloads must be shorter than 8000 bytes in the default configuration. For tables with wide rows, send only the primary key in the notification and have the listener fetch the full row before mapping it to a Certyo record.
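
On the listener side, the key-only variant could look like the sketch below. It assumes a slimmed-down trigger payload of the form {"operation": ..., "id": ...}, which is not the payload the trigger in this guide currently sends; the resolve_row helper and the fetch_row callable are hypothetical. Full-row payloads are still accepted, so both trigger variants can coexist.

```python
import json


def resolve_row(notification_payload, fetch_row):
    """Return (operation, row) for a notification that may carry only a key.

    fetch_row(pk) should return the current row as a dict, or None if the
    row no longer exists (for example: SELECT * FROM public.products WHERE id = %s).
    """
    data = json.loads(notification_payload)
    operation = data["operation"].lower()
    if "record" in data:
        # Full-row payload, as produced by certyo_notify_change() in this guide.
        return operation, data["record"]
    if operation == "delete":
        # The row is already gone, so only the key can be reported.
        return operation, {"id": data["id"]}
    # Key-only payload: load the full row before mapping it to a Certyo record.
    # The result is None if the row was deleted in the meantime; the caller can
    # skip it, since the DELETE notification will follow.
    return operation, fetch_row(data["id"])
```

One trade-off of this design: the fetched row reflects the state at fetch time, not at notification time, so two quick updates may both resolve to the latest version of the row.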

Bulk optimization

For tables with high write throughput, batch records before calling the API:

  • POST /api/v1/records/bulk accepts up to 1000 records per request
  • The polling approach naturally batches all rows found in each cycle
  • For LISTEN/NOTIFY, buffer notifications for a short window (e.g., 5 seconds) before sending a bulk request
  • For Debezium, configure the Kafka consumer with a batch listener that accumulates messages
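
The buffering idea for LISTEN/NOTIFY can be sketched as a small class that flushes when either the time window elapses or the 1000-record limit is reached. This is an illustrative sketch: RecordBuffer and its parameters are hypothetical names, and the request body expected by /api/v1/records/bulk is not specified in this guide, so sending the batch is left to a caller-supplied `flush` callable.

```python
import time

MAX_BULK = 1000  # per-request limit of POST /api/v1/records/bulk stated above


class RecordBuffer:
    """Collect records and hand them to `flush` in batches."""

    def __init__(self, flush, window_seconds=5.0, max_size=MAX_BULK,
                 clock=time.monotonic):
        self._flush = flush          # callable that receives a list of records
        self._window = window_seconds
        self._max = max_size
        self._clock = clock          # injectable so the window can be tested
        self._items = []
        self._opened_at = None       # time the first buffered record arrived

    def add(self, record):
        if not self._items:
            self._opened_at = self._clock()
        self._items.append(record)
        if len(self._items) >= self._max:
            self.flush()             # size limit reached: send immediately

    def tick(self):
        """Flush if the window has elapsed; call once per loop iteration."""
        if self._items and self._clock() - self._opened_at >= self._window:
            self.flush()

    def flush(self):
        if self._items:
            batch, self._items = self._items, []
            self._flush(batch)
```

In the listener loop this would mean calling buffer.add(record) instead of ingest_record(record), lowering the select timeout to about one second, and calling buffer.tick() on every iteration, including the ones that time out.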

Troubleshooting

  • "ERROR: logical decoding requires wal_level >= logical" — Set wal_level = logical in postgresql.conf and restart PostgreSQL. On AWS RDS, modify the parameter group. On Azure, update the server parameter.
  • LISTEN/NOTIFY notifications lost — Notifications are only delivered to connected, listening clients. If the listener was disconnected, run a polling cycle to catch up on missed changes.
  • Replication slot growing — If the Debezium consumer is offline, the replication slot retains WAL segments. Monitor pg_replication_slots and drop stale slots to prevent disk filling up.
  • Polling misses deletes — Timestamp-based polling cannot detect hard deletes. Use soft deletes (deleted_at column) or switch to LISTEN/NOTIFY or Debezium for full delete coverage.
  • Duplicate records in Certyo — The idempotencyKey prevents true duplicates. Ensure your key includes enough specificity (record ID + timestamp or LSN) to distinguish different operations on the same row.
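
For the replication slot item above, a monitoring check might look like the following sketch. The query uses pg_replication_slots, pg_current_wal_lsn(), and pg_wal_lsn_diff(), which are available from PostgreSQL 10 and must run on the primary; the 1 GiB threshold and the slots_needing_attention helper are assumptions for illustration, not recommended values.

```python
# Run this on the primary; each row is (slot_name, active, retained_bytes).
SLOT_LAG_SQL = """
    SELECT slot_name,
           active,
           pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_bytes
    FROM pg_replication_slots
"""


def slots_needing_attention(rows, max_retained_bytes=1024 ** 3):
    """Return names of slots that are inactive or retain too much WAL.

    `rows` is the result of SLOT_LAG_SQL. retained_bytes may be None for a
    slot that has not reserved any WAL yet.
    """
    flagged = []
    for slot_name, active, retained_bytes in rows:
        too_much_wal = (
            retained_bytes is not None and retained_bytes > max_retained_bytes
        )
        if not active or too_much_wal:
            flagged.append(slot_name)
    return flagged
```

A flagged slot is a prompt to investigate, not to drop automatically: dropping the slot used by a Debezium connector that is only temporarily offline forces that connector to take a new snapshot.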

AI Integration · v1.0.0

AI Integration Skill

Download a skill file that enables AI agents to generate working PostgreSQL + Certyo integration code for any language or framework.

What is this?
A markdown file containing PostgreSQL-specific field mappings, authentication setup, code examples, and integration patterns for Certyo. Drop it into your AI agent's context and ask it to generate integration code.

What's inside

  • Authentication: PostgreSQL connection and Certyo API key setup
  • Architecture: Debezium CDC, LISTEN/NOTIFY, and polling patterns
  • Field mapping: primary key, table, and schema to the Certyo record schema
  • Code examples: Python psycopg2, Node.js pg, Debezium, polling with SQLAlchemy
  • Verification: write-back with SQL UPDATE and anchor status columns
  • Patterns: real-time LISTEN/NOTIFY, Debezium CDC, and batch polling

How to use

Claude Code

Place the file in your project's .claude/commands/ directory, then use it as a slash command:

# Download the skill file
mkdir -p .claude/commands
curl -o .claude/commands/certyo-postgresql.md \
  https://www.certyos.com/developers/skills/certyo-postgresql-skill.md

# Use it in Claude Code
/certyo-postgresql "Generate a Python listener that uses LISTEN/NOTIFY to send row changes to Certyo"

Cursor / Copilot / Any AI Agent

Add the file to your project root or attach it to a conversation. The AI agent will use the PostgreSQL-specific patterns, field mappings, and code examples to generate correct integration code.

# Add to your project
curl -o CERTYO_POSTGRESQL.md \
  https://www.certyos.com/developers/skills/certyo-postgresql-skill.md

# Then in your AI agent:
"Using the Certyo PostgreSQL spec in CERTYO_POSTGRESQL.md,
 generate a Python listener that uses LISTEN/NOTIFY to send row changes to Certyo"

CLAUDE.md Context File

Append the skill file to your project's CLAUDE.md so every Claude conversation has PostgreSQL + Certyo context automatically.

# Append to your project's CLAUDE.md
echo "" >> CLAUDE.md
echo "## Certyo PostgreSQL Integration" >> CLAUDE.md
cat CERTYO_POSTGRESQL.md >> CLAUDE.md