MySQL Integration
Capture row-level changes from MySQL and push them to Certyo for blockchain anchoring. This guide covers three approaches: Debezium binlog CDC (production), trigger-based queue tables, and simple timestamp-based polling.
Overview
MySQL offers several change detection strategies depending on your throughput requirements and infrastructure constraints:
- Debezium + binlog (recommended for production) — The Debezium MySQL connector reads the binary log in real time, streams changes to Kafka, and a consumer forwards them to Certyo. No triggers or polling queries needed.
- Trigger + queue table — A MySQL trigger writes change events to a queue table on every INSERT/UPDATE/DELETE. An external poller reads the queue and calls Certyo.
- Timestamp polling — The simplest approach: add an updated_at column and poll for rows changed since the last checkpoint. Best for low-volume tables or prototyping.
Architecture
Approach 1: Debezium + Binlog (Production)
┌──────────────────────┐ MySQL binlog ┌───────────────────┐
│ MySQL Database │ ──── ROW-format events ─────────> │ Debezium │
│ products table │ │ MySQL Connector │
│ (INSERT/UPDATE/DEL) │ └────────┬──────────┘
└──────────────────────┘ │ Kafka
v
┌─────────────────┐
│ Kafka Consumer │
│ (Python/Node) │
└────────┬────────┘
│
POST /api/v1/records
v
┌─────────────────┐
│ Certyo API │
│ (202 Accepted) │
└────────┬────────┘
│
~60-90s anchoring
│
v
┌─────────────────┐
│ Polygon │
│ (on-chain) │
└─────────────────┘
Approach 2: Trigger + Queue Table
┌──────────────────────┐ AFTER INSERT/UPDATE/DELETE ┌───────────────────┐
│ MySQL Database │ ──── trigger writes to ─────────> │ certyo_queue │
│ products table │ │ (queue table) │
└──────────────────────┘ └────────┬──────────┘
Poller reads + deletes
processed rows
│
POST /api/v1/records
v
┌─────────────────┐
│ Certyo API │
│ (202 Accepted) │
└─────────────────┘
Approach 3: Timestamp Polling
┌──────────────────────┐ ┌───────────────────┐
│ MySQL Database │ ◄──── SELECT WHERE updated_at > ──│ External Poller │
│ products table │ last_checkpoint │ (Python/Node) │
│ (updated_at col) │ └────────┬──────────┘
└──────────────────────┘ │
POST /api/v1/records
v
┌─────────────────┐
│ Certyo API │
│ (202 Accepted) │
└─────────────────┘
Prerequisites
- MySQL 5.7+ or MySQL 8.x (8.x recommended for native JSON support and improved binlog)
- For Debezium: binlog_format=ROW, binlog_row_image=FULL, and log_bin=ON in my.cnf
- A MySQL user with REPLICATION SLAVE and REPLICATION CLIENT privileges (for Debezium)
- A Certyo API key (obtain from your Certyo dashboard under Settings → API Keys)
- Python 3.9+ with mysql-connector-python or Node.js 18+ with mysql2
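The Debezium binlog requirements above can be sanity-checked before you register the connector. A minimal sketch, assuming you feed it the name/value pairs returned by SHOW VARIABLES LIKE 'binlog%' (the helper and its names are illustrative, not part of Debezium or Certyo):

```python
# The variable values Debezium needs, mirroring the prerequisites above.
REQUIRED_BINLOG_VARS = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}

def check_binlog_config(variables: dict) -> list:
    """Return a list of problems given a {variable: value} dict,
    e.g. the rows fetched with SHOW VARIABLES LIKE 'binlog%'."""
    problems = []
    for name, expected in REQUIRED_BINLOG_VARS.items():
        actual = variables.get(name)
        if actual is None:
            problems.append(f"{name} is not set")
        elif actual.upper() != expected:
            problems.append(f"{name} is {actual}, expected {expected}")
    return problems

if __name__ == "__main__":
    # Example: a server still running with statement-based logging
    print(check_binlog_config({"log_bin": "ON", "binlog_format": "STATEMENT"}))
```

An empty list means the server is ready for the Debezium connector registration shown below.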
Approach 1: Debezium + Binlog (Production)
The Debezium MySQL connector reads the binary log and streams row-level changes to Kafka topics. A consumer transforms the events into Certyo records and calls the ingestion API. This is the recommended approach for production workloads.
version: "3.9"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.6.0
depends_on: [zookeeper]
ports: ["9092:9092"]
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
mysql:
image: mysql:8.0
ports: ["3306:3306"]
environment:
MYSQL_ROOT_PASSWORD: rootpass
MYSQL_DATABASE: product_catalog
command: >
--server-id=1
--log-bin=mysql-bin
--binlog-format=ROW
--binlog-row-image=FULL
--gtid-mode=ON
--enforce-gtid-consistency=ON
connect:
image: debezium/connect:2.6
depends_on: [kafka, mysql]
ports: ["8083:8083"]
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: certyo-mysql-cdc
CONFIG_STORAGE_TOPIC: _connect_configs
OFFSET_STORAGE_TOPIC: _connect_offsets
STATUS_STORAGE_TOPIC: _connect_statuses
CONFIG_STORAGE_REPLICATION_FACTOR: 1
OFFSET_STORAGE_REPLICATION_FACTOR: 1
STATUS_STORAGE_REPLICATION_FACTOR: 1
# After services are running, register the MySQL connector:
#
# curl -X POST http://localhost:8083/connectors \
# -H "Content-Type: application/json" -d '{
# "name": "mysql-certyo-connector",
# "config": {
# "connector.class": "io.debezium.connector.mysql.MySqlConnector",
# "database.hostname": "mysql",
# "database.port": "3306",
# "database.user": "debezium",
# "database.password": "dbz",
# "database.server.id": "184054",
# "topic.prefix": "mysql",
# "database.include.list": "product_catalog",
# "table.include.list": "product_catalog.products",
# "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
# "schema.history.internal.kafka.topic": "schema-changes.mysql",
# "include.schema.changes": "false",
# "snapshot.mode": "initial"
# }
# }'
#
# Create the Debezium user in MySQL:
#
# CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz';
# GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE,
# REPLICATION CLIENT ON *.* TO 'debezium'@'%';
# FLUSH PRIVILEGES;
"""
Kafka consumer for Debezium MySQL CDC events.
Install: pip install confluent-kafka requests
Usage:
CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy \
KAFKA_BOOTSTRAP=localhost:9092 \
python debezium_consumer.py
"""
import os
import json
import requests
from confluent_kafka import Consumer, KafkaError
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
KAFKA_BOOTSTRAP = os.environ.get("KAFKA_BOOTSTRAP", "localhost:9092")
TOPIC = "mysql.product_catalog.products" # prefix.database.table
BULK_THRESHOLD = 10
def create_consumer():
return Consumer({
"bootstrap.servers": KAFKA_BOOTSTRAP,
"group.id": "certyo-mysql-consumer",
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
})
def map_debezium_to_certyo(event):
"""Map a Debezium MySQL change event to a Certyo record."""
payload = event.get("payload", {})
after = payload.get("after", {})
before = payload.get("before", {})
source = payload.get("source", {})
op_map = {"c": "insert", "u": "update", "d": "delete", "r": "upsert"}
op = op_map.get(payload.get("op", ""), "upsert")
# Use 'after' for insert/update, 'before' for delete
data = after if after else before
return {
"tenantId": CERTYO_TENANT_ID,
"database": source.get("db", "mysql"),
"collection": source.get("table", "products"),
"recordId": str(data.get("id", "")),
"operationType": op,
"recordPayload": {
"id": data.get("id"),
"sku": data.get("sku"),
"name": data.get("name"),
"category": data.get("category"),
"price": data.get("price"),
"batch_number": data.get("batch_number"),
"manufactured_at": data.get("manufactured_at"),
},
"sourceTimestamp": source.get("ts_ms")
and f"{source['ts_ms'] // 1000}"
or None,
"idempotencyKey": (
f"dbz-{source.get('file', '')}-"
f"{source.get('pos', '')}-"
f"{data.get('id', '')}"
),
}
def ingest_records(records):
headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
if len(records) > BULK_THRESHOLD:
for i in range(0, len(records), 1000):
batch = records[i : i + 1000]
resp = requests.post(
f"{CERTYO_BASE_URL}/api/v1/records/bulk",
json={"tenantId": CERTYO_TENANT_ID, "records": batch},
headers=headers, timeout=60,
)
if resp.status_code != 202:
print(f"Bulk failed: {resp.status_code} {resp.text}")
else:
for rec in records:
resp = requests.post(
f"{CERTYO_BASE_URL}/api/v1/records",
json=rec, headers=headers, timeout=30,
)
if resp.status_code != 202:
print(f"Failed: {resp.status_code} {resp.text}")
def main():
consumer = create_consumer()
consumer.subscribe([TOPIC])
print(f"Listening on {TOPIC}...")
batch = []
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
if batch:
ingest_records(batch)
consumer.commit()
batch = []
continue
if msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
print(f"Kafka error: {msg.error()}")
continue
        if msg.value() is None:
            continue  # skip Debezium tombstone events (null value after deletes)
        event = json.loads(msg.value().decode("utf-8"))
record = map_debezium_to_certyo(event)
batch.append(record)
if len(batch) >= 100:
ingest_records(batch)
consumer.commit()
batch = []
if __name__ == "__main__":
    main()
Approach 2: Trigger + Queue Table
For environments where Kafka is not available, a MySQL trigger writes change events to a queue table. An external worker polls the queue, ingests into Certyo, and deletes processed rows.
-- Step 1: Create the queue table
CREATE TABLE certyo_queue (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
table_name VARCHAR(100) NOT NULL,
record_id VARCHAR(100) NOT NULL,
op_type ENUM('insert', 'update', 'delete') NOT NULL,
payload JSON NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed BOOLEAN DEFAULT FALSE,
INDEX idx_unprocessed (processed, created_at)
) ENGINE=InnoDB;
-- Step 2: Create triggers on the products table
DELIMITER //
CREATE TRIGGER trg_products_insert
AFTER INSERT ON products
FOR EACH ROW
BEGIN
INSERT INTO certyo_queue (table_name, record_id, op_type, payload)
VALUES (
'products',
CAST(NEW.id AS CHAR),
'insert',
JSON_OBJECT(
'id', NEW.id,
'sku', NEW.sku,
'name', NEW.name,
'category', NEW.category,
'price', NEW.price,
'batch_number', NEW.batch_number,
'manufactured_at', NEW.manufactured_at
)
);
END //
CREATE TRIGGER trg_products_update
AFTER UPDATE ON products
FOR EACH ROW
BEGIN
INSERT INTO certyo_queue (table_name, record_id, op_type, payload)
VALUES (
'products',
CAST(NEW.id AS CHAR),
'update',
JSON_OBJECT(
'id', NEW.id,
'sku', NEW.sku,
'name', NEW.name,
'category', NEW.category,
'price', NEW.price,
'batch_number', NEW.batch_number,
'manufactured_at', NEW.manufactured_at
)
);
END //
CREATE TRIGGER trg_products_delete
AFTER DELETE ON products
FOR EACH ROW
BEGIN
INSERT INTO certyo_queue (table_name, record_id, op_type, payload)
VALUES (
'products',
CAST(OLD.id AS CHAR),
'delete',
JSON_OBJECT(
'id', OLD.id,
'sku', OLD.sku,
'name', OLD.name,
'category', OLD.category,
'price', OLD.price
)
);
END //
DELIMITER ;
-- Step 3: Cleanup job (run periodically via cron or EVENT SCHEDULER)
-- Delete processed rows older than 24 hours
DELETE FROM certyo_queue
WHERE processed = TRUE
  AND created_at < NOW() - INTERVAL 24 HOUR;
"""
MySQL queue table poller for Certyo integration.
Install: pip install mysql-connector-python requests
Usage:
CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy \
MYSQL_HOST=localhost MYSQL_DATABASE=product_catalog \
python queue_poller.py
"""
import os
import time
import json
import requests
import mysql.connector
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
MYSQL_CONFIG = {
"host": os.environ.get("MYSQL_HOST", "localhost"),
"port": int(os.environ.get("MYSQL_PORT", "3306")),
"database": os.environ.get("MYSQL_DATABASE", "product_catalog"),
"user": os.environ.get("MYSQL_USER", "certyo_worker"),
"password": os.environ.get("MYSQL_PASSWORD", "changeme"),
}
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "10"))
BATCH_SIZE = 100
BULK_THRESHOLD = 10
def poll_queue():
"""Read unprocessed rows from the queue table."""
conn = mysql.connector.connect(**MYSQL_CONFIG)
cursor = conn.cursor(dictionary=True)
cursor.execute("""
SELECT id, table_name, record_id, op_type, payload, created_at
FROM certyo_queue
WHERE processed = FALSE
ORDER BY created_at
LIMIT %s
""", (BATCH_SIZE,))
rows = cursor.fetchall()
return conn, cursor, rows
def map_to_certyo_record(row):
"""Map a queue row to a Certyo record payload."""
payload = row["payload"]
if isinstance(payload, str):
payload = json.loads(payload)
return {
"tenantId": CERTYO_TENANT_ID,
"database": MYSQL_CONFIG["database"],
"collection": row["table_name"],
"recordId": row["record_id"],
"operationType": row["op_type"],
"recordPayload": payload,
"sourceTimestamp": row["created_at"].isoformat()
if row.get("created_at") else None,
"idempotencyKey": f"queue-{row['id']}",
}
def ingest_and_mark(conn, cursor, rows):
"""Ingest records into Certyo and mark queue rows as processed."""
records = [map_to_certyo_record(r) for r in rows]
headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
success_ids = []
if len(records) > BULK_THRESHOLD:
for i in range(0, len(records), 1000):
batch = records[i : i + 1000]
batch_rows = rows[i : i + 1000]
resp = requests.post(
f"{CERTYO_BASE_URL}/api/v1/records/bulk",
json={"tenantId": CERTYO_TENANT_ID, "records": batch},
headers=headers, timeout=60,
)
if resp.status_code == 202:
success_ids.extend(r["id"] for r in batch_rows)
else:
print(f"Bulk failed: {resp.status_code} {resp.text}")
else:
for rec, row in zip(records, rows):
resp = requests.post(
f"{CERTYO_BASE_URL}/api/v1/records",
json=rec, headers=headers, timeout=30,
)
if resp.status_code == 202:
success_ids.append(row["id"])
else:
print(f"Failed {row['id']}: {resp.status_code} {resp.text}")
# Mark processed
if success_ids:
placeholders = ",".join(["%s"] * len(success_ids))
cursor.execute(
f"UPDATE certyo_queue SET processed = TRUE WHERE id IN ({placeholders})",
success_ids,
)
conn.commit()
print(f"Processed {len(success_ids)} queue entries")
def poll_loop():
print(f"Queue poller started (interval={POLL_INTERVAL}s)")
while True:
try:
conn, cursor, rows = poll_queue()
if rows:
ingest_and_mark(conn, cursor, rows)
cursor.close()
conn.close()
except Exception as e:
print(f"Poll error: {e}")
time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
    poll_loop()
Approach 3: Timestamp Polling
The simplest integration approach: add an updated_at timestamp column and poll for rows modified since the last checkpoint. This works well for low-to-medium volume tables but cannot detect deletes.
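The pollers below keep last_checkpoint in memory, so a restart loses its place and re-reads history. One way to persist it, sketched with an illustrative file name (a dedicated checkpoint table in MySQL works equally well):

```python
import os

# Illustrative path; a single-row MySQL table is an equally valid store.
CHECKPOINT_FILE = "certyo_checkpoint.txt"

def load_checkpoint(default: str = "1970-01-01 00:00:00") -> str:
    """Read the last processed updated_at value, falling back to the epoch."""
    try:
        with open(CHECKPOINT_FILE) as f:
            return f.read().strip() or default
    except FileNotFoundError:
        return default

def save_checkpoint(value: str) -> None:
    """Persist the checkpoint; write-to-temp plus os.replace keeps it atomic."""
    tmp = CHECKPOINT_FILE + ".tmp"
    with open(tmp, "w") as f:
        f.write(value)
    os.replace(tmp, CHECKPOINT_FILE)
```

Call load_checkpoint() at startup instead of hard-coding the epoch string, and save_checkpoint(last_checkpoint) after each successfully ingested batch.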
"""
Timestamp-based MySQL poller for Certyo integration.
Install: pip install mysql-connector-python requests
Usage:
CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy \
MYSQL_HOST=localhost MYSQL_DATABASE=product_catalog \
python timestamp_poller.py
"""
import os
import time
import requests
import mysql.connector
from datetime import datetime, timezone
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
MYSQL_CONFIG = {
"host": os.environ.get("MYSQL_HOST", "localhost"),
"port": int(os.environ.get("MYSQL_PORT", "3306")),
"database": os.environ.get("MYSQL_DATABASE", "product_catalog"),
"user": os.environ.get("MYSQL_USER", "certyo_worker"),
"password": os.environ.get("MYSQL_PASSWORD", "changeme"),
}
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "15"))
BULK_THRESHOLD = 10
# In-memory checkpoint (persist to a file or table in production)
last_checkpoint = "1970-01-01 00:00:00"
def poll_changes():
"""Query rows modified since the last checkpoint."""
global last_checkpoint
conn = mysql.connector.connect(**MYSQL_CONFIG)
cursor = conn.cursor(dictionary=True)
cursor.execute("""
SELECT id, sku, name, category, price, batch_number,
manufactured_at, updated_at
FROM products
WHERE updated_at > %s
ORDER BY updated_at
LIMIT 500
""", (last_checkpoint,))
rows = cursor.fetchall()
cursor.close()
conn.close()
return rows
def map_to_certyo_record(row):
return {
"tenantId": CERTYO_TENANT_ID,
"database": MYSQL_CONFIG["database"],
"collection": "products",
"recordId": str(row["id"]),
"operationType": "upsert",
"recordPayload": {
"id": row["id"],
"sku": row.get("sku"),
"name": row.get("name"),
"category": row.get("category"),
"price": float(row["price"]) if row.get("price") else None,
"batch_number": row.get("batch_number"),
"manufactured_at": str(row["manufactured_at"])
if row.get("manufactured_at") else None,
},
"sourceTimestamp": row["updated_at"].isoformat()
if row.get("updated_at") else None,
"idempotencyKey": f"poll-{row['id']}-{row.get('updated_at', '')}",
}
def ingest_records(records):
headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
if len(records) > BULK_THRESHOLD:
for i in range(0, len(records), 1000):
batch = records[i : i + 1000]
resp = requests.post(
f"{CERTYO_BASE_URL}/api/v1/records/bulk",
json={"tenantId": CERTYO_TENANT_ID, "records": batch},
headers=headers, timeout=60,
)
if resp.status_code != 202:
print(f"Bulk failed: {resp.status_code} {resp.text}")
else:
for rec in records:
resp = requests.post(
f"{CERTYO_BASE_URL}/api/v1/records",
json=rec, headers=headers, timeout=30,
)
if resp.status_code != 202:
print(f"Failed: {resp.status_code} {resp.text}")
def poll_loop():
global last_checkpoint
print(f"Timestamp poller started (interval={POLL_INTERVAL}s)")
while True:
try:
rows = poll_changes()
if rows:
records = [map_to_certyo_record(r) for r in rows]
print(f"Found {len(records)} changed rows")
ingest_records(records)
last_checkpoint = str(max(
r["updated_at"] for r in rows
))
print(f"Checkpoint updated to {last_checkpoint}")
except Exception as e:
print(f"Poll error: {e}")
time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
    poll_loop()
/**
* Timestamp-based MySQL poller for Certyo integration.
 * Install: npm install mysql2   (Node.js 18+ provides fetch natively)
*
* Usage:
* CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy \
* MYSQL_HOST=localhost MYSQL_DATABASE=product_catalog \
* node timestampPoller.js
*/
const mysql = require("mysql2/promise");
const CERTYO_API_KEY = process.env.CERTYO_API_KEY;
const CERTYO_TENANT_ID = process.env.CERTYO_TENANT_ID;
const CERTYO_BASE_URL = "https://www.certyos.com";
const POLL_INTERVAL = parseInt(process.env.POLL_INTERVAL || "15", 10) * 1000;
const BULK_THRESHOLD = 10;
let lastCheckpoint = new Date(0); // mysql2 serializes Date params for the DATETIME comparison
const pool = mysql.createPool({
host: process.env.MYSQL_HOST || "localhost",
port: parseInt(process.env.MYSQL_PORT || "3306", 10),
database: process.env.MYSQL_DATABASE || "product_catalog",
user: process.env.MYSQL_USER || "certyo_worker",
password: process.env.MYSQL_PASSWORD || "changeme",
waitForConnections: true,
connectionLimit: 5,
});
async function pollChanges() {
const [rows] = await pool.execute(
`SELECT id, sku, name, category, price, batch_number,
manufactured_at, updated_at
FROM products
WHERE updated_at > ?
ORDER BY updated_at
LIMIT 500`,
[lastCheckpoint]
);
return rows;
}
function mapToRecord(row) {
return {
tenantId: CERTYO_TENANT_ID,
database: process.env.MYSQL_DATABASE || "product_catalog",
collection: "products",
recordId: String(row.id),
operationType: "upsert",
recordPayload: {
id: row.id,
sku: row.sku,
name: row.name,
category: row.category,
      price: row.price == null ? null : Number(row.price),
batch_number: row.batch_number,
manufactured_at: row.manufactured_at
? row.manufactured_at.toISOString()
: null,
},
sourceTimestamp: row.updated_at ? row.updated_at.toISOString() : null,
idempotencyKey: `poll-${row.id}-${row.updated_at?.toISOString() || ""}`,
};
}
async function ingestSingle(record) {
const resp = await fetch(`${CERTYO_BASE_URL}/api/v1/records`, {
method: "POST",
headers: {
"X-API-Key": CERTYO_API_KEY,
"Content-Type": "application/json",
},
body: JSON.stringify(record),
});
if (resp.status !== 202) {
console.error(`Failed: ${resp.status} ${await resp.text()}`);
}
}
async function ingestBulk(records) {
for (let i = 0; i < records.length; i += 1000) {
const batch = records.slice(i, i + 1000);
const resp = await fetch(`${CERTYO_BASE_URL}/api/v1/records/bulk`, {
method: "POST",
headers: {
"X-API-Key": CERTYO_API_KEY,
"Content-Type": "application/json",
},
body: JSON.stringify({ tenantId: CERTYO_TENANT_ID, records: batch }),
});
if (resp.status !== 202) {
console.error(`Bulk failed: ${resp.status} ${await resp.text()}`);
}
}
}
async function pollLoop() {
console.log(`Timestamp poller started (interval=${POLL_INTERVAL / 1000}s)`);
while (true) {
try {
const rows = await pollChanges();
if (rows.length > 0) {
const records = rows.map(mapToRecord);
console.log(`Found ${records.length} changed rows`);
if (records.length > BULK_THRESHOLD) {
await ingestBulk(records);
} else {
for (const rec of records) {
await ingestSingle(rec);
}
}
// Update checkpoint to the latest updated_at
const maxDate = rows.reduce((max, r) =>
r.updated_at > max ? r.updated_at : max,
rows[0].updated_at
);
      lastCheckpoint = maxDate; // keep the Date object; mysql2 serializes it in the next query
console.log(`Checkpoint: ${lastCheckpoint}`);
}
} catch (err) {
console.error("Poll error:", err.message);
}
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL));
}
}
pollLoop();
Field Mapping
Map MySQL columns to Certyo record fields. The mapping applies across all three approaches.
MySQL Field Certyo Field Notes
─────────────────────────────────────────────────────────────────────────
id (PRIMARY KEY) recordId Cast to string
products (table name) collection Lowercase MySQL convention
product_catalog (database) database MySQL database name
id + binlog pos / timestamp idempotencyKey Unique per change event
updated_at / NOW() sourceTimestamp ISO 8601 format
'insert'/'update'/'delete' operationType From trigger, Debezium, or 'upsert' for polling
Full row as JSON recordPayload All business columns serialized
Debezium Operation Mapping:
"c" (create) → "insert"
"u" (update) → "update"
"d" (delete) → "delete"
"r" (read/snapshot) → "upsert"Verification and Write-back
After records are ingested, add tracking columns to the source table and periodically verify anchoring status via the Certyo API.
-- Step 1: Add tracking columns to the source table
ALTER TABLE products ADD COLUMN certyo_record_hash VARCHAR(128) DEFAULT NULL;
ALTER TABLE products ADD COLUMN certyo_anchor_status VARCHAR(20) DEFAULT 'Pending';
ALTER TABLE products ADD COLUMN certyo_verified_at TIMESTAMP NULL DEFAULT NULL;
-- Step 2: After ingestion, write back the record hash
UPDATE products
SET certyo_record_hash = :record_hash,
certyo_anchor_status = 'Pending'
WHERE id = :product_id;
-- Step 3: After verification confirms anchoring
UPDATE products
SET certyo_anchor_status = 'Anchored',
certyo_verified_at = NOW()
WHERE id = :product_id;
-- Step 4: Query unverified records for the verification worker
SELECT id, certyo_record_hash
FROM products
WHERE certyo_anchor_status = 'Pending'
AND certyo_record_hash IS NOT NULL
  AND updated_at < NOW() - INTERVAL 2 MINUTE;
"""
Verification worker: checks Certyo anchoring status and updates MySQL.
Install: pip install mysql-connector-python requests
"""
import os
import time
import requests
import mysql.connector
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
MYSQL_CONFIG = {
"host": os.environ.get("MYSQL_HOST", "localhost"),
"port": int(os.environ.get("MYSQL_PORT", "3306")),
"database": os.environ.get("MYSQL_DATABASE", "product_catalog"),
"user": os.environ.get("MYSQL_USER", "certyo_worker"),
"password": os.environ.get("MYSQL_PASSWORD", "changeme"),
}
def verify_pending_records():
conn = mysql.connector.connect(**MYSQL_CONFIG)
cursor = conn.cursor(dictionary=True)
cursor.execute("""
SELECT id, certyo_record_hash
FROM products
WHERE certyo_anchor_status = 'Pending'
AND certyo_record_hash IS NOT NULL
AND updated_at < NOW() - INTERVAL 2 MINUTE
LIMIT 100
""")
headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
verified = 0
failed = 0
for row in cursor.fetchall():
resp = requests.post(
f"{CERTYO_BASE_URL}/api/v1/verify/record",
json={
"tenantId": CERTYO_TENANT_ID,
"database": MYSQL_CONFIG["database"],
"collection": "products",
"recordId": str(row["id"]),
},
headers=headers, timeout=30,
)
if resp.status_code == 200:
result = resp.json()
if result.get("verified") and result.get("anchoredOnChain"):
cursor.execute("""
UPDATE products
SET certyo_anchor_status = 'Anchored',
certyo_verified_at = NOW()
WHERE id = %s
""", (row["id"],))
verified += 1
else:
failed += 1
else:
failed += 1
conn.commit()
cursor.close()
conn.close()
print(f"Verified: {verified}, Failed/Pending: {failed}")
if __name__ == "__main__":
while True:
try:
verify_pending_records()
except Exception as e:
print(f"Verification error: {e}")
        time.sleep(120)
Authentication Reference
MySQL connection
The integration connects to MySQL using standard connection parameters (host, port, user, password, database). For production, use SSL/TLS connections (ssl_ca, ssl_cert, ssl_key in Python; ssl option in Node.js). For AWS RDS or Azure Database for MySQL, download the provider's CA certificate.
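A sketch of how the Python pollers' MYSQL_CONFIG could pick up those SSL options from the environment. The MYSQL_SSL_* variable names are illustrative assumptions; ssl_ca, ssl_cert, ssl_key, and ssl_verify_cert are mysql-connector-python connection options:

```python
import os

def with_ssl(config: dict, env=os.environ) -> dict:
    """Return a copy of a mysql-connector-python config dict with SSL
    options added when a CA bundle is configured in the environment."""
    cfg = dict(config)
    if env.get("MYSQL_SSL_CA"):
        cfg["ssl_ca"] = env["MYSQL_SSL_CA"]  # e.g. the RDS or Azure CA bundle
        cfg["ssl_verify_cert"] = True
        # Mutual TLS is optional; add the client cert/key only when both are set.
        if env.get("MYSQL_SSL_CERT") and env.get("MYSQL_SSL_KEY"):
            cfg["ssl_cert"] = env["MYSQL_SSL_CERT"]
            cfg["ssl_key"] = env["MYSQL_SSL_KEY"]
    return cfg

if __name__ == "__main__":
    base = {"host": "localhost", "user": "certyo_worker"}
    print(with_ssl(base, {"MYSQL_SSL_CA": "/etc/ssl/rds-ca.pem"}))
```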
Certyo API key
Pass your Certyo API key in the X-API-Key header on all Certyo API calls. Store it in environment variables, AWS Secrets Manager, or a vault — never hardcode in source files or commit to version control.
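For example, the header can be built once from the environment rather than repeated at each call site (a sketch; certyo_headers is an illustrative helper, not part of any Certyo SDK):

```python
import os

def certyo_headers(env=os.environ) -> dict:
    """Build the standard Certyo request headers from the environment.
    Raises KeyError if CERTYO_API_KEY is unset, failing fast at startup."""
    return {
        "X-API-Key": env["CERTYO_API_KEY"],  # never hardcode the key in source
        "Content-Type": "application/json",
    }
```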
Troubleshooting
- Debezium connector fails to start — Verify binlog settings with SHOW VARIABLES LIKE 'binlog_format' (must be ROW) and SHOW VARIABLES LIKE 'binlog_row_image' (must be FULL). Also check that the Debezium user has REPLICATION SLAVE and REPLICATION CLIENT privileges.
- Trigger queue grows unbounded — Ensure the poller is running and marking rows as processed = TRUE. Add a cleanup job to delete processed rows older than 24 hours.
- Timestamp polling misses changes — This approach cannot detect deletes. For delete tracking, use the trigger or Debezium approach. Also ensure the updated_at column is indexed and updated on every change.
- Duplicate records in Certyo — Use idempotencyKey with the primary key and binlog position (Debezium), queue ID (trigger), or timestamp (polling). Certyo deduplicates automatically and returns idempotencyReplayed: true.
- MySQL 5.7 GTID issues — If gtid_mode is OFF on MySQL 5.7, set "database.history.skip.unparseable.ddl": "true" in the Debezium connector config. GTID is recommended but not required.
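When a POST returns a non-202 status, the workers above only log the failure. Because every record carries an idempotencyKey, retrying is safe even if an earlier attempt actually succeeded. A minimal exponential-backoff schedule to wrap around those calls (a sketch, not a Certyo requirement):

```python
import random

def backoff_delays(attempts: int = 5, base: float = 1.0, cap: float = 60.0):
    """Yield sleep durations: base * 2^n seconds, capped at `cap`,
    plus up to 1s of jitter to avoid synchronized retries."""
    for n in range(attempts):
        yield min(cap, base * (2 ** n)) + random.uniform(0, 1)

if __name__ == "__main__":
    for delay in backoff_delays(attempts=3):
        print(f"would sleep {delay:.1f}s before retrying")
```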
AI Integration Skill
Download a skill file that enables AI agents to generate working MySQL + Certyo integration code for any language or framework.
What's inside
- Authentication — MySQL connection strings, binlog user grants, SSL configuration
- Architecture — Three approaches: Debezium binlog, trigger queue table, timestamp polling
- Field mapping — MySQL columns and Debezium operations to Certyo record schema
- Code examples — Python poller, Node.js poller, MySQL triggers, Debezium Docker Compose
- Verification — Periodic anchoring verification with MySQL UPDATE write-back
- MySQL patterns — Binlog configuration, trigger queue cleanup, checkpoint persistence
How to use
Claude Code
Place the file in your project's .claude/commands/ directory, then use it as a slash command:
# Download the skill file
mkdir -p .claude/commands
curl -o .claude/commands/certyo-mysql.md \
https://www.certyos.com/developers/skills/certyo-mysql-skill.md
# Use it in Claude Code
/certyo-mysql "Generate a Python worker that polls MySQL for changes and pushes them to Certyo"
Cursor / Copilot / Any AI Agent
Add the file to your project root or attach it to a conversation. The AI agent will use the MySQL-specific patterns, field mappings, and code examples to generate correct integration code.
# Add to your project
curl -o CERTYO_MYSQL.md \
https://www.certyos.com/developers/skills/certyo-mysql-skill.md
# Then in your AI agent:
"Using the Certyo MySQL spec in CERTYO_MYSQL.md,
generate a python worker that polls mysql for changes and pushes them to certyo"
CLAUDE.md Context File
Append the skill file to your project's CLAUDE.md so every Claude conversation has MySQL + Certyo context automatically.
# Append to your project's CLAUDE.md
echo "" >> CLAUDE.md
echo "## Certyo MySQL Integration" >> CLAUDE.md
cat CERTYO_MYSQL.md >> CLAUDE.md