Certyo Developer Portal

PostgreSQL Integration

Use PostgreSQL CDC via logical replication and Debezium, LISTEN/NOTIFY for lightweight real-time events, or timestamp-based polling. This guide covers all three approaches for pushing row changes to Certyo for blockchain anchoring.

wal_level=logical requires restart
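Changing wal_level to logical takes effect only after a PostgreSQL restart, so plan the change for a maintenance window. On managed services such as AWS RDS or Azure Database for PostgreSQL, change the setting through the parameter group (on RDS it is exposed as rds.logical_replication) or the server parameters. The instance still has to be rebooted before the new value applies.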

Overview

PostgreSQL offers multiple change data capture strategies. For production workloads, Debezium with logical replication provides reliable, at-least-once delivery with full transaction ordering. For lightweight use cases, LISTEN/NOTIFY provides real-time push notifications using trigger functions. Timestamp-based polling is the simplest approach when neither Debezium nor triggers are practical.

Architecture

Data flow (Debezium)
┌──────────────────────┐       WAL logical decoding          ┌───────────────────────┐
│  PostgreSQL          │ ──── pgoutput plugin ──────────────> │  Debezium Connector   │
│  products table      │                                      │  (Kafka Connect)      │
│  (INSERT/UPDATE/DEL) │                                      └───────────┬───────────┘
└──────────────────────┘                                                  │
                                                               publishes to Kafka topic
                                                               certyo.public.products
                                                                          │
                                                                          v
                                                          ┌────────────────────────────┐
                                                          │  Consumer Process           │
                                                          │  (Python / Node.js)         │
                                                          │  reads Kafka, transforms,   │
                                                          │  POSTs to Certyo            │
                                                          └─────────────┬──────────────┘
                                                                        │
                                                          POST /api/v1/records (or /bulk)
                                                                        │
                                                                        v
                                                               ┌─────────────────┐
                                                               │  Certyo API     │
                                                               │  (202 Accepted) │
                                                               └────────┬────────┘
                                                                        │
                                                              ~60-90s anchoring
                                                                        │
                                                                        v
                                                               ┌─────────────────┐
                                                               │  Polygon        │
                                                               │  (on-chain)     │
                                                               └─────────────────┘
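
The transform step of the consumer process in the diagram can be sketched as below. This is a minimal sketch, assuming the standard Debezium change-event envelope (op, before, after, source) and a primary key column named id. The function name debezium_to_certyo and the idempotency key format are hypothetical. The Certyo field names are the ones used by the listener script in this guide.

```python
import json
from datetime import datetime, timezone

# Debezium op codes -> Certyo operation types ("r" is a snapshot read)
OP_MAP = {"c": "insert", "r": "insert", "u": "update", "d": "delete"}


def debezium_to_certyo(message_value, tenant_id):
    """Map one Debezium change event (JSON str/bytes) to a Certyo record dict."""
    if message_value is None:
        return None  # tombstone message that follows a delete; nothing to ingest

    event = json.loads(message_value)
    # With the JSON converter's schemas enabled, the envelope sits under "payload"
    envelope = event.get("payload", event)

    op = envelope["op"]
    # Deletes carry the row image in "before" (only the key columns unless the
    # table uses REPLICA IDENTITY FULL); all other operations use "after"
    row = envelope["before"] if op == "d" else envelope["after"]
    source = envelope["source"]

    record_id = str(row["id"])  # assumption: primary key column is "id"
    changed_at = datetime.fromtimestamp(source["ts_ms"] / 1000, tz=timezone.utc)

    return {
        "tenantId": tenant_id,
        "database": source["db"],
        "collection": f"{source['schema']}.{source['table']}",
        "recordId": record_id,
        "recordVersion": "1",
        "operationType": OP_MAP.get(op, "upsert"),
        "recordPayload": row,
        "sourceTimestamp": changed_at.isoformat(),
        # Hypothetical key format: the LSN distinguishes successive changes to one row
        "idempotencyKey": f"debezium-{source['table']}-{record_id}-{source['lsn']}",
    }
```

Because the key is derived from the WAL position rather than the wall clock, a redelivered Kafka message maps to the same idempotencyKey, which fits the at-least-once delivery described in the overview.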

Prerequisites

  • PostgreSQL 10+ with wal_level=logical (required for Debezium; not needed for LISTEN/NOTIFY or polling)
  • A user with REPLICATION privilege (for Debezium) or table TRIGGER privilege (for LISTEN/NOTIFY)
  • Python 3.9+ with psycopg2, or Node.js 18+ with the pg package
  • A Certyo API key (obtain from your Certyo dashboard under Settings → API Keys)
  • For the Debezium approach: Docker and Docker Compose
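
The two database-side prerequisites for Debezium can be checked before deploying the connector. The sketch below is illustrative: check_prerequisites is a hypothetical helper that accepts any DB-API cursor (for example one obtained from psycopg2), and it assumes the REPLICATION privilege is granted as a role attribute. Managed services may grant it through a provider-specific role instead, in which case the second check does not apply.

```python
def check_prerequisites(cur, role):
    """Return a list of problems found; an empty list means the checks passed.

    `cur` is any DB-API cursor connected to the target database.
    """
    problems = []

    # wal_level must be 'logical' for logical decoding
    cur.execute("SHOW wal_level")
    wal_level = cur.fetchone()[0]
    if wal_level != "logical":
        problems.append(f"wal_level is '{wal_level}', expected 'logical'")

    # The connector's role needs the REPLICATION attribute
    cur.execute("SELECT rolreplication FROM pg_roles WHERE rolname = %s", (role,))
    row = cur.fetchone()
    if row is None:
        problems.append(f"role '{role}' does not exist")
    elif not row[0]:
        problems.append(f"role '{role}' lacks the REPLICATION attribute")

    return problems
```

A deployment script could call check_prerequisites(conn.cursor(), "certyo_cdc") and abort with the returned messages if the list is not empty.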

LISTEN/NOTIFY approach (lightweight)

PostgreSQL's built-in LISTEN/NOTIFY mechanism lets a trigger function push notifications to connected clients in real time. No external infrastructure is required beyond a long-lived database connection.

certyo_pg_listener.py: Trigger + LISTEN/NOTIFY + Certyo ingestion (Python)
"""
PostgreSQL LISTEN/NOTIFY listener for Certyo.
Creates a trigger function that fires pg_notify on row changes,
then listens for notifications and ingests into Certyo.

Install: pip install psycopg2-binary requests
Usage:   CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy python certyo_pg_listener.py
"""

import os
import json
import select
import psycopg2
import psycopg2.extensions
import requests
from datetime import datetime, timezone

PG_DSN = os.environ.get(
    "PG_DSN",
    "host=localhost port=5432 dbname=productdb user=certyo_cdc password=secret"
)
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = os.environ.get("CERTYO_BASE_URL", "https://www.certyos.com")

CHANNEL = "certyo_changes"
SCHEMA = "public"
TABLE = "products"
DATABASE = "productdb"


def setup_trigger(conn):
    """Create the trigger function and trigger if they don't exist."""
    with conn.cursor() as cur:
        cur.execute(f"""
            CREATE OR REPLACE FUNCTION certyo_notify_change()
            RETURNS trigger AS $$
            DECLARE
                payload JSON;
                record_row RECORD;
            BEGIN
                IF TG_OP = 'DELETE' THEN
                    record_row := OLD;
                ELSE
                    record_row := NEW;
                END IF;

                payload := json_build_object(
                    'operation', TG_OP,
                    'schema', TG_TABLE_SCHEMA,
                    'table', TG_TABLE_NAME,
                    'record', row_to_json(record_row)
                );

                PERFORM pg_notify('{CHANNEL}', payload::text);
                RETURN record_row;
            END;
            $$ LANGUAGE plpgsql;
        """)

        cur.execute(f"""
            DO $$
            BEGIN
                IF NOT EXISTS (
                    SELECT 1 FROM pg_trigger
                    WHERE tgname = 'certyo_change_trigger'
                    AND tgrelid = '{SCHEMA}.{TABLE}'::regclass
                ) THEN
                    CREATE TRIGGER certyo_change_trigger
                    AFTER INSERT OR UPDATE OR DELETE ON {SCHEMA}.{TABLE}
                    FOR EACH ROW EXECUTE FUNCTION certyo_notify_change();
                END IF;
            END $$;
        """)
        conn.commit()
        print(f"Trigger installed on {SCHEMA}.{TABLE}")


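def map_to_record(notification_payload):
    """Map a pg_notify payload to a Certyo record."""
    data = json.loads(notification_payload)
    record_data = data["record"]
    operation = data["operation"].lower()

    # Map PostgreSQL operation names (TG_OP) to Certyo operation types;
    # anything unexpected falls back to "upsert"
    op_map = {"insert": "insert", "update": "update", "delete": "delete"}
    certyo_op = op_map.get(operation, "upsert")

    record_id = str(record_data.get("id", record_data.get("product_id", "unknown")))

    # Prefer the row's own updated_at (see the field mapping section) so that a
    # retry of the same change produces the same timestamp and idempotency key.
    # Fall back to the current time when the table has no updated_at column.
    source_ts = record_data.get("updated_at") or datetime.now(timezone.utc).isoformat()

    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": DATABASE,
        "collection": f"{SCHEMA}.{TABLE}",
        "recordId": record_id,
        "recordVersion": "1",
        "operationType": certyo_op,
        "recordPayload": record_data,
        "sourceTimestamp": source_ts,
        # An hour-granularity key would deduplicate distinct changes made to the
        # same row within one hour, so the key includes the operation and timestamp
        "idempotencyKey": f"pg-notify-{TABLE}-{record_id}-{certyo_op}-{source_ts}",
    }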


def ingest_record(record):
    """Send a single record to Certyo."""
    headers = {
        "X-API-Key": CERTYO_API_KEY,
        "Content-Type": "application/json",
    }
    resp = requests.post(
        f"{CERTYO_BASE_URL}/api/v1/records",
        headers=headers,
        json=record,
        timeout=30,  # without a timeout, a stalled request blocks the listener loop
    )
    if resp.ok:
        result = resp.json()
        print(f"  Ingested {record['recordId']}: hash={result.get('recordHash')}")
        return result
    else:
        print(f"  Failed {record['recordId']}: {resp.status_code} {resp.text}")
        return None


def listen():
    """Main listener loop."""
    conn = psycopg2.connect(PG_DSN)
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

    setup_trigger(conn)

    with conn.cursor() as cur:
        cur.execute(f"LISTEN {CHANNEL};")
    print(f"Listening on channel '{CHANNEL}'...")

    while True:
        # Wait for notification with 30s timeout
        if select.select([conn], [], [], 30.0) == ([], [], []):
            continue

        conn.poll()
        while conn.notifies:
            notify = conn.notifies.pop(0)
            try:
                record = map_to_record(notify.payload)
                ingest_record(record)
            except Exception as e:
                print(f"Error processing notification: {e}")


if __name__ == "__main__":
    print("PostgreSQL LISTEN/NOTIFY worker starting...")
    listen()

Database setup

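Before using the LISTEN/NOTIFY approach, make sure the trigger function is installed. The script above creates the trigger automatically, but you can also install it manually. The EXECUTE FUNCTION syntax requires PostgreSQL 11 or later; on PostgreSQL 10, use EXECUTE PROCEDURE instead.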

SQL: Trigger function and Certyo tracking columns
-- Create the notification trigger function
CREATE OR REPLACE FUNCTION certyo_notify_change()
RETURNS trigger AS $$
DECLARE
    payload JSON;
    record_row RECORD;
BEGIN
    IF TG_OP = 'DELETE' THEN
        record_row := OLD;
    ELSE
        record_row := NEW;
    END IF;

    payload := json_build_object(
        'operation', TG_OP,
        'schema', TG_TABLE_SCHEMA,
        'table', TG_TABLE_NAME,
        'record', row_to_json(record_row)
    );

    PERFORM pg_notify('certyo_changes', payload::text);
    RETURN record_row;
END;
$$ LANGUAGE plpgsql;

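-- Attach to the products table
-- (drop first so the script can be re-run, e.g. after the Python script
--  has already created the trigger)
DROP TRIGGER IF EXISTS certyo_change_trigger ON public.products;
CREATE TRIGGER certyo_change_trigger
AFTER INSERT OR UPDATE OR DELETE ON public.products
FOR EACH ROW EXECUTE FUNCTION certyo_notify_change();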

-- Add Certyo tracking columns (for verification write-back)
ALTER TABLE public.products ADD COLUMN IF NOT EXISTS certyo_record_hash TEXT;
ALTER TABLE public.products ADD COLUMN IF NOT EXISTS certyo_anchor_status TEXT DEFAULT 'pending';
ALTER TABLE public.products ADD COLUMN IF NOT EXISTS certyo_verified_at TIMESTAMPTZ;

-- For Debezium: enable logical replication
-- (requires wal_level=logical in postgresql.conf and a restart)
CREATE PUBLICATION certyo_publication FOR TABLE public.products;

Field mapping

Map PostgreSQL columns to the Certyo record schema:

PostgreSQL → Certyo field mapping
PostgreSQL Field         Certyo Field       Notes
──────────────────────   ─────────────────  ─────────────────────────────────────────────
Primary key (id)         recordId           Cast to string
Table name               collection         e.g. "public.products" (schema.table)
Database name            database           e.g. "productdb"
Full row (JSON)          recordPayload      row_to_json() or dict serialization
updated_at               sourceTimestamp    ISO 8601 format
TG_OP / operation        operationType      INSERT→insert, UPDATE→update, DELETE→delete
id + timestamp           idempotencyKey     Ensures deduplication on retry
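
For rows fetched directly from PostgreSQL as Python dicts (for example by a polling worker), the mapping table could be applied as sketched below. row_to_certyo_record is a hypothetical helper, the idempotency key format is an assumption, and naive updated_at values are assumed to be in UTC.

```python
import json
from datetime import datetime, timezone


def row_to_certyo_record(row, operation, tenant_id, database, schema, table):
    """Apply the field mapping to one row given as a dict of column -> value."""
    updated_at = row.get("updated_at")
    if isinstance(updated_at, datetime):
        if updated_at.tzinfo is None:
            # Assumption: naive timestamps are stored in UTC
            updated_at = updated_at.replace(tzinfo=timezone.utc)
        source_ts = updated_at.isoformat()
    else:
        # No usable updated_at column: fall back to the current time
        source_ts = datetime.now(timezone.utc).isoformat()

    # Round-trip through JSON so datetimes, Decimals, and UUIDs become strings
    payload = json.loads(json.dumps(row, default=str))

    record_id = str(row["id"])  # primary key cast to string
    op = operation.lower()      # INSERT -> insert, UPDATE -> update, DELETE -> delete

    return {
        "tenantId": tenant_id,
        "database": database,
        "collection": f"{schema}.{table}",
        "recordId": record_id,
        "recordVersion": "1",
        "operationType": op,
        "recordPayload": payload,
        "sourceTimestamp": source_ts,
        # id + timestamp, so a retry of the same change reuses the same key
        "idempotencyKey": f"{table}-{record_id}-{op}-{source_ts}",
    }
```

Passing default=str to json.dumps is a deliberate simplification: it keeps the payload serializable without a custom encoder, at the cost of turning numeric types such as Decimal into strings.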

Verification

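After anchoring completes (~60-90 seconds), verify the record and write the status back to the source row using SQL UPDATE. This creates an audit trail directly in PostgreSQL. If certyo_change_trigger is installed on the same table, this UPDATE fires it as well, so a running listener receives a notification for the write-back.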

verify_and_update.py: Verify anchoring and update source row (Python)
"""
Verify anchored records and write back status to PostgreSQL.

Install: pip install psycopg2-binary requests
Usage:   CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy python verify_and_update.py
"""

import os
import psycopg2
import requests
from datetime import datetime, timezone

PG_DSN = os.environ.get(
    "PG_DSN",
    "host=localhost port=5432 dbname=productdb user=certyo_cdc password=secret"
)
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = os.environ.get("CERTYO_BASE_URL", "https://www.certyos.com")

SCHEMA = "public"
TABLE = "products"
DATABASE = "productdb"


def verify_and_update():
    conn = psycopg2.connect(PG_DSN)
    cur = conn.cursor()

    try:
        # Find rows not yet marked as anchored
        # (IS DISTINCT FROM also matches rows where the status is NULL)
        cur.execute(f"""
            SELECT id FROM {SCHEMA}.{TABLE}
            WHERE certyo_anchor_status IS DISTINCT FROM 'anchored'
            ORDER BY id
            LIMIT 100
        """)
        rows = cur.fetchall()
        print(f"Verifying {len(rows)} records...")

        headers = {
            "X-API-Key": CERTYO_API_KEY,
            "Content-Type": "application/json",
        }

        for (record_id,) in rows:
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/verify/record",
                headers=headers,
                json={
                    "tenantId": CERTYO_TENANT_ID,
                    "database": DATABASE,
                    "collection": f"{SCHEMA}.{TABLE}",
                    "recordId": str(record_id),
                },
                timeout=30,  # avoid hanging on a stalled request
            )

            if not resp.ok:
                print(f"  {record_id}: verification request failed ({resp.status_code})")
                continue

            result = resp.json()

            if result.get("verified") and result.get("anchoredOnChain"):
                # Write the anchoring result back to the source row
                cur.execute(f"""
                    UPDATE {SCHEMA}.{TABLE} SET
                        certyo_record_hash = %s,
                        certyo_anchor_status = 'anchored',
                        certyo_verified_at = %s
                    WHERE id = %s
                """, (result["recordHash"], datetime.now(timezone.utc), record_id))
                conn.commit()
                print(f"  {record_id}: anchored (hash: {result['recordHash']})")
            else:
                print(f"  {record_id}: not yet anchored")
    finally:
        # Release the connection even if a request or query raises
        cur.close()
        conn.close()


if __name__ == "__main__":
    verify_and_update()
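
Because anchoring takes roughly 60-90 seconds, a caller that needs the result for one specific record may prefer to re-check at intervals rather than verify once. The sketch below shows one way to do that. wait_until_anchored is a hypothetical helper, check stands for any callable that returns the parsed verification response, and the default timeout and interval are illustrative assumptions.

```python
import time


def wait_until_anchored(check, timeout=180.0, interval=15.0,
                        sleep=time.sleep, clock=time.monotonic):
    """Call check() repeatedly until the record is verified and anchored.

    Returns the verification result, or None if the timeout expires first.
    `sleep` and `clock` are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout
    while True:
        result = check()
        if result and result.get("verified") and result.get("anchoredOnChain"):
            return result
        # Stop if another wait would push us past the deadline
        if clock() + interval > deadline:
            return None
        sleep(interval)
```

The check callable could wrap the POST to /api/v1/verify/record from the script above and return resp.json(), or None when the request fails.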

Choosing an approach

  • Debezium (production) — Best for high-throughput production workloads. Provides reliable, ordered delivery with automatic offset tracking. Requires Kafka infrastructure.
  • LISTEN/NOTIFY (lightweight) — Best for low-to-medium throughput when you want real-time push without external infrastructure. Notifications are lost if no client is listening, so pair with periodic polling for gap recovery.
  • Polling (simplest) — Best for getting started quickly or when CDC infrastructure is not available. Requires an updated_at column on tracked tables. Cannot detect hard deletes.
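
The polling approach can be sketched as a single query per cycle. This is an illustrative sketch, not a reference implementation: poll_changes is a hypothetical helper that accepts any DB-API cursor using %s placeholders (such as psycopg2's), and it assumes the table has id and updated_at columns. The watermark is the pair (updated_at, id) so that rows sharing a timestamp are not skipped at a batch boundary.

```python
def poll_changes(cur, schema, table, last_seen, batch_size=1000):
    """Fetch rows changed after `last_seen`, a (updated_at, id) pair.

    Returns (rows_as_dicts, new_watermark). `schema` and `table` are
    interpolated into the SQL, so they must be trusted constants.
    """
    cur.execute(
        f"SELECT * FROM {schema}.{table} "
        "WHERE (updated_at, id) > (%s, %s) "
        "ORDER BY updated_at, id LIMIT %s",
        (last_seen[0], last_seen[1], batch_size),
    )
    columns = [col[0] for col in cur.description]
    rows = [dict(zip(columns, values)) for values in cur.fetchall()]

    # Advance the watermark to the last row returned; keep it if nothing changed
    if rows:
        new_watermark = (rows[-1]["updated_at"], rows[-1]["id"])
    else:
        new_watermark = last_seen
    return rows, new_watermark
```

A worker would persist the returned watermark between cycles and pass it back in on the next call. Hard deletes remain invisible to this query, as noted in the list above.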

LISTEN/NOTIFY payload limit
In the default configuration, a PostgreSQL NOTIFY payload must be shorter than 8000 bytes. For tables with wide rows, send only the primary key in the notification and have the listener fetch the full row before mapping it to a Certyo record.
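
On the listener side, the key-only pattern could look like the sketch below. It assumes a hypothetical key-only payload shape of {"operation": ..., "id": ...} and a primary key column named id. resolve_row and fits_in_notify are illustrative helpers, and cur is any DB-API cursor that uses %s placeholders.

```python
import json

NOTIFY_LIMIT_BYTES = 8000  # payload must stay below this in the default configuration


def fits_in_notify(payload_text):
    """True if the payload is small enough to send through NOTIFY."""
    return len(payload_text.encode("utf-8")) < NOTIFY_LIMIT_BYTES


def resolve_row(cur, notification_payload, schema="public", table="products"):
    """Return (operation, row_dict) for a full-row or key-only notification."""
    data = json.loads(notification_payload)

    # Full-row payload, as sent by certyo_notify_change()
    if "record" in data:
        return data["operation"], data["record"]

    # Key-only payload (hypothetical shape): fetch the current row by primary key.
    # schema and table are interpolated, so they must be trusted constants.
    cur.execute(
        f"SELECT row_to_json(t) FROM {schema}.{table} AS t WHERE t.id = %s",
        (data["id"],),
    )
    found = cur.fetchone()
    if found is None:
        # A deleted row can no longer be fetched; return just the key
        return data["operation"], {"id": data["id"]}

    row = found[0]
    if isinstance(row, str):  # some drivers return JSON columns as text
        row = json.loads(row)
    return data["operation"], row
```

One trade-off of this design is that the listener sees the row as it is at fetch time, which may already include a later change than the one that produced the notification.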

Bulk optimization

For tables with high write throughput, batch records before calling the API:

  • POST /api/v1/records/bulk accepts up to 1000 records per request
  • The polling approach naturally batches all rows found in each cycle
  • For LISTEN/NOTIFY, buffer notifications for a short window (e.g., 5 seconds) before sending a bulk request
  • For Debezium, configure the Kafka consumer with a batch listener that accumulates messages
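
The buffering described in the list above might be implemented along these lines. This is a sketch under stated assumptions: BulkBuffer and chunk_records are hypothetical names, and send_bulk is a caller-supplied function that POSTs one list of records to /api/v1/records/bulk. The request body shape for that endpoint is not shown in this guide, so it is left to the caller.

```python
import time

MAX_BULK_SIZE = 1000  # per-request limit for /api/v1/records/bulk


def chunk_records(records, size=MAX_BULK_SIZE):
    """Yield successive slices of at most `size` records."""
    for start in range(0, len(records), size):
        yield records[start:start + size]


class BulkBuffer:
    """Collect records and flush them after a short window or when full."""

    def __init__(self, send_bulk, window_seconds=5.0, clock=time.monotonic):
        self._send_bulk = send_bulk      # callable taking a list of records
        self._window = window_seconds
        self._clock = clock
        self._records = []
        self._opened_at = None

    def add(self, record):
        if not self._records:
            self._opened_at = self._clock()  # window starts at the first record
        self._records.append(record)
        if len(self._records) >= MAX_BULK_SIZE:
            self.flush()

    def flush_if_due(self):
        """Flush if the window has elapsed since the first buffered record."""
        if self._records and self._clock() - self._opened_at >= self._window:
            self.flush()

    def flush(self):
        pending, self._records = self._records, []
        for chunk in chunk_records(pending):
            self._send_bulk(chunk)
```

In a LISTEN/NOTIFY worker, add() would be called for each notification and flush_if_due() once per loop iteration, with the select timeout shortened to roughly the window length so that an idle buffer is still flushed promptly.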

Troubleshooting

  • "ERROR: logical decoding requires wal_level >= logical" — Set wal_level = logical in postgresql.conf and restart PostgreSQL. On AWS RDS, modify the parameter group. On Azure, update the server parameter.
  • LISTEN/NOTIFY notifications lost — Notifications are only delivered to connected, listening clients. If the listener was disconnected, run a polling cycle to catch up on missed changes.
  • Replication slot growing — If the Debezium consumer is offline, the replication slot retains WAL segments. Monitor pg_replication_slots and drop stale slots to prevent disk filling up.
  • Polling misses deletes — Timestamp-based polling cannot detect hard deletes. Use soft deletes (deleted_at column) or switch to LISTEN/NOTIFY or Debezium for full delete coverage.
  • Duplicate records in Certyo — The idempotencyKey prevents true duplicates. Ensure your key includes enough specificity (record ID + timestamp or LSN) to distinguish different operations on the same row.
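
For the replication slot item above, a periodic check of pg_replication_slots could be sketched as follows. find_risky_slots is a hypothetical helper, the 1 GiB threshold is an arbitrary assumption, and cur is any DB-API cursor. The query uses pg_current_wal_lsn() and pg_wal_lsn_diff(), which are available from PostgreSQL 10.

```python
SLOT_LAG_SQL = """
    SELECT slot_name,
           active,
           pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_bytes
    FROM pg_replication_slots
"""


def find_risky_slots(cur, max_retained_bytes=1_073_741_824):
    """Return slots that are inactive or retain more WAL than the threshold."""
    cur.execute(SLOT_LAG_SQL)
    risky = []
    for slot_name, active, retained in cur.fetchall():
        retained = int(retained or 0)  # restart_lsn can be NULL for unused slots
        if not active or retained > max_retained_bytes:
            risky.append({
                "slot_name": slot_name,
                "active": bool(active),
                "retained_bytes": retained,
            })
    return risky
```

The result is meant for alerting only. Whether a flagged slot is actually stale, and therefore safe to drop, still needs a human decision, because dropping the slot of a connector that is only temporarily offline loses its position in the WAL.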

AI Integration · v1.0.0

AI Integration Skill

Download a skill file that enables AI agents to generate working PostgreSQL + Certyo integration code for any language or framework.

What is this?
A markdown file containing PostgreSQL-specific field mappings, authentication setup, code examples, and integration patterns for Certyo. Drop it into your AI agent's context and ask it to generate integration code.

What's inside

  • Authentication: PostgreSQL connection and Certyo API key configuration
  • Architecture: Debezium CDC, LISTEN/NOTIFY, and polling patterns
  • Field mapping: Primary key, table, schema to Certyo record schema
  • Code examples: Python psycopg2, Node.js pg, Debezium, SQLAlchemy polling
  • Verification: SQL UPDATE write-back with anchor status columns
  • Patterns: Real-time LISTEN/NOTIFY, Debezium CDC, and batch polling

How to use

Claude Code

Place the file in your project's .claude/commands/ directory, then use it as a slash command:

# Download the skill file
mkdir -p .claude/commands
curl -o .claude/commands/certyo-postgresql.md \
  https://www.certyos.com/developers/skills/certyo-postgresql-skill.md

# Use it in Claude Code
/certyo-postgresql "Generate a Python listener that uses LISTEN/NOTIFY to push row changes to Certyo"

Cursor / Copilot / Any AI Agent

Add the file to your project root or attach it to a conversation. The AI agent will use the PostgreSQL-specific patterns, field mappings, and code examples to generate correct integration code.

# Add to your project
curl -o CERTYO_POSTGRESQL.md \
  https://www.certyos.com/developers/skills/certyo-postgresql-skill.md

# Then in your AI agent:
"Using the Certyo PostgreSQL spec in CERTYO_POSTGRESQL.md,
 generate a Python listener that uses LISTEN/NOTIFY to push row changes to Certyo"

CLAUDE.md Context File

Append the skill file to your project's CLAUDE.md so every Claude conversation has PostgreSQL + Certyo context automatically.

# Append to your project's CLAUDE.md
echo "" >> CLAUDE.md
echo "## Certyo PostgreSQL Integration" >> CLAUDE.md
cat CERTYO_POSTGRESQL.md >> CLAUDE.md