PostgreSQL Integration
Use PostgreSQL CDC via logical replication and Debezium, LISTEN/NOTIFY for lightweight real-time events, or timestamp-based polling. This guide covers all three approaches for pushing row changes to Certyo for blockchain anchoring.
Overview
PostgreSQL offers multiple change data capture strategies. For production workloads, Debezium with logical replication provides reliable, at-least-once delivery with full transaction ordering. For lightweight use cases, LISTEN/NOTIFY provides real-time push notifications using trigger functions. Timestamp-based polling is the simplest approach when neither Debezium nor triggers are practical.
Architecture
┌──────────────────────┐ WAL logical decoding ┌───────────────────────┐
│ PostgreSQL │ ──── pgoutput plugin ──────────────> │ Debezium Connector │
│ products table │ │ (Kafka Connect) │
│ (INSERT/UPDATE/DEL) │ └───────────┬───────────┘
└──────────────────────┘ │
publishes to Kafka topic
certyo.public.products
│
v
┌────────────────────────────┐
│ Consumer Process │
│ (Python / Node.js) │
│ reads Kafka, transforms, │
│ POSTs to Certyo │
└─────────────┬──────────────┘
│
POST /api/v1/records (or /bulk)
│
v
┌─────────────────┐
│ Certyo API │
│ (202 Accepted) │
└────────┬────────┘
│
~60-90s anchoring
│
v
┌─────────────────┐
│ Polygon │
│ (on-chain) │
└─────────────────┘

Prerequisites
- PostgreSQL 10+ with wal_level=logical (required for Debezium; not needed for LISTEN/NOTIFY or polling)
- A user with the REPLICATION privilege (for Debezium) or the table TRIGGER privilege (for LISTEN/NOTIFY)
- Python 3.9+ with psycopg2, or Node.js 18+ with the pg package
- A Certyo API key (obtain from your Certyo dashboard under Settings → API Keys)
- For the Debezium approach: Docker and Docker Compose
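Before choosing an approach, you can sanity-check the server configuration. The sketch below is illustrative (it assumes psycopg2-binary and the same PG_DSN convention used by the scripts later in this guide); only Debezium actually requires logical decoding.

```python
"""Preflight sketch: check wal_level before choosing an approach."""
import os

def debezium_ready(wal_level):
    """Debezium needs logical decoding; LISTEN/NOTIFY and polling do not."""
    return wal_level == "logical"

if __name__ == "__main__":
    import psycopg2  # third-party import kept here so debezium_ready stays importable
    dsn = os.environ.get(
        "PG_DSN",
        "host=localhost port=5432 dbname=productdb user=certyo_cdc password=secret",
    )
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute("SHOW wal_level")
        level = cur.fetchone()[0]
        print(f"wal_level={level}; Debezium ready: {debezium_ready(level)}")
```

If this prints anything other than wal_level=logical, the LISTEN/NOTIFY and polling approaches below still work without any server changes.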
LISTEN/NOTIFY approach (lightweight)
PostgreSQL's built-in LISTEN/NOTIFY mechanism lets a trigger function push notifications to connected clients in real time. No external infrastructure is required beyond a long-lived database connection.
"""
PostgreSQL LISTEN/NOTIFY listener for Certyo.
Creates a trigger function that fires pg_notify on row changes,
then listens for notifications and ingests into Certyo.
Install: pip install psycopg2-binary requests
Usage: CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy python certyo_pg_listener.py
"""
import os
import json
import select
import psycopg2
import psycopg2.extensions
import requests
from datetime import datetime, timezone
PG_DSN = os.environ.get(
"PG_DSN",
"host=localhost port=5432 dbname=productdb user=certyo_cdc password=secret"
)
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = os.environ.get("CERTYO_BASE_URL", "https://www.certyos.com")
CHANNEL = "certyo_changes"
SCHEMA = "public"
TABLE = "products"
DATABASE = "productdb"
def setup_trigger(conn):
"""Create the trigger function and trigger if they don't exist."""
with conn.cursor() as cur:
cur.execute(f"""
CREATE OR REPLACE FUNCTION certyo_notify_change()
RETURNS trigger AS $$
DECLARE
payload JSON;
record_row RECORD;
BEGIN
IF TG_OP = 'DELETE' THEN
record_row := OLD;
ELSE
record_row := NEW;
END IF;
payload := json_build_object(
'operation', TG_OP,
'schema', TG_TABLE_SCHEMA,
'table', TG_TABLE_NAME,
'record', row_to_json(record_row)
);
PERFORM pg_notify('{CHANNEL}', payload::text);
RETURN record_row;
END;
$$ LANGUAGE plpgsql;
""")
cur.execute(f"""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_trigger
WHERE tgname = 'certyo_change_trigger'
AND tgrelid = '{SCHEMA}.{TABLE}'::regclass
) THEN
CREATE TRIGGER certyo_change_trigger
AFTER INSERT OR UPDATE OR DELETE ON {SCHEMA}.{TABLE}
FOR EACH ROW EXECUTE FUNCTION certyo_notify_change();
END IF;
END $$;
""")
conn.commit()
print(f"Trigger installed on {SCHEMA}.{TABLE}")
def map_to_record(notification_payload):
"""Map a pg_notify payload to a Certyo record."""
data = json.loads(notification_payload)
record_data = data["record"]
operation = data["operation"].lower()
# Map PostgreSQL operation names to Certyo
op_map = {"insert": "insert", "update": "update", "delete": "delete"}
certyo_op = op_map.get(operation, "upsert")
record_id = str(record_data.get("id", record_data.get("product_id", "unknown")))
return {
"tenantId": CERTYO_TENANT_ID,
"database": DATABASE,
"collection": f"{SCHEMA}.{TABLE}",
"recordId": record_id,
"recordVersion": "1",
"operationType": certyo_op,
"recordPayload": record_data,
"sourceTimestamp": datetime.now(timezone.utc).isoformat(),
"idempotencyKey": f"pg-notify-{TABLE}-{record_id}-"
f"{datetime.now(timezone.utc).strftime('%Y-%m-%dT%H')}",
}
def ingest_record(record):
"""Send a single record to Certyo."""
headers = {
"X-API-Key": CERTYO_API_KEY,
"Content-Type": "application/json",
}
resp = requests.post(
f"{CERTYO_BASE_URL}/api/v1/records",
headers=headers,
json=record,
)
if resp.ok:
result = resp.json()
print(f" Ingested {record['recordId']}: hash={result['recordHash']}")
return result
else:
print(f" Failed {record['recordId']}: {resp.status_code} {resp.text}")
return None
def listen():
"""Main listener loop."""
conn = psycopg2.connect(PG_DSN)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
setup_trigger(conn)
with conn.cursor() as cur:
cur.execute(f"LISTEN {CHANNEL};")
print(f"Listening on channel '{CHANNEL}'...")
while True:
# Wait for notification with 30s timeout
if select.select([conn], [], [], 30.0) == ([], [], []):
continue
conn.poll()
while conn.notifies:
notify = conn.notifies.pop(0)
try:
record = map_to_record(notify.payload)
ingest_record(record)
except Exception as e:
print(f"Error processing notification: {e}")
if __name__ == "__main__":
print("PostgreSQL LISTEN/NOTIFY worker starting...")
    listen()

The equivalent listener in Node.js:

/**
* PostgreSQL LISTEN/NOTIFY listener for Certyo.
* Listens for row-change notifications from a trigger function
* and ingests them into Certyo for blockchain anchoring.
*
* Install: npm install pg
* Usage: CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy node certyo-pg-listener.js
*/
const { Client } = require("pg");
const PG_CONFIG = {
host: process.env.PG_HOST || "localhost",
port: parseInt(process.env.PG_PORT || "5432"),
database: process.env.PG_DATABASE || "productdb",
user: process.env.PG_USER || "certyo_cdc",
password: process.env.PG_PASSWORD || "secret",
};
const CERTYO_API_KEY = process.env.CERTYO_API_KEY;
const CERTYO_TENANT_ID = process.env.CERTYO_TENANT_ID;
const CERTYO_BASE_URL = process.env.CERTYO_BASE_URL || "https://www.certyos.com";
const CHANNEL = "certyo_changes";
const SCHEMA = "public";
const TABLE = "products";
if (!CERTYO_API_KEY || !CERTYO_TENANT_ID) {
console.error("CERTYO_API_KEY and CERTYO_TENANT_ID are required");
process.exit(1);
}
async function setupTrigger(client) {
await client.query(`
CREATE OR REPLACE FUNCTION certyo_notify_change()
RETURNS trigger AS $$
DECLARE
payload JSON;
record_row RECORD;
BEGIN
IF TG_OP = 'DELETE' THEN
record_row := OLD;
ELSE
record_row := NEW;
END IF;
payload := json_build_object(
'operation', TG_OP,
'schema', TG_TABLE_SCHEMA,
'table', TG_TABLE_NAME,
'record', row_to_json(record_row)
);
PERFORM pg_notify('${CHANNEL}', payload::text);
RETURN record_row;
END;
$$ LANGUAGE plpgsql;
`);
await client.query(`
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_trigger
WHERE tgname = 'certyo_change_trigger'
AND tgrelid = '${SCHEMA}.${TABLE}'::regclass
) THEN
CREATE TRIGGER certyo_change_trigger
AFTER INSERT OR UPDATE OR DELETE ON ${SCHEMA}.${TABLE}
FOR EACH ROW EXECUTE FUNCTION certyo_notify_change();
END IF;
END $$;
`);
console.log(`Trigger installed on ${SCHEMA}.${TABLE}`);
}
function mapToRecord(payload) {
const data = JSON.parse(payload);
const recordData = data.record;
const opMap = { INSERT: "insert", UPDATE: "update", DELETE: "delete" };
const operation = opMap[data.operation] || "upsert";
const recordId = String(
recordData.id || recordData.product_id || "unknown"
);
return {
tenantId: CERTYO_TENANT_ID,
database: PG_CONFIG.database,
collection: `${SCHEMA}.${TABLE}`,
recordId,
recordVersion: "1",
operationType: operation,
recordPayload: recordData,
sourceTimestamp: new Date().toISOString(),
idempotencyKey: `pg-notify-${TABLE}-${recordId}-${new Date().toISOString().substring(0, 13)}`,
};
}
async function ingestRecord(record) {
const response = await fetch(`${CERTYO_BASE_URL}/api/v1/records`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-API-Key": CERTYO_API_KEY,
},
body: JSON.stringify(record),
});
if (response.ok) {
const result = await response.json();
console.log(` Ingested ${record.recordId}: hash=${result.recordHash}`);
return result;
} else {
const body = await response.text();
console.error(` Failed ${record.recordId}: ${response.status} ${body}`);
return null;
}
}
async function main() {
const client = new Client(PG_CONFIG);
await client.connect();
console.log("Connected to PostgreSQL");
await setupTrigger(client);
await client.query(`LISTEN ${CHANNEL}`);
console.log(`Listening on channel '${CHANNEL}'...`);
client.on("notification", async (msg) => {
try {
const record = mapToRecord(msg.payload);
await ingestRecord(record);
} catch (err) {
console.error("Error processing notification:", err.message);
}
});
// Keep alive
process.on("SIGINT", async () => {
console.log("Shutting down...");
await client.end();
process.exit(0);
});
}
main().catch((err) => {
console.error("Fatal error:", err);
process.exit(1);
});

Debezium approach (production)

For production workloads, run Kafka and Debezium via Docker Compose. The following compose file starts ZooKeeper, Kafka, and Kafka Connect with the Debezium PostgreSQL connector:

version: "3.9"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.6.0
depends_on: [zookeeper]
ports: ["9092:9092"]
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
debezium:
image: debezium/connect:2.5
depends_on: [kafka]
ports: ["8083:8083"]
environment:
BOOTSTRAP_SERVERS: kafka:29092
GROUP_ID: certyo-pg-cdc
CONFIG_STORAGE_TOPIC: _debezium_configs
OFFSET_STORAGE_TOPIC: _debezium_offsets
STATUS_STORAGE_TOPIC: _debezium_status
CONFIG_STORAGE_REPLICATION_FACTOR: 1
OFFSET_STORAGE_REPLICATION_FACTOR: 1
STATUS_STORAGE_REPLICATION_FACTOR: 1
# Prerequisites: wal_level=logical must be set in postgresql.conf
#
# Register the PostgreSQL connector:
#
# curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
# "name": "certyo-pg-source",
# "config": {
# "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
# "database.hostname": "host.docker.internal",
# "database.port": "5432",
# "database.user": "certyo_cdc",
# "database.password": "secret",
# "database.dbname": "productdb",
# "topic.prefix": "certyo",
# "table.include.list": "public.products",
# "plugin.name": "pgoutput",
# "slot.name": "certyo_slot",
# "publication.name": "certyo_publication",
# "snapshot.mode": "initial"
# }
# }'
#
# This creates a Kafka topic: certyo.public.products
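A consumer for this topic can be sketched as follows. Assumptions not stated elsewhere in this guide: the kafka-python and requests packages, Debezium's default JSON envelope (a top-level "payload" wrapper), and an integer id primary key on the products table.

```python
"""
Sketch of a Kafka consumer that forwards Debezium change events to Certyo.
Install: pip install kafka-python requests
"""
import json
import os
from datetime import datetime, timezone

# Debezium op codes: c=create, u=update, d=delete, r=snapshot read
OP_MAP = {"c": "insert", "u": "update", "d": "delete", "r": "upsert"}

def map_debezium_event(event, tenant_id):
    """Map one Debezium change-event envelope to a Certyo record."""
    p = event["payload"]
    row = p["before"] if p["op"] == "d" else p["after"]
    src = p["source"]
    record_id = str(row.get("id", "unknown"))
    return {
        "tenantId": tenant_id,
        "database": src["db"],
        "collection": f"{src['schema']}.{src['table']}",
        "recordId": record_id,
        "recordVersion": "1",
        "operationType": OP_MAP.get(p["op"], "upsert"),
        "recordPayload": row,
        "sourceTimestamp": datetime.fromtimestamp(
            src["ts_ms"] / 1000, tz=timezone.utc
        ).isoformat(),
        # The WAL LSN uniquely identifies each change, so retries deduplicate.
        "idempotencyKey": f"pg-cdc-{src['table']}-{record_id}-{src['lsn']}",
    }

def main():
    # Third-party imports live here so map_debezium_event stays importable.
    import requests
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        "certyo.public.products",
        bootstrap_servers=os.environ.get("KAFKA_BOOTSTRAP", "localhost:9092"),
        group_id="certyo-consumer",
        value_deserializer=lambda v: json.loads(v.decode()) if v else None,
    )
    for msg in consumer:
        if msg.value is None:  # tombstone emitted after a delete
            continue
        record = map_debezium_event(msg.value, os.environ["CERTYO_TENANT_ID"])
        resp = requests.post(
            f"{os.environ.get('CERTYO_BASE_URL', 'https://www.certyos.com')}/api/v1/records",
            headers={
                "X-API-Key": os.environ["CERTYO_API_KEY"],
                "Content-Type": "application/json",
            },
            json=record,
        )
        print(f"{record['recordId']}: {resp.status_code}")

if __name__ == "__main__":
    main()
```

Using the LSN in the idempotency key means replaying the Kafka topic after a consumer restart will not create duplicate anchors for the same change.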
# Write a consumer that reads from this topic and POSTs to Certyo /api/v1/records

Polling approach (simplest)

"""
PostgreSQL polling worker for Certyo using SQLAlchemy.
Queries rows modified since the last sync and ingests them.
Install: pip install sqlalchemy psycopg2-binary requests
Usage: CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy python certyo_pg_poll.py
"""
import os
import time
import requests
from datetime import datetime, timezone
from sqlalchemy import create_engine, text
PG_URL = os.environ.get(
"PG_URL",
"postgresql://certyo_cdc:secret@localhost:5432/productdb"
)
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = os.environ.get("CERTYO_BASE_URL", "https://www.certyos.com")
SCHEMA = "public"
TABLE = "products"
DATABASE = "productdb"
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30"))
BULK_THRESHOLD = 10
engine = create_engine(PG_URL)
def ensure_tracking_table():
"""Create the sync tracking table if it doesn't exist."""
with engine.begin() as conn:
conn.execute(text("""
CREATE TABLE IF NOT EXISTS _certyo_sync_cursors (
table_name TEXT PRIMARY KEY,
last_sync_at TIMESTAMPTZ NOT NULL DEFAULT '2000-01-01T00:00:00Z',
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
"""))
def get_last_sync():
"""Load the last sync timestamp."""
with engine.connect() as conn:
result = conn.execute(text(
"SELECT last_sync_at FROM _certyo_sync_cursors WHERE table_name = :table"
), {"table": f"{SCHEMA}.{TABLE}"})
row = result.fetchone()
return row[0] if row else datetime(2000, 1, 1, tzinfo=timezone.utc)
def save_last_sync(sync_time):
"""Persist the sync timestamp."""
with engine.begin() as conn:
conn.execute(text("""
INSERT INTO _certyo_sync_cursors (table_name, last_sync_at, updated_at)
VALUES (:table, :sync_at, NOW())
ON CONFLICT (table_name) DO UPDATE SET
last_sync_at = :sync_at,
updated_at = NOW();
"""), {"table": f"{SCHEMA}.{TABLE}", "sync_at": sync_time})
def map_row(row_dict):
"""Map a PostgreSQL row to a Certyo record."""
record_id = str(row_dict.get("id", row_dict.get("product_id", "unknown")))
# Convert datetime values to ISO strings for JSON
payload = {}
for key, value in row_dict.items():
if isinstance(value, datetime):
payload[key] = value.isoformat()
else:
payload[key] = value
return {
"tenantId": CERTYO_TENANT_ID,
"database": DATABASE,
"collection": f"{SCHEMA}.{TABLE}",
"recordId": record_id,
"recordVersion": "1",
"operationType": "upsert",
"recordPayload": payload,
"sourceTimestamp": row_dict.get("updated_at", datetime.now(timezone.utc)).isoformat()
if isinstance(row_dict.get("updated_at"), datetime)
else datetime.now(timezone.utc).isoformat(),
"idempotencyKey": f"pg-poll-{TABLE}-{record_id}-"
f"{datetime.now(timezone.utc).strftime('%Y-%m-%d')}",
}
def ingest_records(records):
"""Send records to Certyo — bulk if above threshold."""
headers = {
"X-API-Key": CERTYO_API_KEY,
"Content-Type": "application/json",
}
if len(records) > BULK_THRESHOLD:
for i in range(0, len(records), 1000):
batch = records[i : i + 1000]
payload = {"tenantId": CERTYO_TENANT_ID, "records": batch}
resp = requests.post(
f"{CERTYO_BASE_URL}/api/v1/records/bulk",
headers=headers,
json=payload,
)
if resp.ok:
print(f" Bulk ingested {len(batch)} records")
else:
print(f" Bulk failed: {resp.status_code} {resp.text}")
else:
for record in records:
resp = requests.post(
f"{CERTYO_BASE_URL}/api/v1/records",
headers=headers,
json=record,
)
if resp.ok:
result = resp.json()
print(f" Ingested {record['recordId']}: {result['recordHash']}")
else:
print(f" Failed {record['recordId']}: {resp.status_code}")
def poll_cycle():
"""Run one polling cycle."""
last_sync = get_last_sync()
now = datetime.now(timezone.utc)
with engine.connect() as conn:
result = conn.execute(text(f"""
SELECT * FROM {SCHEMA}.{TABLE}
WHERE updated_at >= :last_sync
ORDER BY updated_at ASC
LIMIT 5000
"""), {"last_sync": last_sync})
columns = list(result.keys())
rows = [dict(zip(columns, row)) for row in result.fetchall()]
if not rows:
return
print(f"Found {len(rows)} modified rows since {last_sync.isoformat()}")
records = [map_row(row) for row in rows]
ingest_records(records)
save_last_sync(now)
if __name__ == "__main__":
ensure_tracking_table()
print(f"PostgreSQL polling worker started (interval: {POLL_INTERVAL}s)")
print(f"Watching {SCHEMA}.{TABLE}")
while True:
try:
poll_cycle()
except Exception as e:
print(f"Poll cycle error: {e}")
        time.sleep(POLL_INTERVAL)

Database setup
Before using the LISTEN/NOTIFY approach, ensure the trigger function is installed. The scripts above create the trigger automatically, but you can also install it manually:
-- Create the notification trigger function
CREATE OR REPLACE FUNCTION certyo_notify_change()
RETURNS trigger AS $$
DECLARE
payload JSON;
record_row RECORD;
BEGIN
IF TG_OP = 'DELETE' THEN
record_row := OLD;
ELSE
record_row := NEW;
END IF;
payload := json_build_object(
'operation', TG_OP,
'schema', TG_TABLE_SCHEMA,
'table', TG_TABLE_NAME,
'record', row_to_json(record_row)
);
PERFORM pg_notify('certyo_changes', payload::text);
RETURN record_row;
END;
$$ LANGUAGE plpgsql;
-- Attach to the products table
CREATE TRIGGER certyo_change_trigger
AFTER INSERT OR UPDATE OR DELETE ON public.products
FOR EACH ROW EXECUTE FUNCTION certyo_notify_change();
-- Add Certyo tracking columns (for verification write-back)
ALTER TABLE public.products ADD COLUMN IF NOT EXISTS certyo_record_hash TEXT;
ALTER TABLE public.products ADD COLUMN IF NOT EXISTS certyo_anchor_status TEXT DEFAULT 'pending';
ALTER TABLE public.products ADD COLUMN IF NOT EXISTS certyo_verified_at TIMESTAMPTZ;
-- For Debezium: enable logical replication
-- (requires wal_level=logical in postgresql.conf and a restart)
CREATE PUBLICATION certyo_publication FOR TABLE public.products;

Field mapping
Map PostgreSQL columns to the Certyo record schema:
PostgreSQL Field       Certyo Field       Notes
─────────────────────  ─────────────────  ────────────────────────────────────
Primary key (id)       recordId           Cast to string
Table name             collection         e.g. "public.products" (schema.table)
Database name          database           e.g. "productdb"
Full row (JSON)        recordPayload      row_to_json() or dict serialization
updated_at             sourceTimestamp    ISO 8601 format
TG_OP / operation      operationType      INSERT→insert, UPDATE→update, DELETE→delete
id + timestamp         idempotencyKey     Ensures deduplication on retry

Verification
After anchoring completes (~60-90 seconds), verify the record and write the status back to the source row using SQL UPDATE. This creates an audit trail directly in PostgreSQL.
"""
Verify anchored records and write back status to PostgreSQL.
Install: pip install psycopg2-binary requests
Usage: CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy python verify_and_update.py
"""
import os
import psycopg2
import requests
from datetime import datetime, timezone
PG_DSN = os.environ.get(
"PG_DSN",
"host=localhost port=5432 dbname=productdb user=certyo_cdc password=secret"
)
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = os.environ.get("CERTYO_BASE_URL", "https://www.certyos.com")
SCHEMA = "public"
TABLE = "products"
DATABASE = "productdb"
def verify_and_update():
conn = psycopg2.connect(PG_DSN)
cur = conn.cursor()
# Find rows that need verification
cur.execute(f"""
SELECT id FROM {SCHEMA}.{TABLE}
WHERE certyo_anchor_status != 'anchored'
OR certyo_anchor_status IS NULL
LIMIT 100
""")
rows = cur.fetchall()
print(f"Verifying {len(rows)} records...")
headers = {
"X-API-Key": CERTYO_API_KEY,
"Content-Type": "application/json",
}
for (record_id,) in rows:
resp = requests.post(
f"{CERTYO_BASE_URL}/api/v1/verify/record",
headers=headers,
json={
"tenantId": CERTYO_TENANT_ID,
"database": DATABASE,
"collection": f"{SCHEMA}.{TABLE}",
"recordId": str(record_id),
},
)
if not resp.ok:
print(f" {record_id}: verification request failed ({resp.status_code})")
continue
result = resp.json()
if result.get("verified") and result.get("anchoredOnChain"):
cur.execute(f"""
UPDATE {SCHEMA}.{TABLE} SET
certyo_record_hash = %s,
certyo_anchor_status = 'anchored',
certyo_verified_at = %s
WHERE id = %s
""", (result["recordHash"], datetime.now(timezone.utc), record_id))
conn.commit()
print(f" {record_id}: anchored (hash: {result['recordHash']})")
else:
print(f" {record_id}: not yet anchored")
cur.close()
conn.close()
if __name__ == "__main__":
    verify_and_update()

Choosing an approach
- Debezium (production) — Best for high-throughput production workloads. Provides reliable, ordered delivery with automatic offset tracking. Requires Kafka infrastructure.
- LISTEN/NOTIFY (lightweight) — Best for low-to-medium throughput when you want real-time push without external infrastructure. Notifications are lost if no client is listening, so pair with periodic polling for gap recovery.
- Polling (simplest) — Best for getting started quickly or when CDC infrastructure is not available. Requires an updated_at column on tracked tables. Cannot detect hard deletes.
Bulk optimization
For tables with high write throughput, batch records before calling the API:
- POST /api/v1/records/bulk accepts up to 1000 records per request
- The polling approach naturally batches all rows found in each cycle
- For LISTEN/NOTIFY, buffer notifications for a short window (e.g., 5 seconds) before sending a bulk request
- For Debezium, configure the Kafka consumer with a batch listener that accumulates messages
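The buffering idea for LISTEN/NOTIFY can be sketched as a small accumulator. BulkBuffer and its thresholds are illustrative, not part of the Certyo API; the injected flush function would POST {"tenantId": ..., "records": batch} to /api/v1/records/bulk.

```python
import time

class BulkBuffer:
    """Accumulate mapped records; flush when the buffer fills or the flush
    window elapses. Flushing is delegated so this stays transport-agnostic."""

    def __init__(self, flush_fn, max_records=1000, window_seconds=5.0):
        self.flush_fn = flush_fn          # e.g. POST batch to /api/v1/records/bulk
        self.max_records = max_records    # API limit per bulk request
        self.window_seconds = window_seconds
        self.records = []
        self.last_flush = time.monotonic()

    def add(self, record):
        """Buffer one record, flushing immediately if the batch is full."""
        self.records.append(record)
        if len(self.records) >= self.max_records:
            self.flush()

    def maybe_flush(self):
        """Call periodically (e.g. whenever the listener's select() times out)."""
        if self.records and time.monotonic() - self.last_flush >= self.window_seconds:
            self.flush()

    def flush(self):
        if self.records:
            self.flush_fn(self.records)
            self.records = []
        self.last_flush = time.monotonic()
```

In the Python listener above, this would mean replacing the direct ingest call with buffer.add(record), lowering the select() timeout to the flush window, and calling buffer.maybe_flush() in the timeout branch.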
Troubleshooting
- "ERROR: logical decoding requires wal_level >= logical" — Set wal_level = logical in postgresql.conf and restart PostgreSQL. On AWS RDS, modify the parameter group. On Azure, update the server parameter.
- LISTEN/NOTIFY notifications lost — Notifications are only delivered to connected, listening clients. If the listener was disconnected, run a polling cycle to catch up on missed changes.
- Replication slot growing — If the Debezium consumer is offline, the replication slot retains WAL segments. Monitor pg_replication_slots and drop stale slots to prevent the disk from filling up.
- Polling misses deletes — Timestamp-based polling cannot detect hard deletes. Use soft deletes (a deleted_at column) or switch to LISTEN/NOTIFY or Debezium for full delete coverage.
- Duplicate records in Certyo — The idempotencyKey prevents true duplicates. Ensure your key includes enough specificity (record ID + timestamp or LSN) to distinguish different operations on the same row.
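To act on the replication-slot item, a small check can classify slots from pg_replication_slots. The slot_status helper and its 1 GB threshold are illustrative; the query itself uses standard PostgreSQL 10+ functions.

```python
"""Sketch: flag stale Debezium replication slots before retained WAL fills
the disk. Threshold is illustrative; psycopg2 assumed for the live query."""

def slot_status(active, retained_bytes, max_bytes=1_000_000_000):
    """Classify one pg_replication_slots row by retained WAL and activity."""
    if retained_bytes is None:
        return "unknown"
    if retained_bytes > max_bytes:
        return "critical"   # consumer far behind; WAL piling up on disk
    if not active:
        return "inactive"   # no consumer attached; retention only grows
    return "ok"

if __name__ == "__main__":
    import os
    import psycopg2  # kept here so slot_status stays importable without it

    dsn = os.environ.get(
        "PG_DSN",
        "host=localhost port=5432 dbname=productdb user=certyo_cdc password=secret",
    )
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute("""
            SELECT slot_name, active,
                   pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
            FROM pg_replication_slots
        """)
        for name, active, retained in cur.fetchall():
            print(f"{name}: {slot_status(active, retained)} ({retained} bytes retained)")
```

Run this on a schedule (or wire it into your monitoring) and alert on "critical" or "inactive" before dropping a stale slot by hand.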
AI Integration Skill
Download a skill file that enables AI agents to generate working PostgreSQL + Certyo integration code for any language or framework.
What's inside
- Authentication — PostgreSQL connection and Certyo API key configuration
- Architecture — Debezium CDC, LISTEN/NOTIFY, and polling patterns
- Field mapping — Primary key, table, schema to Certyo record schema
- Code examples — Python psycopg2, Node.js pg, Debezium, SQLAlchemy polling
- Verification — SQL UPDATE write-back with anchor status columns
- Patterns — Real-time LISTEN/NOTIFY, Debezium CDC, and batch polling
How to use
Claude Code
Place the file in your project's .claude/commands/ directory, then use it as a slash command:
# Download the skill file
mkdir -p .claude/commands
curl -o .claude/commands/certyo-postgresql.md \
https://www.certyos.com/developers/skills/certyo-postgresql-skill.md
# Use it in Claude Code
/certyo-postgresql "Generate a Python listener that uses LISTEN/NOTIFY to push row changes to Certyo"

Cursor / Copilot / Any AI Agent
Add the file to your project root or attach it to a conversation. The AI agent will use the PostgreSQL-specific patterns, field mappings, and code examples to generate correct integration code.
# Add to your project
curl -o CERTYO_POSTGRESQL.md \
https://www.certyos.com/developers/skills/certyo-postgresql-skill.md
# Then in your AI agent:
"Using the Certyo PostgreSQL spec in CERTYO_POSTGRESQL.md,
generate a python listener that uses listen/notify to push row changes to certyo"

CLAUDE.md Context File
Append the skill file to your project's CLAUDE.md so every Claude conversation has PostgreSQL + Certyo context automatically.
# Append to your project's CLAUDE.md
echo "" >> CLAUDE.md
echo "## Certyo PostgreSQL Integration" >> CLAUDE.md
cat CERTYO_POSTGRESQL.md >> CLAUDE.md