Certyo Developer Portal

MySQL Integration

Capture row-level changes from MySQL and send them to Certyo for blockchain anchoring. This guide covers three approaches: Debezium binlog CDC (production), trigger-based queue tables, and simple timestamp-based polling.

Required binlog configuration
Debezium requires binlog_format=ROW and binlog_row_image=FULL. Verify with SHOW VARIABLES LIKE 'binlog%'. These are the defaults in MySQL 8.x but may need explicit configuration in MySQL 5.7.
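
The binlog check can also be scripted. The following is a minimal sketch, assuming the server variables have already been read into a dictionary (for example from a SHOW VARIABLES query run through mysql-connector-python). The helper name check_binlog_settings and the REQUIRED_BINLOG_SETTINGS table are illustrative, not part of Certyo or Debezium. It also checks log_bin, which the prerequisites list alongside the other two settings.

```python
# Settings Debezium needs, as listed in this guide.
REQUIRED_BINLOG_SETTINGS = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def check_binlog_settings(variables):
    """Return a list of problems; an empty list means the settings look Debezium-ready.

    `variables` is a dict of variable name -> value, e.g. built from the rows
    returned by SHOW VARIABLES.
    """
    problems = []
    for name, expected in REQUIRED_BINLOG_SETTINGS.items():
        actual = variables.get(name)
        if actual is None:
            problems.append(f"{name} is not reported by the server")
        elif str(actual).upper() != expected:
            problems.append(f"{name} is {actual}, expected {expected}")
    return problems


# Example with hypothetical values
sample = {"log_bin": "ON", "binlog_format": "MIXED", "binlog_row_image": "FULL"}
for problem in check_binlog_settings(sample):
    print(problem)
```

Note that SHOW VARIABLES LIKE 'binlog%' does not return log_bin, so a script would need a second query (or a broader pattern) to populate that key.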

Overview

MySQL offers several change detection strategies depending on your throughput requirements and infrastructure constraints:

  • Debezium + binlog (recommended for production) — The Debezium MySQL connector reads the binary log in real time, streams changes to Kafka, and a consumer forwards them to Certyo. No triggers or polling queries needed.
  • Trigger + queue table — A MySQL trigger writes change events to a queue table on every INSERT/UPDATE/DELETE. An external poller reads the queue and calls Certyo.
  • Timestamp polling — The simplest approach: add an updated_at column and poll for rows changed since the last checkpoint. Best for low-volume tables or prototyping.

Architecture

Data flow: three approaches
Approach 1: Debezium + Binlog (Production)
┌──────────────────────┐       MySQL binlog                ┌───────────────────┐
│  MySQL Database      │ ──── ROW-format events ─────────> │  Debezium         │
│  products table      │                                   │  MySQL Connector  │
│  (INSERT/UPDATE/DEL) │                                   └────────┬──────────┘
└──────────────────────┘                                            │ Kafka
                                                                    v
                                                           ┌─────────────────┐
                                                           │  Kafka Consumer  │
                                                           │  (Python/Node)   │
                                                           └────────┬────────┘
                                                                    │
                                                     POST /api/v1/records
                                                                    v
                                                           ┌─────────────────┐
                                                           │  Certyo API     │
                                                           │  (202 Accepted) │
                                                           └────────┬────────┘
                                                                    │
                                                          ~60-90s anchoring
                                                                    │
                                                                    v
                                                           ┌─────────────────┐
                                                           │  Polygon        │
                                                           │  (on-chain)     │
                                                           └─────────────────┘

Approach 2: Trigger + Queue Table
┌──────────────────────┐  AFTER INSERT/UPDATE/DELETE       ┌───────────────────┐
│  MySQL Database      │ ──── trigger writes to ─────────> │  certyo_queue     │
│  products table      │                                   │  (queue table)    │
└──────────────────────┘                                   └────────┬──────────┘
                                                      Poller reads + deletes
                                                      processed rows
                                                                    │
                                                     POST /api/v1/records
                                                                    v
                                                           ┌─────────────────┐
                                                           │  Certyo API     │
                                                           │  (202 Accepted) │
                                                           └─────────────────┘

Approach 3: Timestamp Polling
┌──────────────────────┐                                   ┌───────────────────┐
│  MySQL Database      │ ◄──── SELECT WHERE updated_at > ──│  External Poller  │
│  products table      │       last_checkpoint             │  (Python/Node)    │
│  (updated_at col)    │                                   └────────┬──────────┘
└──────────────────────┘                                            │
                                                     POST /api/v1/records
                                                                    v
                                                           ┌─────────────────┐
                                                           │  Certyo API     │
                                                           │  (202 Accepted) │
                                                           └─────────────────┘

Prerequisites

  • MySQL 5.7+ or MySQL 8.x (8.x recommended for native JSON support and improved binlog)
  • For Debezium: binlog_format=ROW, binlog_row_image=FULL, and log_bin=ON in my.cnf
  • A MySQL user with the SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT privileges (for Debezium)
  • A Certyo API key (obtain from your Certyo dashboard under Settings → API Keys)
  • Python 3.9+ with mysql-connector-python or Node.js 18+ with mysql2

Approach 1: Debezium + Binlog (Production)

The Debezium MySQL connector reads the binary log and streams row-level changes to Kafka topics. A consumer transforms the events into Certyo records and calls the ingestion API. This is the recommended approach for production workloads.

docker-compose.yml: Debezium + Kafka + MySQL connector (YAML)
version: "3.9"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  mysql:
    image: mysql:8.0
    ports: ["3306:3306"]
    environment:
      MYSQL_ROOT_PASSWORD: rootpass
      MYSQL_DATABASE: product_catalog
    command: >
      --server-id=1
      --log-bin=mysql-bin
      --binlog-format=ROW
      --binlog-row-image=FULL
      --gtid-mode=ON
      --enforce-gtid-consistency=ON

  connect:
    image: debezium/connect:2.6
    depends_on: [kafka, mysql]
    ports: ["8083:8083"]
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: certyo-mysql-cdc
      CONFIG_STORAGE_TOPIC: _connect_configs
      OFFSET_STORAGE_TOPIC: _connect_offsets
      STATUS_STORAGE_TOPIC: _connect_statuses
      CONFIG_STORAGE_REPLICATION_FACTOR: 1
      OFFSET_STORAGE_REPLICATION_FACTOR: 1
      STATUS_STORAGE_REPLICATION_FACTOR: 1

# After services are running, register the MySQL connector:
#
# curl -X POST http://localhost:8083/connectors \
#   -H "Content-Type: application/json" -d '{
#   "name": "mysql-certyo-connector",
#   "config": {
#     "connector.class": "io.debezium.connector.mysql.MySqlConnector",
#     "database.hostname": "mysql",
#     "database.port": "3306",
#     "database.user": "debezium",
#     "database.password": "dbz",
#     "database.server.id": "184054",
#     "topic.prefix": "mysql",
#     "database.include.list": "product_catalog",
#     "table.include.list": "product_catalog.products",
#     "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
#     "schema.history.internal.kafka.topic": "schema-changes.mysql",
#     "include.schema.changes": "false",
#     "snapshot.mode": "initial"
#   }
# }'
#
# Create the Debezium user in MySQL:
#
# CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz';
# GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE,
#       REPLICATION CLIENT ON *.* TO 'debezium'@'%';
# FLUSH PRIVILEGES;
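
The consumer side of this pipeline is not shown above. As a rough sketch, the transformation from a Debezium change event to a Certyo record could look like the following. It assumes the JSON converter is in use, that the event carries the standard Debezium envelope fields (op, before, after, source.db, source.table, source.file, source.pos, source.ts_ms), and that the table has an id primary key as in the products examples. The function name debezium_to_certyo and the "dbz-" key format are illustrative choices, not a Certyo requirement.

```python
import json
from datetime import datetime, timezone

# Debezium op codes mapped to Certyo operation types (see the field mapping section)
OP_MAP = {"c": "insert", "u": "update", "d": "delete", "r": "upsert"}


def debezium_to_certyo(message_value, tenant_id):
    """Convert one Debezium event (JSON str/bytes) into a Certyo record dict.

    Returns None for Kafka tombstones and for events with an unknown op code.
    """
    if message_value is None:  # tombstone emitted after a delete
        return None
    event = json.loads(message_value)
    payload = event.get("payload", event)  # works with or without the schema wrapper
    op = OP_MAP.get(payload.get("op"))
    if op is None:
        return None
    source = payload["source"]
    # Deletes only carry the old row image; everything else carries the new one
    row = payload["before"] if op == "delete" else payload["after"]
    changed_at = datetime.fromtimestamp(source["ts_ms"] / 1000, tz=timezone.utc)
    return {
        "tenantId": tenant_id,
        "database": source["db"],
        "collection": source["table"],
        "recordId": str(row["id"]),
        "operationType": op,
        "recordPayload": row,
        "sourceTimestamp": changed_at.isoformat(),
        # Primary key plus binlog coordinates: stable across consumer restarts
        "idempotencyKey": f"dbz-{source['file']}-{source['pos']}-{row['id']}",
    }
```

A Kafka consumer loop would call this for each message value and POST the result to /api/v1/records, skipping None results.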

Approach 2: Trigger + Queue Table

For environments where Kafka is not available, a MySQL trigger writes change events to a queue table. An external worker polls the queue, ingests the events into Certyo, and marks the rows as processed; a periodic cleanup job then deletes processed rows.

Create queue table and triggers (SQL)
-- Step 1: Create the queue table
CREATE TABLE certyo_queue (
    id           BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name   VARCHAR(100) NOT NULL,
    record_id    VARCHAR(100) NOT NULL,
    op_type      ENUM('insert', 'update', 'delete') NOT NULL,
    payload      JSON NOT NULL,
    created_at   TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    processed    BOOLEAN DEFAULT FALSE,
    INDEX idx_unprocessed (processed, created_at)
) ENGINE=InnoDB;

-- Step 2: Create triggers on the products table

DELIMITER //

CREATE TRIGGER trg_products_insert
AFTER INSERT ON products
FOR EACH ROW
BEGIN
    INSERT INTO certyo_queue (table_name, record_id, op_type, payload)
    VALUES (
        'products',
        CAST(NEW.id AS CHAR),
        'insert',
        JSON_OBJECT(
            'id', NEW.id,
            'sku', NEW.sku,
            'name', NEW.name,
            'category', NEW.category,
            'price', NEW.price,
            'batch_number', NEW.batch_number,
            'manufactured_at', NEW.manufactured_at
        )
    );
END //

CREATE TRIGGER trg_products_update
AFTER UPDATE ON products
FOR EACH ROW
BEGIN
    INSERT INTO certyo_queue (table_name, record_id, op_type, payload)
    VALUES (
        'products',
        CAST(NEW.id AS CHAR),
        'update',
        JSON_OBJECT(
            'id', NEW.id,
            'sku', NEW.sku,
            'name', NEW.name,
            'category', NEW.category,
            'price', NEW.price,
            'batch_number', NEW.batch_number,
            'manufactured_at', NEW.manufactured_at
        )
    );
END //

CREATE TRIGGER trg_products_delete
AFTER DELETE ON products
FOR EACH ROW
BEGIN
    INSERT INTO certyo_queue (table_name, record_id, op_type, payload)
    VALUES (
        'products',
        CAST(OLD.id AS CHAR),
        'delete',
        JSON_OBJECT(
            'id', OLD.id,
            'sku', OLD.sku,
            'name', OLD.name,
            'category', OLD.category,
            'price', OLD.price
        )
    );
END //

DELIMITER ;

-- Step 3: Cleanup job (run periodically via cron or EVENT SCHEDULER)
-- Delete processed rows older than 24 hours
DELETE FROM certyo_queue
WHERE processed = TRUE
  AND created_at < NOW() - INTERVAL 24 HOUR;
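
The external worker for this approach is not shown. A minimal sketch of it might look like the following, assuming a mysql-connector-python connection and a caller-supplied send function that POSTs one record to /api/v1/records and raises on failure. The names queue_row_to_record, drain_queue, and the "queue-" key prefix are illustrative.

```python
import json
from datetime import datetime

FETCH_SQL = """
    SELECT id, table_name, record_id, op_type, payload, created_at
    FROM certyo_queue
    WHERE processed = FALSE
    ORDER BY id
    LIMIT 500
"""
MARK_SQL = "UPDATE certyo_queue SET processed = TRUE WHERE id = %s"


def queue_row_to_record(row, tenant_id, database):
    """Map one certyo_queue row (as a dict) to a Certyo record dict."""
    payload = row["payload"]
    if isinstance(payload, (str, bytes, bytearray)):
        payload = json.loads(payload)  # JSON columns may arrive as text
    return {
        "tenantId": tenant_id,
        "database": database,
        "collection": row["table_name"],
        "recordId": row["record_id"],
        "operationType": row["op_type"],
        "recordPayload": payload,
        "sourceTimestamp": row["created_at"].isoformat(),
        "idempotencyKey": f"queue-{row['id']}",  # queue ID is unique per change event
    }


def drain_queue(conn, send, tenant_id, database):
    """Send unprocessed queue rows in order, marking each one after a successful send."""
    cur = conn.cursor(dictionary=True)
    cur.execute(FETCH_SQL)
    rows = cur.fetchall()
    for row in rows:
        send(queue_row_to_record(row, tenant_id, database))  # expected to raise on failure
        cur.execute(MARK_SQL, (row["id"],))
        conn.commit()  # commit per row so a later failure leaves the rest unprocessed
    cur.close()
    return len(rows)


# Example with a hypothetical queue row
example_row = {
    "id": 42, "table_name": "products", "record_id": "7", "op_type": "update",
    "payload": '{"id": 7, "sku": "A-1"}', "created_at": datetime(2024, 1, 2, 3, 4, 5),
}
print(queue_row_to_record(example_row, "tenant-1", "product_catalog")["idempotencyKey"])
```

Because each row is marked only after its send succeeds, a crash between the send and the commit leads to a resend, which the idempotency key is meant to absorb.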

Approach 3: Timestamp Polling

The simplest integration approach: add an updated_at timestamp column and poll for rows modified since the last checkpoint. This works well for low-to-medium volume tables but cannot detect deletes.

timestamp_poller.py: Simple polling with mysql-connector-python (Python)
"""
Timestamp-based MySQL poller for Certyo integration.
Install: pip install mysql-connector-python requests

Usage:
    CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy \
    MYSQL_HOST=localhost MYSQL_DATABASE=product_catalog \
    python timestamp_poller.py
"""

import os
import time
import requests
import mysql.connector
from datetime import datetime, timezone

CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
MYSQL_CONFIG = {
    "host": os.environ.get("MYSQL_HOST", "localhost"),
    "port": int(os.environ.get("MYSQL_PORT", "3306")),
    "database": os.environ.get("MYSQL_DATABASE", "product_catalog"),
    "user": os.environ.get("MYSQL_USER", "certyo_worker"),
    "password": os.environ.get("MYSQL_PASSWORD", "changeme"),
}
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "15"))
BULK_THRESHOLD = 10

# In-memory checkpoint (persist to a file or table in production)
last_checkpoint = "1970-01-01 00:00:00"


def poll_changes():
    """Query rows modified since the last checkpoint."""
    global last_checkpoint
    conn = mysql.connector.connect(**MYSQL_CONFIG)
    cursor = conn.cursor(dictionary=True)

    cursor.execute("""
        SELECT id, sku, name, category, price, batch_number,
               manufactured_at, updated_at
        FROM products
        WHERE updated_at > %s
        ORDER BY updated_at
        LIMIT 500
    """, (last_checkpoint,))

    rows = cursor.fetchall()
    cursor.close()
    conn.close()
    return rows


def map_to_certyo_record(row):
    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": MYSQL_CONFIG["database"],
        "collection": "products",
        "recordId": str(row["id"]),
        "operationType": "upsert",
        "recordPayload": {
            "id": row["id"],
            "sku": row.get("sku"),
            "name": row.get("name"),
            "category": row.get("category"),
            "price": float(row["price"]) if row.get("price") is not None else None,
            "batch_number": row.get("batch_number"),
            "manufactured_at": str(row["manufactured_at"])
                if row.get("manufactured_at") else None,
        },
        "sourceTimestamp": row["updated_at"].isoformat()
            if row.get("updated_at") else None,
        "idempotencyKey": f"poll-{row['id']}-{row.get('updated_at', '')}",
    }


def ingest_records(records):
    """Send records to Certyo; raise on any non-202 so the caller keeps the old checkpoint."""
    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
    if len(records) > BULK_THRESHOLD:
        # Bulk endpoint: send in batches of up to 1000 records
        for i in range(0, len(records), 1000):
            batch = records[i : i + 1000]
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records/bulk",
                json={"tenantId": CERTYO_TENANT_ID, "records": batch},
                headers=headers, timeout=60,
            )
            if resp.status_code != 202:
                raise RuntimeError(f"Bulk failed: {resp.status_code} {resp.text}")
    else:
        # Small batches: one request per record
        for rec in records:
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records",
                json=rec, headers=headers, timeout=30,
            )
            if resp.status_code != 202:
                raise RuntimeError(f"Failed: {resp.status_code} {resp.text}")


def poll_loop():
    global last_checkpoint
    print(f"Timestamp poller started (interval={POLL_INTERVAL}s)")
    while True:
        try:
            rows = poll_changes()
            if rows:
                records = [map_to_certyo_record(r) for r in rows]
                print(f"Found {len(records)} changed rows")
                ingest_records(records)
                last_checkpoint = str(max(
                    r["updated_at"] for r in rows
                ))
                print(f"Checkpoint updated to {last_checkpoint}")
        except Exception as e:
            print(f"Poll error: {e}")

        time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    poll_loop()
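
The poller above keeps its checkpoint in memory, so a restart re-reads the table from 1970. One way to persist it is sketched below, assuming a local JSON file is acceptable (a small MySQL table would work as well). The function names load_checkpoint and save_checkpoint and the file layout are illustrative.

```python
import json
import os
import tempfile

DEFAULT_CHECKPOINT = "1970-01-01 00:00:00"


def load_checkpoint(path):
    """Return the stored checkpoint, or the default if the file is missing or unreadable."""
    try:
        with open(path, "r", encoding="utf-8") as fh:
            return json.load(fh)["last_checkpoint"]
    except (FileNotFoundError, json.JSONDecodeError, KeyError, TypeError):
        return DEFAULT_CHECKPOINT


def save_checkpoint(path, value):
    """Write the checkpoint atomically: temp file in the same directory, then rename."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        json.dump({"last_checkpoint": str(value)}, fh)
        fh.flush()
        os.fsync(fh.fileno())
    os.replace(tmp_path, path)  # readers never see a half-written file
```

In the poller, last_checkpoint would be initialized with load_checkpoint(...) at startup, and save_checkpoint(...) would be called right after the checkpoint is advanced.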

Field Mapping

Map MySQL columns to Certyo record fields. The mapping applies across all three approaches.

MySQL → Certyo field mapping
MySQL Field                  Certyo Field      Notes
──────────────────────────────────────────────────────────────────────────────────────────────
id (PRIMARY KEY)             recordId          Cast to string
products (table name)        collection        Lowercase MySQL convention
product_catalog (database)   database          MySQL database name
id + binlog pos / timestamp  idempotencyKey    Unique per change event
updated_at / NOW()           sourceTimestamp   ISO 8601 format
'insert'/'update'/'delete'   operationType     From trigger, Debezium, or 'upsert' for polling
Full row as JSON             recordPayload     All business columns serialized

Debezium Operation Mapping:
  "c"  (create)  → "insert"
  "u"  (update)  → "update"
  "d"  (delete)  → "delete"
  "r"  (read/snapshot) → "upsert"
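
Since the same mapping is shared by all three approaches, a small pre-flight check before each POST can catch mapping mistakes early. The sketch below assumes every field in the table above (plus tenantId, which the code samples send) should be present and non-empty. Whether the Certyo API actually requires all of them is an assumption here, and validate_record is an illustrative helper, not part of any Certyo SDK.

```python
# Fields taken from the mapping table, plus tenantId as used in the code samples.
REQUIRED_FIELDS = (
    "tenantId", "database", "collection", "recordId",
    "operationType", "recordPayload", "sourceTimestamp", "idempotencyKey",
)
ALLOWED_OPERATIONS = {"insert", "update", "delete", "upsert"}


def validate_record(record):
    """Return a list of mapping problems for one record dict (empty list = looks fine)."""
    problems = [
        f"missing field: {name}"
        for name in REQUIRED_FIELDS
        if record.get(name) in (None, "")
    ]
    op = record.get("operationType")
    if op is not None and op not in ALLOWED_OPERATIONS:
        problems.append(f"unknown operationType: {op}")
    if "recordId" in record and not isinstance(record["recordId"], str):
        problems.append("recordId should be a string")
    return problems


# Example: a raw Debezium op code and an uncast primary key slipped through
bad = {
    "tenantId": "tenant-1", "database": "product_catalog", "collection": "products",
    "recordId": 5, "operationType": "c", "recordPayload": {"id": 5},
    "sourceTimestamp": "2024-01-02T03:04:05", "idempotencyKey": "poll-5-2024-01-02",
}
for problem in validate_record(bad):
    print(problem)
```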

Verification and Write-Back

After records are ingested, add tracking columns to the source table and periodically verify anchoring status via the Certyo API.

Add Certyo tracking columns (SQL)
-- Step 1: Add tracking columns to the source table
ALTER TABLE products ADD COLUMN certyo_record_hash VARCHAR(128) DEFAULT NULL;
ALTER TABLE products ADD COLUMN certyo_anchor_status VARCHAR(20) DEFAULT 'Pending';
ALTER TABLE products ADD COLUMN certyo_verified_at TIMESTAMP NULL DEFAULT NULL;

-- Step 2: After ingestion, write back the record hash
UPDATE products
SET certyo_record_hash = :record_hash,
    certyo_anchor_status = 'Pending'
WHERE id = :product_id;

-- Step 3: After verification confirms anchoring
UPDATE products
SET certyo_anchor_status = 'Anchored',
    certyo_verified_at = NOW()
WHERE id = :product_id;

-- Step 4: Query unverified records for the verification worker
SELECT id, certyo_record_hash
FROM products
WHERE certyo_anchor_status = 'Pending'
  AND certyo_record_hash IS NOT NULL
  AND updated_at < NOW() - INTERVAL 2 MINUTE;

verification_worker.py: Verify anchoring status (Python)
"""
Verification worker: checks Certyo anchoring status and updates MySQL.
Install: pip install mysql-connector-python requests
"""

import os
import time
import requests
import mysql.connector

CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
MYSQL_CONFIG = {
    "host": os.environ.get("MYSQL_HOST", "localhost"),
    "port": int(os.environ.get("MYSQL_PORT", "3306")),
    "database": os.environ.get("MYSQL_DATABASE", "product_catalog"),
    "user": os.environ.get("MYSQL_USER", "certyo_worker"),
    "password": os.environ.get("MYSQL_PASSWORD", "changeme"),
}


def verify_pending_records():
    conn = mysql.connector.connect(**MYSQL_CONFIG)
    cursor = conn.cursor(dictionary=True)

    cursor.execute("""
        SELECT id, certyo_record_hash
        FROM products
        WHERE certyo_anchor_status = 'Pending'
          AND certyo_record_hash IS NOT NULL
          AND updated_at < NOW() - INTERVAL 2 MINUTE
        LIMIT 100
    """)

    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
    verified = 0
    failed = 0

    for row in cursor.fetchall():
        resp = requests.post(
            f"{CERTYO_BASE_URL}/api/v1/verify/record",
            json={
                "tenantId": CERTYO_TENANT_ID,
                "database": MYSQL_CONFIG["database"],
                "collection": "products",
                "recordId": str(row["id"]),
            },
            headers=headers, timeout=30,
        )

        if resp.status_code == 200:
            result = resp.json()
            if result.get("verified") and result.get("anchoredOnChain"):
                cursor.execute("""
                    UPDATE products
                    SET certyo_anchor_status = 'Anchored',
                        certyo_verified_at = NOW()
                    WHERE id = %s
                """, (row["id"],))
                verified += 1
            else:
                failed += 1
        else:
            failed += 1

    conn.commit()
    cursor.close()
    conn.close()
    print(f"Verified: {verified}, Failed/Pending: {failed}")


if __name__ == "__main__":
    while True:
        try:
            verify_pending_records()
        except Exception as e:
            print(f"Verification error: {e}")
        time.sleep(120)

Authentication Reference

MySQL Connection

The integration connects to MySQL using standard connection parameters (host, port, user, password, database). For production, use SSL/TLS connections (ssl_ca, ssl_cert, ssl_key in Python; ssl option in Node.js). For AWS RDS or Azure Database for MySQL, download the provider's CA certificate.
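
As an illustration of the TLS options mentioned above, the connection settings could be assembled like this for mysql-connector-python. The environment variable names MYSQL_SSL_CA, MYSQL_SSL_CERT, and MYSQL_SSL_KEY are assumptions made for this sketch; ssl_ca, ssl_cert, ssl_key, and ssl_verify_cert are connection arguments of mysql-connector-python.

```python
import os


def build_mysql_config(env=os.environ):
    """Build keyword arguments for mysql.connector.connect(), adding TLS when configured."""
    config = {
        "host": env.get("MYSQL_HOST", "localhost"),
        "port": int(env.get("MYSQL_PORT", "3306")),
        "database": env.get("MYSQL_DATABASE", "product_catalog"),
        "user": env.get("MYSQL_USER", "certyo_worker"),
        "password": env["MYSQL_PASSWORD"],  # no default: fail fast if it is not set
    }
    ca_path = env.get("MYSQL_SSL_CA")
    if ca_path:
        # Verify the server certificate against the provider's CA bundle
        config["ssl_ca"] = ca_path
        config["ssl_verify_cert"] = True
    cert_path, key_path = env.get("MYSQL_SSL_CERT"), env.get("MYSQL_SSL_KEY")
    if cert_path and key_path:
        # Optional client certificate for mutual TLS
        config["ssl_cert"] = cert_path
        config["ssl_key"] = key_path
    return config


# Example with hypothetical values; pass the result as mysql.connector.connect(**config)
config = build_mysql_config({"MYSQL_PASSWORD": "example", "MYSQL_SSL_CA": "/etc/ssl/mysql-ca.pem"})
print(sorted(config))
```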

Certyo API Key

Pass your Certyo API key in the X-API-Key header on all Certyo API calls. Store it in environment variables, AWS Secrets Manager, or a vault — never hardcode in source files or commit to version control.


Troubleshooting

  • Debezium connector fails to start: Verify binlog settings with SHOW VARIABLES LIKE 'binlog_format' (must be ROW) and SHOW VARIABLES LIKE 'binlog_row_image' (must be FULL). Also check that the Debezium user has the REPLICATION SLAVE and REPLICATION CLIENT privileges.
  • Trigger queue grows unbounded: Ensure the poller is running and setting processed = TRUE on the rows it has sent. Add a cleanup job to delete processed rows older than 24 hours.
  • Timestamp polling misses changes: This approach cannot detect deletes. For delete tracking, use the trigger or Debezium approach. Also ensure the updated_at column is indexed and updated on every change.
  • Duplicate records in Certyo: Build the idempotencyKey from the primary key plus the binlog position (Debezium), the queue ID (trigger), or the timestamp (polling). Certyo deduplicates automatically and returns idempotencyReplayed: true.
  • MySQL 5.7 GTID issues: If gtid_mode is OFF on MySQL 5.7, set "schema.history.internal.skip.unparseable.ddl": "true" in the Debezium connector config (this is the Debezium 2.x property name; releases before 2.0 used "database.history.skip.unparseable.ddl"). GTID is recommended but not required.

AI Integration · v1.0.0

AI Integration Skill

Download a skill file that enables AI agents to generate working MySQL + Certyo integration code for any language or framework.

What is this?
A markdown file containing MySQL-specific field mappings, authentication setup, code examples, and integration patterns for Certyo. Drop it into your AI agent's context and ask it to generate integration code.

What's inside

  • Authentication: MySQL connection strings, binlog user permissions, SSL configuration
  • Architecture: three approaches (Debezium binlog, trigger-based queue table, timestamp polling)
  • Field mapping: MySQL columns and Debezium operations mapped to the Certyo record schema
  • Code examples: Python poller, Node.js poller, MySQL triggers, Debezium Docker Compose
  • Verification: periodic anchoring verification with MySQL UPDATE write-back
  • MySQL patterns: binlog configuration, trigger queue table cleanup, checkpoint persistence

How to use

Claude Code

Place the file in your project's .claude/commands/ directory, then use it as a slash command:

# Download the skill file
mkdir -p .claude/commands
curl -o .claude/commands/certyo-mysql.md \
  https://www.certyos.com/developers/skills/certyo-mysql-skill.md

# Use it in Claude Code
/certyo-mysql "Generate a Python worker that polls MySQL for changes and sends them to Certyo"

Cursor / Copilot / Any AI Agent

Add the file to your project root or attach it to a conversation. The AI agent will use the MySQL-specific patterns, field mappings, and code examples to generate correct integration code.

# Add to your project
curl -o CERTYO_MYSQL.md \
  https://www.certyos.com/developers/skills/certyo-mysql-skill.md

# Then in your AI agent:
"Using the Certyo MySQL spec in CERTYO_MYSQL.md,
 generate a Python worker that polls MySQL for changes and sends them to Certyo"

CLAUDE.md Context File

Append the skill file to your project's CLAUDE.md so every Claude conversation has MySQL + Certyo context automatically.

# Append to your project's CLAUDE.md
echo "" >> CLAUDE.md
echo "## Certyo MySQL Integration" >> CLAUDE.md
cat CERTYO_MYSQL.md >> CLAUDE.md