Certyo Developer Portal

MySQL Integration

Capture row-level changes from MySQL and push them to Certyo for blockchain anchoring. This guide covers three approaches: Debezium binlog CDC (production), trigger-based queue tables, and simple timestamp-based polling.

Binlog configuration required
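Debezium requires binary logging to be enabled, with binlog_format=ROW and binlog_row_image=FULL. Verify with SHOW VARIABLES LIKE 'log_bin' and SHOW VARIABLES LIKE 'binlog%'. These are the defaults on MySQL 8.x. On MySQL 5.7, binary logging is off by default and must be enabled explicitly in my.cnf.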

Overview

MySQL offers several change detection strategies depending on your throughput requirements and infrastructure constraints:

  • Debezium + binlog (recommended for production) — The Debezium MySQL connector reads the binary log in real time, streams changes to Kafka, and a consumer forwards them to Certyo. No triggers or polling queries needed.
  • Trigger + queue table — A MySQL trigger writes change events to a queue table on every INSERT/UPDATE/DELETE. An external poller reads the queue and calls Certyo.
  • Timestamp polling — The simplest approach: add an updated_at column and poll for rows changed since the last checkpoint. Best for low-volume tables or prototyping.

Architecture

Data flow — three approaches
Approach 1: Debezium + Binlog (Production)
┌──────────────────────┐       MySQL binlog                ┌───────────────────┐
│  MySQL Database      │ ──── ROW-format events ─────────> │  Debezium         │
│  products table      │                                   │  MySQL Connector  │
│  (INSERT/UPDATE/DEL) │                                   └────────┬──────────┘
└──────────────────────┘                                            │ Kafka
                                                                    v
                                                           ┌─────────────────┐
                                                           │  Kafka Consumer  │
                                                           │  (Python/Node)   │
                                                           └────────┬────────┘
                                                                    │
                                                     POST /api/v1/records
                                                                    v
                                                           ┌─────────────────┐
                                                           │  Certyo API     │
                                                           │  (202 Accepted) │
                                                           └────────┬────────┘
                                                                    │
                                                          ~60-90s anchoring
                                                                    │
                                                                    v
                                                           ┌─────────────────┐
                                                           │  Polygon        │
                                                           │  (on-chain)     │
                                                           └─────────────────┘

Approach 2: Trigger + Queue Table
┌──────────────────────┐  AFTER INSERT/UPDATE/DELETE       ┌───────────────────┐
│  MySQL Database      │ ──── trigger writes to ─────────> │  certyo_queue     │
│  products table      │                                   │  (queue table)    │
└──────────────────────┘                                   └────────┬──────────┘
                                                      Poller reads + deletes
                                                      processed rows
                                                                    │
                                                     POST /api/v1/records
                                                                    v
                                                           ┌─────────────────┐
                                                           │  Certyo API     │
                                                           │  (202 Accepted) │
                                                           └─────────────────┘

Approach 3: Timestamp Polling
┌──────────────────────┐                                   ┌───────────────────┐
│  MySQL Database      │ ◄──── SELECT WHERE updated_at > ──│  External Poller  │
│  products table      │       last_checkpoint             │  (Python/Node)    │
│  (updated_at col)    │                                   └────────┬──────────┘
└──────────────────────┘                                            │
                                                     POST /api/v1/records
                                                                    v
                                                           ┌─────────────────┐
                                                           │  Certyo API     │
                                                           │  (202 Accepted) │
                                                           └─────────────────┘

Prerequisites

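  • MySQL 5.7.8+ (required for the JSON column type used by the queue table) or MySQL 8.x (recommended; binary logging with ROW format is enabled by default)
  • For Debezium: binary logging enabled with a unique server ID (for example log-bin=mysql-bin and server-id=1 in my.cnf), plus binlog_format=ROW and binlog_row_image=FULL
  • A MySQL user with SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT privileges (for Debezium)
  • A Certyo API key (obtain from your Certyo dashboard under Settings → API Keys)
  • Python 3.9+ with mysql-connector-python and requests, or Node.js 18+ with mysql2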

Approach 1: Debezium + Binlog (Production)

The Debezium MySQL connector reads the binary log and streams row-level changes to Kafka topics. A consumer transforms the events into Certyo records and calls the ingestion API. This is the recommended approach for production workloads.

docker-compose.yml — Debezium + Kafka + MySQL connector
version: "3.9"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
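      # Advertised as kafka:9092, so clients must run inside this Compose
      # network (or you must add a separate listener for host access).
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092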
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  mysql:
    image: mysql:8.0
    ports: ["3306:3306"]
    environment:
      MYSQL_ROOT_PASSWORD: rootpass
      MYSQL_DATABASE: product_catalog
    command: >
      --server-id=1
      --log-bin=mysql-bin
      --binlog-format=ROW
      --binlog-row-image=FULL
      --gtid-mode=ON
      --enforce-gtid-consistency=ON

  connect:
    image: debezium/connect:2.6
    depends_on: [kafka, mysql]
    ports: ["8083:8083"]
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: certyo-mysql-cdc
      CONFIG_STORAGE_TOPIC: _connect_configs
      OFFSET_STORAGE_TOPIC: _connect_offsets
      STATUS_STORAGE_TOPIC: _connect_statuses
      CONFIG_STORAGE_REPLICATION_FACTOR: 1
      OFFSET_STORAGE_REPLICATION_FACTOR: 1
      STATUS_STORAGE_REPLICATION_FACTOR: 1

# After the services are running, first create the Debezium user in MySQL
# (the connector cannot start without it):
#
# CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz';
# GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE,
#       REPLICATION CLIENT ON *.* TO 'debezium'@'%';
# FLUSH PRIVILEGES;
#
# Then register the MySQL connector. Change events for the products table
# are published to the Kafka topic mysql.product_catalog.products
# (topic.prefix + database + table):
#
# curl -X POST http://localhost:8083/connectors \
#   -H "Content-Type: application/json" -d '{
#   "name": "mysql-certyo-connector",
#   "config": {
#     "connector.class": "io.debezium.connector.mysql.MySqlConnector",
#     "database.hostname": "mysql",
#     "database.port": "3306",
#     "database.user": "debezium",
#     "database.password": "dbz",
#     "database.server.id": "184054",
#     "topic.prefix": "mysql",
#     "database.include.list": "product_catalog",
#     "table.include.list": "product_catalog.products",
#     "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
#     "schema.history.internal.kafka.topic": "schema-changes.mysql",
#     "include.schema.changes": "false",
#     "snapshot.mode": "initial"
#   }
# }'
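The architecture diagram shows a Kafka consumer that forwards events to Certyo, but no consumer code is included here. The sketch below covers only the transformation step, applying the c/u/d/r operation mapping listed under Field Mapping. It assumes Kafka Connect's JSON converter (with or without the schema envelope) and passes column values through unchanged. DECIMAL and temporal columns arrive encoded according to the connector's `decimal.handling.mode` and `time.precision.mode` settings; with the default `precise` mode, `price` arrives base64-encoded, so you may prefer `decimal.handling.mode=string`. The function name `debezium_to_certyo` and the `dbz-` key prefix are illustrative. Reading the topic (for example with confluent-kafka) and POSTing to `/api/v1/records` are left to your consumer.

```python
import json
from datetime import datetime, timezone
from typing import Optional

# Debezium op codes -> Certyo operationType (same mapping as the Field Mapping section).
OP_MAP = {"c": "insert", "u": "update", "d": "delete", "r": "upsert"}


def debezium_to_certyo(raw_value: Optional[bytes], tenant_id: str) -> Optional[dict]:
    """Convert one Debezium MySQL change event (a Kafka message value) to a Certyo record.

    Returns None for tombstones (the null-valued message Debezium emits after a
    delete) and for anything that is not a row change.
    """
    event = json.loads(raw_value) if raw_value is not None else None
    # With the JSON converter's schemas.enable=true, the event sits under "payload".
    if isinstance(event, dict) and "payload" in event and "op" not in event:
        event = event["payload"]
    if not isinstance(event, dict) or event.get("op") not in OP_MAP:
        return None
    op = event["op"]
    source = event["source"]
    # Deletes carry the old row in "before"; all other operations use "after".
    row = event["before"] if op == "d" else event["after"]
    return {
        "tenantId": tenant_id,
        "database": source["db"],
        "collection": source["table"],
        "recordId": str(row["id"]),
        "operationType": OP_MAP[op],
        "recordPayload": row,
        # source.ts_ms is milliseconds since the Unix epoch.
        "sourceTimestamp": datetime.fromtimestamp(
            source["ts_ms"] / 1000, tz=timezone.utc
        ).isoformat(),
        # Binlog file, position, and row index identify the change event.
        "idempotencyKey": (
            f"dbz-{row['id']}-{source['file']}-{source['pos']}-{source.get('row', 0)}"
        ),
    }
```

Because the key is derived from the binlog position, a consumer that restarts and re-reads a Kafka offset sends the same keys again, and Certyo can deduplicate them.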

Approach 2: Trigger + Queue Table

For environments where Kafka is not available, MySQL triggers write change events to a queue table. An external worker polls the queue, ingests the events into Certyo, and marks them processed = TRUE; a periodic cleanup job (Step 3 below) then deletes processed rows.

Create queue table and triggers
-- Step 1: Create the queue table
CREATE TABLE certyo_queue (
    id           BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name   VARCHAR(100) NOT NULL,
    record_id    VARCHAR(100) NOT NULL,
    op_type      ENUM('insert', 'update', 'delete') NOT NULL,
    payload      JSON NOT NULL,
    created_at   TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    processed    BOOLEAN DEFAULT FALSE,
    INDEX idx_unprocessed (processed, created_at)
) ENGINE=InnoDB;

-- Step 2: Create triggers on the products table

DELIMITER //

CREATE TRIGGER trg_products_insert
AFTER INSERT ON products
FOR EACH ROW
BEGIN
    INSERT INTO certyo_queue (table_name, record_id, op_type, payload)
    VALUES (
        'products',
        CAST(NEW.id AS CHAR),
        'insert',
        JSON_OBJECT(
            'id', NEW.id,
            'sku', NEW.sku,
            'name', NEW.name,
            'category', NEW.category,
            'price', NEW.price,
            'batch_number', NEW.batch_number,
            'manufactured_at', NEW.manufactured_at
        )
    );
END //

CREATE TRIGGER trg_products_update
AFTER UPDATE ON products
FOR EACH ROW
BEGIN
    INSERT INTO certyo_queue (table_name, record_id, op_type, payload)
    VALUES (
        'products',
        CAST(NEW.id AS CHAR),
        'update',
        JSON_OBJECT(
            'id', NEW.id,
            'sku', NEW.sku,
            'name', NEW.name,
            'category', NEW.category,
            'price', NEW.price,
            'batch_number', NEW.batch_number,
            'manufactured_at', NEW.manufactured_at
        )
    );
END //

CREATE TRIGGER trg_products_delete
AFTER DELETE ON products
FOR EACH ROW
BEGIN
    INSERT INTO certyo_queue (table_name, record_id, op_type, payload)
    VALUES (
        'products',
        CAST(OLD.id AS CHAR),
        'delete',
        JSON_OBJECT(
            'id', OLD.id,
            'sku', OLD.sku,
            'name', OLD.name,
            'category', OLD.category,
            'price', OLD.price,
            'batch_number', OLD.batch_number,
            'manufactured_at', OLD.manufactured_at
        )
    );
END //

DELIMITER ;

-- Step 3: Cleanup job (run periodically via cron or EVENT SCHEDULER)
-- Delete processed rows older than 24 hours
DELETE FROM certyo_queue
WHERE processed = TRUE
  AND created_at < NOW() - INTERVAL 24 HOUR;
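The worker that drains certyo_queue is described above but not shown. A minimal sketch of its claim-and-map step follows, assuming MySQL 8.0 (FOR UPDATE SKIP LOCKED is not available on 5.7) and a dictionary cursor from mysql-connector-python. `QUEUE_CLAIM_SQL`, `QUEUE_MARK_SQL`, and `queue_row_to_certyo` are hypothetical names.

```python
import json
from datetime import datetime

# Claim unprocessed events in id order. FOR UPDATE SKIP LOCKED (MySQL 8.0.1+)
# lets several workers run without claiming the same rows.
QUEUE_CLAIM_SQL = """
    SELECT id, table_name, record_id, op_type, payload, created_at
    FROM certyo_queue
    WHERE processed = FALSE
    ORDER BY id
    LIMIT 500
    FOR UPDATE SKIP LOCKED
"""

# Execute once per claimed id after Certyo accepts the records, then commit.
QUEUE_MARK_SQL = "UPDATE certyo_queue SET processed = TRUE WHERE id = %s"


def queue_row_to_certyo(row, tenant_id, database):
    """Map one certyo_queue row (from a dictionary cursor) to a Certyo record."""
    payload = row["payload"]
    # Depending on the connector version, JSON columns may arrive as text or bytes.
    if isinstance(payload, (str, bytes, bytearray)):
        payload = json.loads(payload)
    created = row["created_at"]
    return {
        "tenantId": tenant_id,
        "database": database,
        "collection": row["table_name"],
        "recordId": row["record_id"],
        # The triggers already store 'insert' / 'update' / 'delete'.
        "operationType": row["op_type"],
        "recordPayload": payload,
        "sourceTimestamp": created.isoformat() if isinstance(created, datetime) else None,
        # The queue id is unique per change event, so retried POSTs deduplicate.
        "idempotencyKey": f"queue-{row['id']}",
    }
```

Run the claim, POST the mapped records, then execute the mark statement for each id and commit in the same transaction. If the POST fails, roll back so the rows stay unprocessed and are retried on the next poll.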

Approach 3: Timestamp Polling

The simplest integration approach: add an updated_at timestamp column and poll for rows modified since the last checkpoint. This works well for low-to-medium volume tables, but it cannot detect deletes, and if a row changes several times between polls, only its latest state is captured.

timestamp_poller.py — Simple polling with mysql-connector-python
"""
Timestamp-based MySQL poller for Certyo integration.
Install: pip install mysql-connector-python requests

Usage:
    CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy \
    MYSQL_HOST=localhost MYSQL_DATABASE=product_catalog \
    python timestamp_poller.py
"""

import os
import time
import requests
import mysql.connector
from datetime import datetime, timezone

CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
MYSQL_CONFIG = {
    "host": os.environ.get("MYSQL_HOST", "localhost"),
    "port": int(os.environ.get("MYSQL_PORT", "3306")),
    "database": os.environ.get("MYSQL_DATABASE", "product_catalog"),
    "user": os.environ.get("MYSQL_USER", "certyo_worker"),
    "password": os.environ.get("MYSQL_PASSWORD", "changeme"),
}
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "15"))
BULK_THRESHOLD = 10

# Checkpoint as (updated_at, id): the id breaks ties between rows that share
# an updated_at value. Kept in memory here (persist to a file or table in production).
last_checkpoint = ("1970-01-01 00:00:00", 0)


def poll_changes():
    """Query rows modified after the (updated_at, id) checkpoint."""
    checkpoint_ts, checkpoint_id = last_checkpoint
    conn = mysql.connector.connect(**MYSQL_CONFIG)
    try:
        cursor = conn.cursor(dictionary=True)
        # Keyset pagination: a plain "updated_at > %s" with LIMIT would skip rows
        # that share the updated_at value of the last row in a full batch.
        cursor.execute("""
            SELECT id, sku, name, category, price, batch_number,
                   manufactured_at, updated_at
            FROM products
            WHERE updated_at > %s
               OR (updated_at = %s AND id > %s)
            ORDER BY updated_at, id
            LIMIT 500
        """, (checkpoint_ts, checkpoint_ts, checkpoint_id))
        rows = cursor.fetchall()
        cursor.close()
    finally:
        conn.close()
    return rows


def map_to_certyo_record(row):
    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": MYSQL_CONFIG["database"],
        "collection": "products",
        "recordId": str(row["id"]),
        "operationType": "upsert",
        "recordPayload": {
            "id": row["id"],
            "sku": row.get("sku"),
            "name": row.get("name"),
            "category": row.get("category"),
            # "is not None" so a price of 0 is kept rather than sent as null
            "price": float(row["price"]) if row.get("price") is not None else None,
            "batch_number": row.get("batch_number"),
            "manufactured_at": str(row["manufactured_at"])
                if row.get("manufactured_at") else None,
        },
        "sourceTimestamp": row["updated_at"].isoformat()
            if row.get("updated_at") else None,
        "idempotencyKey": f"poll-{row['id']}-{row.get('updated_at', '')}",
    }


def ingest_records(records):
    """Send records to Certyo; raise on any non-202 so the caller keeps its checkpoint."""
    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
    if len(records) > BULK_THRESHOLD:
        for i in range(0, len(records), 1000):
            batch = records[i : i + 1000]
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records/bulk",
                json={"tenantId": CERTYO_TENANT_ID, "records": batch},
                headers=headers, timeout=60,
            )
            if resp.status_code != 202:
                raise RuntimeError(f"Bulk failed: {resp.status_code} {resp.text}")
    else:
        for rec in records:
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records",
                json=rec, headers=headers, timeout=30,
            )
            if resp.status_code != 202:
                raise RuntimeError(f"Failed: {resp.status_code} {resp.text}")


def poll_loop():
    global last_checkpoint
    print(f"Timestamp poller started (interval={POLL_INTERVAL}s)")
    while True:
        try:
            rows = poll_changes()
            if rows:
                records = [map_to_certyo_record(r) for r in rows]
                print(f"Found {len(records)} changed rows")
                # Raises on failure, so the checkpoint only advances after a
                # successful send; idempotency keys make the retry safe.
                ingest_records(records)
                # Rows are ordered by (updated_at, id), so the last row is the new checkpoint.
                last = rows[-1]
                last_checkpoint = (str(last["updated_at"]), last["id"])
                print(f"Checkpoint updated to {last_checkpoint}")
        except Exception as e:
            print(f"Poll error: {e}")

        time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    poll_loop()
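The poller keeps last_checkpoint in memory, so a restart rescans the table from 1970. Idempotency keys prevent duplicates, but the rescan is wasted work. One way to persist the checkpoint, sketched here as an assumption rather than a required design, is a small JSON file replaced atomically with `os.replace`, so a crash never leaves a half-written checkpoint. `load_checkpoint` and `save_checkpoint` are hypothetical names. Values must be JSON-serializable (convert datetimes with `str()` first), and tuples come back as lists.

```python
import json
import os
import tempfile


def load_checkpoint(path, default):
    """Return the saved checkpoint, or `default` if none has been written yet."""
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return default


def save_checkpoint(path, value):
    """Write the checkpoint atomically: readers see the old file or the new one."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file must live on the same filesystem for os.replace to be atomic.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(value, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)
        raise
```

At startup, `last_checkpoint = load_checkpoint("poller_checkpoint.json", last_checkpoint)`; after each successful batch, `save_checkpoint("poller_checkpoint.json", last_checkpoint)`. A single-row MySQL table works equally well if the worker has no durable local disk.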

Field Mapping

Map MySQL columns to Certyo record fields. The mapping applies across all three approaches.

MySQL → Certyo field mapping
MySQL Field                   Certyo Field        Notes
─────────────────────────────────────────────────────────────────────────
id (PRIMARY KEY)              recordId            Cast to string
products (table name)         collection          Lowercase MySQL convention
product_catalog (database)    database            MySQL database name
id + binlog pos / timestamp   idempotencyKey      Unique per change event
updated_at / NOW()            sourceTimestamp     ISO 8601 format
'insert'/'update'/'delete'    operationType       From trigger or Debezium; 'upsert' for polling
Full row as JSON              recordPayload       All business columns serialized

Debezium Operation Mapping:
  "c"  (create)  → "insert"
  "u"  (update)  → "update"
  "d"  (delete)  → "delete"
  "r"  (read/snapshot) → "upsert"

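The mapping calls for ISO 8601 in sourceTimestamp, but mysql-connector-python returns DATETIME and TIMESTAMP values as naive datetime objects. As a result, `isoformat()` alone, as used in the poller, yields a timestamp with no UTC offset. A minimal sketch for attaching one follows, assuming you know which time zone the values are in. TIMESTAMP values follow the session time zone (for example after SET time_zone = '+00:00'), while DATETIME values are returned exactly as stored. `to_source_timestamp` is a hypothetical helper.

```python
from datetime import datetime, timezone, tzinfo
from typing import Optional


def to_source_timestamp(
    value: Optional[datetime], assume_tz: tzinfo = timezone.utc
) -> Optional[str]:
    """Format a MySQL DATETIME/TIMESTAMP value as an ISO 8601 string in UTC.

    Naive values (what mysql-connector-python returns) are interpreted in
    `assume_tz`, which should match the zone the values were read or written in.
    """
    if value is None:
        return None
    if value.tzinfo is None:
        value = value.replace(tzinfo=assume_tz)
    return value.astimezone(timezone.utc).isoformat()
```

Normalizing to UTC keeps sourceTimestamp comparable across workers that connect with different session time zones.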
Verification and Write-back

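After records are ingested, add tracking columns to the source table and periodically verify anchoring status via the Certyo API. Write-backs are row changes too. If updated_at is defined with ON UPDATE CURRENT_TIMESTAMP, set it to its current value (updated_at = updated_at) in every write-back UPDATE so the timestamp poller does not pick the row up again. Debezium and the Approach 2 triggers still capture these updates, so filter them out in your consumer or trigger if you do not want them anchored.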

Add Certyo tracking columns
-- Step 1: Add tracking columns to the source table
ALTER TABLE products ADD COLUMN certyo_record_hash VARCHAR(128) DEFAULT NULL;
ALTER TABLE products ADD COLUMN certyo_anchor_status VARCHAR(20) DEFAULT 'Pending';
ALTER TABLE products ADD COLUMN certyo_verified_at TIMESTAMP NULL DEFAULT NULL;

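-- Step 2: After ingestion, write back the record hash.
-- Setting updated_at to its current value stops ON UPDATE CURRENT_TIMESTAMP
-- from bumping it, so the write-back is not re-detected as a change.
UPDATE products
SET certyo_record_hash = :record_hash,
    certyo_anchor_status = 'Pending',
    updated_at = updated_at
WHERE id = :product_id;

-- Step 3: After verification confirms anchoring
UPDATE products
SET certyo_anchor_status = 'Anchored',
    certyo_verified_at = NOW(),
    updated_at = updated_at
WHERE id = :product_id;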

-- Step 4: Query unverified records for the verification worker
SELECT id, certyo_record_hash
FROM products
WHERE certyo_anchor_status = 'Pending'
  AND certyo_record_hash IS NOT NULL
  AND updated_at < NOW() - INTERVAL 2 MINUTE;

verification_worker.py — Verify anchoring status
"""
Verification worker: checks Certyo anchoring status and updates MySQL.
Install: pip install mysql-connector-python requests
"""

import os
import time
import requests
import mysql.connector

CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
MYSQL_CONFIG = {
    "host": os.environ.get("MYSQL_HOST", "localhost"),
    "port": int(os.environ.get("MYSQL_PORT", "3306")),
    "database": os.environ.get("MYSQL_DATABASE", "product_catalog"),
    "user": os.environ.get("MYSQL_USER", "certyo_worker"),
    "password": os.environ.get("MYSQL_PASSWORD", "changeme"),
}


def verify_pending_records():
    conn = mysql.connector.connect(**MYSQL_CONFIG)
    cursor = conn.cursor(dictionary=True)

    cursor.execute("""
        SELECT id, certyo_record_hash
        FROM products
        WHERE certyo_anchor_status = 'Pending'
          AND certyo_record_hash IS NOT NULL
          AND updated_at < NOW() - INTERVAL 2 MINUTE
        LIMIT 100
    """)

    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
    verified = 0
    failed = 0

    for row in cursor.fetchall():
        resp = requests.post(
            f"{CERTYO_BASE_URL}/api/v1/verify/record",
            json={
                "tenantId": CERTYO_TENANT_ID,
                "database": MYSQL_CONFIG["database"],
                "collection": "products",
                "recordId": str(row["id"]),
            },
            headers=headers, timeout=30,
        )

        if resp.status_code == 200:
            result = resp.json()
            if result.get("verified") and result.get("anchoredOnChain"):
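                # Setting updated_at to itself stops ON UPDATE CURRENT_TIMESTAMP
                # from bumping it, so pollers do not re-detect this write-back.
                cursor.execute("""
                    UPDATE products
                    SET certyo_anchor_status = 'Anchored',
                        certyo_verified_at = NOW(),
                        updated_at = updated_at
                    WHERE id = %s
                """, (row["id"],))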
                verified += 1
            else:
                failed += 1
        else:
            failed += 1

    conn.commit()
    cursor.close()
    conn.close()
    print(f"Verified: {verified}, Failed/Pending: {failed}")


if __name__ == "__main__":
    while True:
        try:
            verify_pending_records()
        except Exception as e:
            print(f"Verification error: {e}")
        time.sleep(120)

Authentication Reference

MySQL connection

The integration connects to MySQL using standard connection parameters (host, port, user, password, database). For production, use SSL/TLS connections (ssl_ca, ssl_cert, ssl_key in Python; ssl option in Node.js). For AWS RDS or Azure Database for MySQL, download the provider's CA certificate.
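A sketch of building mysql-connector-python connection arguments with TLS enabled follows. `ssl_ca`, `ssl_cert`, `ssl_key`, `ssl_verify_cert`, and `ssl_verify_identity` are Connector/Python connection arguments. The environment variable names (`MYSQL_SSL_CA` and so on) and the helper `mysql_config_from_env` are assumptions for illustration. Unlike the workers above, it requires MYSQL_PASSWORD instead of falling back to a default.

```python
import os


def mysql_config_from_env(env=os.environ):
    """Build mysql.connector.connect() kwargs, enabling TLS when a CA bundle is set."""
    config = {
        "host": env.get("MYSQL_HOST", "localhost"),
        "port": int(env.get("MYSQL_PORT", "3306")),
        "database": env.get("MYSQL_DATABASE", "product_catalog"),
        "user": env.get("MYSQL_USER", "certyo_worker"),
        "password": env["MYSQL_PASSWORD"],  # fail fast instead of a default password
    }
    ca = env.get("MYSQL_SSL_CA")  # e.g. the RDS or Azure CA bundle you downloaded
    if ca:
        config["ssl_ca"] = ca
        config["ssl_verify_cert"] = True      # reject untrusted server certificates
        config["ssl_verify_identity"] = True  # check the host name against the certificate
    # Client certificates are only needed if the server requires X.509 authentication.
    if env.get("MYSQL_SSL_CERT") and env.get("MYSQL_SSL_KEY"):
        config["ssl_cert"] = env["MYSQL_SSL_CERT"]
        config["ssl_key"] = env["MYSQL_SSL_KEY"]
    return config
```

It can stand in for the MYSQL_CONFIG dictionaries in the workers above: `mysql.connector.connect(**mysql_config_from_env())`.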

Certyo API key

Pass your Certyo API key in the X-API-Key header on all Certyo API calls. Store it in environment variables, AWS Secrets Manager, or a vault — never hardcode in source files or commit to version control.


Troubleshooting

  • Debezium connector fails to start — Verify binlog settings with SHOW VARIABLES LIKE 'binlog_format' (must be ROW) and SHOW VARIABLES LIKE 'binlog_row_image' (must be FULL). Also check that the Debezium user has REPLICATION SLAVE and REPLICATION CLIENT privileges.
  • Trigger queue grows unbounded — Ensure the poller is running and marking rows as processed = TRUE. Add a cleanup job to delete processed rows older than 24 hours.
  • Timestamp polling misses changes — This approach cannot detect deletes; for delete tracking, use the trigger or Debezium approach. Also ensure the updated_at column is indexed and set on every change (for example, TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP).
  • Duplicate records in Certyo — Use idempotencyKey with the primary key and binlog position (Debezium), queue ID (trigger), or timestamp (polling). Certyo deduplicates automatically and returns idempotencyReplayed: true.
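  • MySQL 5.7 GTID issues — GTID mode is recommended but not required: with gtid_mode=OFF, Debezium tracks the binlog file name and position instead. If the connector stops on DDL it cannot parse, Debezium 2.x provides "schema.history.internal.skip.unparseable.ddl": "true" (named database.history.skip.unparseable.ddl in 1.x). Use it with care, because skipped DDL can leave the connector's schema history out of sync with the table.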

AI Integration · v1.0.0

AI Integration Skill

Download a skill file that enables AI agents to generate working MySQL + Certyo integration code for any language or framework.

What is this?
A markdown file containing MySQL-specific field mappings, authentication setup, code examples, and integration patterns for Certyo. Drop it into your AI agent's context and ask it to generate integration code.

What's inside

  • Authentication: MySQL connection strings, binlog user grants, SSL configuration
  • Architecture: three approaches (Debezium binlog, trigger queue table, timestamp polling)
  • Field mapping: MySQL columns and Debezium operations to Certyo record schema
  • Code examples: Python poller, Node.js poller, MySQL triggers, Debezium Docker Compose
  • Verification: periodic anchoring verification with MySQL UPDATE write-back
  • MySQL patterns: binlog configuration, trigger queue cleanup, checkpoint persistence

How to use

Claude Code

Place the file in your project's .claude/commands/ directory, then use it as a slash command:

# Download the skill file
mkdir -p .claude/commands
curl -o .claude/commands/certyo-mysql.md \
  https://www.certyos.com/developers/skills/certyo-mysql-skill.md

# Use it in Claude Code
/certyo-mysql "Generate a Python worker that polls MySQL for changes and pushes them to Certyo"

Cursor / Copilot / Any AI Agent

Add the file to your project root or attach it to a conversation. The AI agent will use the MySQL-specific patterns, field mappings, and code examples to generate correct integration code.

# Add to your project
curl -o CERTYO_MYSQL.md \
  https://www.certyos.com/developers/skills/certyo-mysql-skill.md

# Then in your AI agent:
"Using the Certyo MySQL spec in CERTYO_MYSQL.md,
 generate a Python worker that polls MySQL for changes and pushes them to Certyo"

CLAUDE.md Context File

Append the skill file to your project's CLAUDE.md so every Claude conversation has MySQL + Certyo context automatically.

# Append to your project's CLAUDE.md
echo "" >> CLAUDE.md
echo "## Certyo MySQL Integration" >> CLAUDE.md
cat CERTYO_MYSQL.md >> CLAUDE.md