---
name: Certyo + MySQL Integration
version: 1.0.0
description: Generate MySQL → Certyo integration code with Debezium binlog, trigger queue, and polling patterns, field mappings, auth, and working examples
api_base: https://www.certyos.com
auth: X-API-Key header
last_updated: 2026-04-14
---

# Certyo + MySQL Integration Skill

This skill generates production-ready Python, Node.js, and SQL code to integrate MySQL with Certyo's blockchain-backed authenticity platform. It supports three change detection approaches: Debezium binlog CDC (production), trigger-based queue tables, and simple timestamp-based polling.

## Certyo API Reference

All requests require the `X-API-Key` header for authentication.

### Endpoints

| Method | Path | Description | Response |
|--------|------|-------------|----------|
| `POST` | `/api/v1/records` | Ingest a single record | `202 Accepted` |
| `POST` | `/api/v1/records/bulk` | Ingest up to 1000 records | `202 Accepted` |
| `POST` | `/api/v1/verify/record` | Verify blockchain anchoring | `200 OK` |
| `GET` | `/api/v1/records` | Query records | `200 OK` |
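
The four endpoints can be wrapped in a thin client for the examples below. This is a sketch, not an official SDK: the class and method names, query parameters, and timeout values are illustrative; only the paths, verbs, and `X-API-Key` header come from the table above.

```python
import requests


class CertyoClient:
    """Minimal wrapper over the Certyo endpoints (illustrative names)."""

    def __init__(self, api_key, base_url="https://www.certyos.com"):
        self.base_url = base_url.rstrip("/")
        self.headers = {"X-API-Key": api_key, "Content-Type": "application/json"}

    def _url(self, path):
        return f"{self.base_url}{path}"

    def ingest(self, record):
        # POST /api/v1/records -> 202 Accepted
        return requests.post(self._url("/api/v1/records"),
                             json=record, headers=self.headers, timeout=30)

    def ingest_bulk(self, tenant_id, records):
        # POST /api/v1/records/bulk -> 202 Accepted (up to 1000 records)
        return requests.post(self._url("/api/v1/records/bulk"),
                             json={"tenantId": tenant_id, "records": records},
                             headers=self.headers, timeout=60)

    def verify(self, tenant_id, record_id, record_hash):
        # POST /api/v1/verify/record -> 200 OK
        return requests.post(self._url("/api/v1/verify/record"),
                             json={"tenantId": tenant_id, "recordId": record_id,
                                   "recordHash": record_hash},
                             headers=self.headers, timeout=30)

    def query(self, params):
        # GET /api/v1/records -> 200 OK; the query parameters are assumptions
        return requests.get(self._url("/api/v1/records"),
                            params=params, headers=self.headers, timeout=30)
```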

### Record Payload

```json
{
  "tenantId": "string (required)",
  "database": "string (required)",
  "collection": "string (required)",
  "recordId": "string (required)",
  "recordPayload": { "...any JSON (required)" },
  "clientId": "string (optional)",
  "recordVersion": "string (optional, default '1')",
  "operationType": "upsert|insert|update|delete (optional)",
  "sourceTimestamp": "ISO 8601 (optional)",
  "idempotencyKey": "string (optional)"
}
```
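
A small helper can enforce the required/optional split above before a record is sent. The function name and validation behavior are illustrative, not part of the API:

```python
REQUIRED_FIELDS = ("tenantId", "database", "collection", "recordId", "recordPayload")


def build_record(tenant_id, database, collection, record_id, payload, **optional):
    """Assemble a Certyo record, applying the documented default for
    recordVersion and rejecting missing required fields."""
    record = {
        "tenantId": tenant_id,
        "database": database,
        "collection": collection,
        "recordId": str(record_id),  # always a string, even for numeric PKs
        "recordPayload": payload,
        "recordVersion": optional.get("recordVersion", "1"),
    }
    # Copy through optional fields only when the caller supplied them
    for key in ("clientId", "operationType", "sourceTimestamp", "idempotencyKey"):
        if key in optional:
            record[key] = optional[key]
    missing = [f for f in REQUIRED_FIELDS if not record.get(f)]
    if missing:
        raise ValueError(f"missing required fields: {missing}")
    return record
```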

### Ingestion Response (202 Accepted)

```json
{
  "recordId": "PROD-42",
  "recordHash": "sha256:ab12cd34...",
  "tenantId": "acme-corp",
  "acceptedAt": "2026-04-14T10:30:00Z",
  "idempotencyReplayed": false
}
```

### Pipeline Timing

Records flow through: Kafka -> Accumulate (1000 records or 60s) -> Merkle tree -> IPFS pin -> Polygon anchor. Total anchoring latency is approximately 60-90 seconds after accumulation flush.

## Integration Patterns

### Approach 1: Debezium + Binlog (Production)

```
MySQL binlog (ROW format, FULL row image)
  -> Debezium MySQL connector reads binlog events in real time
    -> Change events published to Kafka topic (mysql.database.table)
      -> Kafka consumer reads events
        -> Maps Debezium envelope to Certyo record
          -> If batch > 10: POST /api/v1/records/bulk
          -> If batch <= 10: POST /api/v1/records (per record)
            -> Commits Kafka offset on success
```

### Approach 2: Trigger + Queue Table

```
MySQL table with AFTER INSERT/UPDATE/DELETE triggers
  -> Triggers write JSON payload to certyo_queue table
    -> External poller reads unprocessed queue rows
      -> Maps queue rows to Certyo records
        -> POST /api/v1/records or /api/v1/records/bulk
          -> Marks queue rows as processed on success
            -> Cleanup job deletes processed rows after 24h
```

### Approach 3: Timestamp Polling

```
MySQL table with updated_at timestamp column
  -> External poller queries WHERE updated_at > last_checkpoint
    -> Maps rows to Certyo records
      -> POST /api/v1/records or /api/v1/records/bulk
        -> Updates checkpoint to latest updated_at
Note: Cannot detect deletes. Use trigger or Debezium for delete tracking.
```

## Authentication

### MySQL Connection (Python)

```python
import mysql.connector

conn = mysql.connector.connect(
    host="localhost",
    port=3306,
    database="product_catalog",
    user="certyo_worker",
    password="changeme",
)
```

For SSL connections (AWS RDS, Azure Database for MySQL):

```python
conn = mysql.connector.connect(
    host="mydb.us-east-1.rds.amazonaws.com",
    port=3306,
    database="product_catalog",
    user="certyo_worker",
    password="changeme",
    ssl_ca="/path/to/rds-combined-ca-bundle.pem",
    ssl_verify_cert=True,
)
```

### MySQL Connection (Node.js)

```javascript
const mysql = require("mysql2/promise");

const pool = mysql.createPool({
  host: "localhost",
  port: 3306,
  database: "product_catalog",
  user: "certyo_worker",
  password: "changeme",
  waitForConnections: true,
  connectionLimit: 5,
});
```

### Certyo API

```python
CERTYO_BASE_URL = "https://www.certyos.com"
headers = {
    "X-API-Key": "cty_xxxxxxxxxxxxxxxxxxxx",
    "Content-Type": "application/json",
}
```

Store the API key in environment variables, AWS Secrets Manager, or a vault. Never hardcode in source files or commit to version control.
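
One loading pattern, assuming the key lives in `CERTYO_API_KEY` or, as a fallback, in an AWS Secrets Manager secret whose name is supplied via a hypothetical `CERTYO_API_KEY_SECRET` variable:

```python
import os


def load_certyo_api_key():
    """Read the API key from the environment, falling back to AWS
    Secrets Manager when a secret name is configured."""
    key = os.environ.get("CERTYO_API_KEY")
    if key:
        return key
    secret_name = os.environ.get("CERTYO_API_KEY_SECRET")  # e.g. "prod/certyo/api-key"
    if secret_name:
        import boto3  # pip install boto3
        client = boto3.client("secretsmanager")
        return client.get_secret_value(SecretId=secret_name)["SecretString"]
    raise RuntimeError("No Certyo API key configured")
```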

## Field Mapping

| MySQL Field | Certyo Field | Notes |
|------------|-------------|-------|
| `id` (PRIMARY KEY) | `recordId` | Cast to string |
| `products` (table name) | `collection` | Lowercase MySQL convention |
| `product_catalog` (database) | `database` | MySQL database name |
| `id + binlog_pos` or `id + updated_at` | `idempotencyKey` | Unique per change event |
| `updated_at` / `NOW()` | `sourceTimestamp` | ISO 8601 format |
| `'insert'/'update'/'delete'` | `operationType` | From trigger, Debezium, or 'upsert' for polling |
| Full row as JSON | `recordPayload` | All business columns serialized |

### Debezium Operation Mapping

| Debezium `op` | Certyo `operationType` |
|--------------|----------------------|
| `c` (create) | `insert` |
| `u` (update) | `update` |
| `d` (delete) | `delete` |
| `r` (read/snapshot) | `upsert` |

### Trigger Queue Mapping

| Trigger Predicate | Queue `op_type` | Certyo `operationType` |
|------------------|----------------|----------------------|
| `AFTER INSERT` | `insert` | `insert` |
| `AFTER UPDATE` | `update` | `update` |
| `AFTER DELETE` | `delete` | `delete` |

## Code Examples

### 1. MySQL Binlog Configuration

```sql
-- Verify binlog settings (required for Debezium)
SHOW VARIABLES LIKE 'binlog_format';      -- Must be ROW
SHOW VARIABLES LIKE 'binlog_row_image';   -- Must be FULL
SHOW VARIABLES LIKE 'log_bin';            -- Must be ON

-- If not set, add to my.cnf / mysqld.cnf and restart:
-- [mysqld]
-- server-id = 1
-- log-bin = mysql-bin
-- binlog-format = ROW
-- binlog-row-image = FULL
-- gtid-mode = ON
-- enforce-gtid-consistency = ON

-- Create Debezium user
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE,
      REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
```
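
The checks above can be automated in the deployment pipeline. A sketch that takes a dict of variable names to values (as returned by the `SHOW VARIABLES` queries) and reports anything that would block Debezium:

```python
def check_binlog_settings(variables):
    """Return a list of problems with the MySQL binlog configuration.
    `variables` maps variable names to their current values."""
    required = {
        "log_bin": "ON",
        "binlog_format": "ROW",
        "binlog_row_image": "FULL",
    }
    problems = []
    for name, expected in required.items():
        actual = variables.get(name)
        if actual is None:
            problems.append(f"{name} is not set")
        elif actual.upper() != expected:
            problems.append(f"{name} is {actual}, expected {expected}")
    return problems
```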

### 2. Trigger + Queue Table Setup

```sql
-- Step 1: Create the queue table
CREATE TABLE certyo_queue (
    id           BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name   VARCHAR(100) NOT NULL,
    record_id    VARCHAR(100) NOT NULL,
    op_type      ENUM('insert', 'update', 'delete') NOT NULL,
    payload      JSON NOT NULL,
    created_at   TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    processed    BOOLEAN DEFAULT FALSE,
    INDEX idx_unprocessed (processed, created_at)
) ENGINE=InnoDB;

-- Step 2: Add Certyo tracking columns
ALTER TABLE products ADD COLUMN certyo_record_hash VARCHAR(128) DEFAULT NULL;
ALTER TABLE products ADD COLUMN certyo_anchor_status VARCHAR(20) DEFAULT 'Pending';
ALTER TABLE products ADD COLUMN certyo_verified_at TIMESTAMP NULL DEFAULT NULL;

-- Step 3: Create triggers

DELIMITER //

CREATE TRIGGER trg_products_insert
AFTER INSERT ON products
FOR EACH ROW
BEGIN
    INSERT INTO certyo_queue (table_name, record_id, op_type, payload)
    VALUES (
        'products',
        CAST(NEW.id AS CHAR),
        'insert',
        JSON_OBJECT(
            'id', NEW.id,
            'sku', NEW.sku,
            'name', NEW.name,
            'category', NEW.category,
            'price', NEW.price,
            'batch_number', NEW.batch_number,
            'manufactured_at', NEW.manufactured_at
        )
    );
END //

CREATE TRIGGER trg_products_update
AFTER UPDATE ON products
FOR EACH ROW
BEGIN
    -- Skip if only Certyo tracking columns changed
    IF OLD.sku <=> NEW.sku
       AND OLD.name <=> NEW.name
       AND OLD.category <=> NEW.category
       AND OLD.price <=> NEW.price
       AND OLD.batch_number <=> NEW.batch_number
    THEN
        -- Only tracking columns changed, skip
        SET @noop = 1;
    ELSE
        INSERT INTO certyo_queue (table_name, record_id, op_type, payload)
        VALUES (
            'products',
            CAST(NEW.id AS CHAR),
            'update',
            JSON_OBJECT(
                'id', NEW.id,
                'sku', NEW.sku,
                'name', NEW.name,
                'category', NEW.category,
                'price', NEW.price,
                'batch_number', NEW.batch_number,
                'manufactured_at', NEW.manufactured_at
            )
        );
    END IF;
END //

CREATE TRIGGER trg_products_delete
AFTER DELETE ON products
FOR EACH ROW
BEGIN
    INSERT INTO certyo_queue (table_name, record_id, op_type, payload)
    VALUES (
        'products',
        CAST(OLD.id AS CHAR),
        'delete',
        JSON_OBJECT(
            'id', OLD.id,
            'sku', OLD.sku,
            'name', OLD.name,
            'category', OLD.category,
            'price', OLD.price
        )
    );
END //

DELIMITER ;
```

### 3. Python Timestamp Poller

```python
"""
Timestamp-based MySQL poller for Certyo integration.
Install: pip install mysql-connector-python requests

Usage:
    CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy \
    MYSQL_HOST=localhost MYSQL_DATABASE=product_catalog \
    python timestamp_poller.py
"""

import os
import time
import requests
import mysql.connector
from datetime import datetime, timezone

CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
MYSQL_CONFIG = {
    "host": os.environ.get("MYSQL_HOST", "localhost"),
    "port": int(os.environ.get("MYSQL_PORT", "3306")),
    "database": os.environ.get("MYSQL_DATABASE", "product_catalog"),
    "user": os.environ.get("MYSQL_USER", "certyo_worker"),
    "password": os.environ.get("MYSQL_PASSWORD", "changeme"),
}
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "15"))
BULK_THRESHOLD = 10

last_checkpoint = "1970-01-01 00:00:00"


def poll_changes():
    global last_checkpoint
    conn = mysql.connector.connect(**MYSQL_CONFIG)
    cursor = conn.cursor(dictionary=True)

    cursor.execute("""
        SELECT id, sku, name, category, price, batch_number,
               manufactured_at, updated_at
        FROM products
        WHERE updated_at > %s
        ORDER BY updated_at
        LIMIT 500
    """, (last_checkpoint,))

    rows = cursor.fetchall()
    cursor.close()
    conn.close()
    return rows


def map_to_certyo_record(row):
    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": MYSQL_CONFIG["database"],
        "collection": "products",
        "recordId": str(row["id"]),
        "operationType": "upsert",
        "recordPayload": {
            "id": row["id"],
            "sku": row.get("sku"),
            "name": row.get("name"),
            "category": row.get("category"),
            "price": float(row["price"]) if row.get("price") is not None else None,
            "batch_number": row.get("batch_number"),
            "manufactured_at": str(row["manufactured_at"])
                if row.get("manufactured_at") else None,
        },
        "sourceTimestamp": row["updated_at"].isoformat()
            if row.get("updated_at") else None,
        "idempotencyKey": f"poll-{row['id']}-{row.get('updated_at', '')}",
    }


def ingest_records(records):
    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
    if len(records) > BULK_THRESHOLD:
        for i in range(0, len(records), 1000):
            batch = records[i : i + 1000]
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records/bulk",
                json={"tenantId": CERTYO_TENANT_ID, "records": batch},
                headers=headers, timeout=60,
            )
            if resp.status_code != 202:
                print(f"Bulk failed: {resp.status_code} {resp.text}")
    else:
        for rec in records:
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records",
                json=rec, headers=headers, timeout=30,
            )
            if resp.status_code != 202:
                print(f"Failed: {resp.status_code} {resp.text}")


def poll_loop():
    global last_checkpoint
    print(f"Timestamp poller started (interval={POLL_INTERVAL}s)")
    while True:
        try:
            rows = poll_changes()
            if rows:
                records = [map_to_certyo_record(r) for r in rows]
                print(f"Found {len(records)} changed rows")
                ingest_records(records)
                last_checkpoint = str(max(
                    r["updated_at"] for r in rows
                ))
                print(f"Checkpoint updated to {last_checkpoint}")
        except Exception as e:
            print(f"Poll error: {e}")

        time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    poll_loop()
```
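
The poller above keeps `last_checkpoint` in memory, so a restart replays every row (harmless thanks to `idempotencyKey`, but wasteful). A sketch that persists the checkpoint to a local file between runs; the file path and variable name are assumptions:

```python
import os

CHECKPOINT_FILE = os.environ.get("CHECKPOINT_FILE", "certyo_checkpoint.txt")


def load_checkpoint(default="1970-01-01 00:00:00"):
    """Read the last committed checkpoint, falling back to the epoch."""
    try:
        with open(CHECKPOINT_FILE) as f:
            return f.read().strip() or default
    except FileNotFoundError:
        return default


def save_checkpoint(value):
    """Write atomically: write a temp file, then rename over the old one."""
    tmp = CHECKPOINT_FILE + ".tmp"
    with open(tmp, "w") as f:
        f.write(value)
    os.replace(tmp, CHECKPOINT_FILE)
```

In the poll loop, call `load_checkpoint()` at startup and `save_checkpoint(last_checkpoint)` after each successful ingest.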

### 4. Node.js Timestamp Poller

```javascript
/**
 * Timestamp-based MySQL poller for Certyo integration.
 * Install: npm install mysql2 (uses the global fetch built into Node.js 18+)
 *
 * Usage:
 *   CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy \
 *   MYSQL_HOST=localhost MYSQL_DATABASE=product_catalog \
 *   node timestampPoller.js
 */

const mysql = require("mysql2/promise");

const CERTYO_API_KEY = process.env.CERTYO_API_KEY;
const CERTYO_TENANT_ID = process.env.CERTYO_TENANT_ID;
const CERTYO_BASE_URL = "https://www.certyos.com";
const POLL_INTERVAL = parseInt(process.env.POLL_INTERVAL || "15", 10) * 1000;
const BULK_THRESHOLD = 10;

// Keep the checkpoint as a Date: mysql2 serializes Date parameters using
// the connection timezone, so it round-trips cleanly against TIMESTAMP.
let lastCheckpoint = new Date(0);

const pool = mysql.createPool({
  host: process.env.MYSQL_HOST || "localhost",
  port: parseInt(process.env.MYSQL_PORT || "3306", 10),
  database: process.env.MYSQL_DATABASE || "product_catalog",
  user: process.env.MYSQL_USER || "certyo_worker",
  password: process.env.MYSQL_PASSWORD || "changeme",
  waitForConnections: true,
  connectionLimit: 5,
});

async function pollChanges() {
  const [rows] = await pool.execute(
    `SELECT id, sku, name, category, price, batch_number,
            manufactured_at, updated_at
     FROM products
     WHERE updated_at > ?
     ORDER BY updated_at
     LIMIT 500`,
    [lastCheckpoint]
  );
  return rows;
}

function mapToRecord(row) {
  return {
    tenantId: CERTYO_TENANT_ID,
    database: process.env.MYSQL_DATABASE || "product_catalog",
    collection: "products",
    recordId: String(row.id),
    operationType: "upsert",
    recordPayload: {
      id: row.id,
      sku: row.sku,
      name: row.name,
      category: row.category,
      price: row.price != null ? Number(row.price) : null,
      batch_number: row.batch_number,
      manufactured_at: row.manufactured_at
        ? row.manufactured_at.toISOString()
        : null,
    },
    sourceTimestamp: row.updated_at ? row.updated_at.toISOString() : null,
    idempotencyKey: `poll-${row.id}-${row.updated_at?.toISOString() || ""}`,
  };
}

async function ingestSingle(record) {
  const resp = await fetch(`${CERTYO_BASE_URL}/api/v1/records`, {
    method: "POST",
    headers: {
      "X-API-Key": CERTYO_API_KEY,
      "Content-Type": "application/json",
    },
    body: JSON.stringify(record),
  });
  if (resp.status !== 202) {
    console.error(`Failed: ${resp.status} ${await resp.text()}`);
  }
}

async function ingestBulk(records) {
  for (let i = 0; i < records.length; i += 1000) {
    const batch = records.slice(i, i + 1000);
    const resp = await fetch(`${CERTYO_BASE_URL}/api/v1/records/bulk`, {
      method: "POST",
      headers: {
        "X-API-Key": CERTYO_API_KEY,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({ tenantId: CERTYO_TENANT_ID, records: batch }),
    });
    if (resp.status !== 202) {
      console.error(`Bulk failed: ${resp.status} ${await resp.text()}`);
    }
  }
}

async function pollLoop() {
  console.log(`Timestamp poller started (interval=${POLL_INTERVAL / 1000}s)`);

  while (true) {
    try {
      const rows = await pollChanges();
      if (rows.length > 0) {
        const records = rows.map(mapToRecord);
        console.log(`Found ${records.length} changed rows`);

        if (records.length > BULK_THRESHOLD) {
          await ingestBulk(records);
        } else {
          for (const rec of records) {
            await ingestSingle(rec);
          }
        }

        const maxDate = rows.reduce(
          (max, r) => (r.updated_at > max ? r.updated_at : max),
          rows[0].updated_at
        );
        // Store the Date itself; mysql2 binds it correctly as a parameter
        lastCheckpoint = maxDate;
        console.log(`Checkpoint: ${maxDate.toISOString()}`);
      }
    } catch (err) {
      console.error("Poll error:", err.message);
    }

    await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL));
  }
}

pollLoop();
```

### 5. Python Queue Table Poller

```python
"""
MySQL queue table poller for Certyo integration.
Install: pip install mysql-connector-python requests

Usage:
    CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy \
    MYSQL_HOST=localhost MYSQL_DATABASE=product_catalog \
    python queue_poller.py
"""

import os
import time
import json
import requests
import mysql.connector

CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
MYSQL_CONFIG = {
    "host": os.environ.get("MYSQL_HOST", "localhost"),
    "port": int(os.environ.get("MYSQL_PORT", "3306")),
    "database": os.environ.get("MYSQL_DATABASE", "product_catalog"),
    "user": os.environ.get("MYSQL_USER", "certyo_worker"),
    "password": os.environ.get("MYSQL_PASSWORD", "changeme"),
}
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "10"))
BATCH_SIZE = 100
BULK_THRESHOLD = 10


def poll_queue():
    conn = mysql.connector.connect(**MYSQL_CONFIG)
    cursor = conn.cursor(dictionary=True)

    cursor.execute("""
        SELECT id, table_name, record_id, op_type, payload, created_at
        FROM certyo_queue
        WHERE processed = FALSE
        ORDER BY created_at
        LIMIT %s
    """, (BATCH_SIZE,))

    rows = cursor.fetchall()
    return conn, cursor, rows


def map_to_certyo_record(row):
    payload = row["payload"]
    if isinstance(payload, str):
        payload = json.loads(payload)

    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": MYSQL_CONFIG["database"],
        "collection": row["table_name"],
        "recordId": row["record_id"],
        "operationType": row["op_type"],
        "recordPayload": payload,
        "sourceTimestamp": row["created_at"].isoformat()
            if row.get("created_at") else None,
        "idempotencyKey": f"queue-{row['id']}",
    }


def ingest_and_mark(conn, cursor, rows):
    records = [map_to_certyo_record(r) for r in rows]
    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}

    success_ids = []

    if len(records) > BULK_THRESHOLD:
        for i in range(0, len(records), 1000):
            batch = records[i : i + 1000]
            batch_rows = rows[i : i + 1000]
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records/bulk",
                json={"tenantId": CERTYO_TENANT_ID, "records": batch},
                headers=headers, timeout=60,
            )
            if resp.status_code == 202:
                success_ids.extend(r["id"] for r in batch_rows)
            else:
                print(f"Bulk failed: {resp.status_code} {resp.text}")
    else:
        for rec, row in zip(records, rows):
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records",
                json=rec, headers=headers, timeout=30,
            )
            if resp.status_code == 202:
                success_ids.append(row["id"])
            else:
                print(f"Failed {row['id']}: {resp.status_code} {resp.text}")

    if success_ids:
        placeholders = ",".join(["%s"] * len(success_ids))
        cursor.execute(
            f"UPDATE certyo_queue SET processed = TRUE WHERE id IN ({placeholders})",
            success_ids,
        )
        conn.commit()
        print(f"Processed {len(success_ids)} queue entries")


def poll_loop():
    print(f"Queue poller started (interval={POLL_INTERVAL}s)")
    while True:
        try:
            conn, cursor, rows = poll_queue()
            try:
                if rows:
                    ingest_and_mark(conn, cursor, rows)
            finally:
                # Close even if ingestion raises, so connections don't leak
                cursor.close()
                conn.close()
        except Exception as e:
            print(f"Poll error: {e}")

        time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    poll_loop()
```

### 6. Debezium Docker Compose (MySQL + Binlog)

```yaml
version: "3.9"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  mysql:
    image: mysql:8.0
    ports: ["3306:3306"]
    environment:
      MYSQL_ROOT_PASSWORD: rootpass
      MYSQL_DATABASE: product_catalog
    command: >
      --server-id=1
      --log-bin=mysql-bin
      --binlog-format=ROW
      --binlog-row-image=FULL
      --gtid-mode=ON
      --enforce-gtid-consistency=ON

  connect:
    image: debezium/connect:2.6
    depends_on: [kafka, mysql]
    ports: ["8083:8083"]
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: certyo-mysql-cdc
      CONFIG_STORAGE_TOPIC: _connect_configs
      OFFSET_STORAGE_TOPIC: _connect_offsets
      STATUS_STORAGE_TOPIC: _connect_statuses
      CONFIG_STORAGE_REPLICATION_FACTOR: 1
      OFFSET_STORAGE_REPLICATION_FACTOR: 1
      STATUS_STORAGE_REPLICATION_FACTOR: 1
```

Register the connector:

```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" -d '{
  "name": "mysql-certyo-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "mysql",
    "database.include.list": "product_catalog",
    "table.include.list": "product_catalog.products",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.mysql",
    "include.schema.changes": "false",
    "snapshot.mode": "initial"
  }
}'
```

### 7. Python Debezium Kafka Consumer

```python
"""
Kafka consumer for Debezium MySQL CDC events.
Install: pip install confluent-kafka requests

Usage:
    CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy \
    KAFKA_BOOTSTRAP=localhost:9092 \
    python debezium_consumer.py
"""

import os
import json
from datetime import datetime, timezone

import requests
from confluent_kafka import Consumer, KafkaError

CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
KAFKA_BOOTSTRAP = os.environ.get("KAFKA_BOOTSTRAP", "localhost:9092")
TOPIC = "mysql.product_catalog.products"
BULK_THRESHOLD = 10


def create_consumer():
    return Consumer({
        "bootstrap.servers": KAFKA_BOOTSTRAP,
        "group.id": "certyo-mysql-consumer",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
    })


def map_debezium_to_certyo(event):
    payload = event.get("payload", {})
    source = payload.get("source", {})

    op_map = {"c": "insert", "u": "update", "d": "delete", "r": "upsert"}
    op = op_map.get(payload.get("op", ""), "upsert")

    # "after" is null for deletes; fall back to the "before" row image
    data = payload.get("after") or payload.get("before") or {}

    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": source.get("db", "mysql"),
        "collection": source.get("table", "products"),
        "recordId": str(data.get("id", "")),
        "operationType": op,
        "recordPayload": {
            "id": data.get("id"),
            "sku": data.get("sku"),
            "name": data.get("name"),
            "category": data.get("category"),
            "price": data.get("price"),
            "batch_number": data.get("batch_number"),
            "manufactured_at": data.get("manufactured_at"),
        },
        # source.ts_ms is epoch milliseconds; convert to ISO 8601
        "sourceTimestamp": (
            datetime.fromtimestamp(source["ts_ms"] / 1000, tz=timezone.utc)
                .isoformat()
            if source.get("ts_ms") else None
        ),
        "idempotencyKey": (
            f"dbz-{source.get('file', '')}-"
            f"{source.get('pos', '')}-"
            f"{data.get('id', '')}"
        ),
    }


def ingest_records(records):
    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
    if len(records) > BULK_THRESHOLD:
        for i in range(0, len(records), 1000):
            batch = records[i : i + 1000]
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records/bulk",
                json={"tenantId": CERTYO_TENANT_ID, "records": batch},
                headers=headers, timeout=60,
            )
            if resp.status_code != 202:
                print(f"Bulk failed: {resp.status_code} {resp.text}")
    else:
        for rec in records:
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records",
                json=rec, headers=headers, timeout=30,
            )
            if resp.status_code != 202:
                print(f"Failed: {resp.status_code} {resp.text}")


def main():
    consumer = create_consumer()
    consumer.subscribe([TOPIC])
    print(f"Listening on {TOPIC}...")

    batch = []
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            if batch:
                ingest_records(batch)
                consumer.commit()
                batch = []
            continue
        if msg.error():
            if msg.error().code() != KafkaError._PARTITION_EOF:
                print(f"Kafka error: {msg.error()}")
            continue

        event = json.loads(msg.value().decode("utf-8"))
        record = map_debezium_to_certyo(event)
        batch.append(record)

        if len(batch) >= 100:
            ingest_records(batch)
            consumer.commit()
            batch = []


if __name__ == "__main__":
    main()
```

## Verification & Write-back

### Verification Flow

1. Verification worker runs every 2 minutes
2. Queries `products` rows where `certyo_anchor_status = 'Pending'` and `updated_at` is older than 2 minutes
3. Calls `POST /api/v1/verify/record` with `tenantId`, `recordId` (id), and `recordHash`
4. On success, updates the source row:
   - `certyo_anchor_status` = `'Anchored'`
   - `certyo_verified_at` = `NOW()`
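
The flow above can be sketched in Python. The verify request body follows step 3; treating any `200 OK` response as confirmation is a simplification of whatever status detail the endpoint actually returns:

```python
import requests


def build_verify_payload(tenant_id, product_id, record_hash):
    """Request body for POST /api/v1/verify/record."""
    return {
        "tenantId": tenant_id,
        "recordId": str(product_id),  # recordId is always a string
        "recordHash": record_hash,
    }


def verify_record(base_url, api_key, payload):
    """Call the verify endpoint; True means the worker may mark the
    source row as Anchored (see the write-back SQL below)."""
    resp = requests.post(
        f"{base_url}/api/v1/verify/record",
        json=payload,
        headers={"X-API-Key": api_key, "Content-Type": "application/json"},
        timeout=30,
    )
    return resp.status_code == 200
```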

### Write-back SQL

```sql
-- After ingestion: store the record hash
UPDATE products
SET certyo_record_hash = :record_hash,
    certyo_anchor_status = 'Pending'
WHERE id = :product_id;

-- After verification: mark as anchored
UPDATE products
SET certyo_anchor_status = 'Anchored',
    certyo_verified_at = NOW()
WHERE id = :product_id;
```

### Trigger and Write-back Interaction

When the verification worker updates `certyo_anchor_status` and `certyo_verified_at`, the update trigger fires. The trigger should compare business columns (using the `<=>` null-safe comparison operator) to avoid enqueuing tracking-only changes:

```sql
-- The update trigger already includes this check:
IF OLD.sku <=> NEW.sku
   AND OLD.name <=> NEW.name
   AND OLD.category <=> NEW.category
   AND OLD.price <=> NEW.price
   AND OLD.batch_number <=> NEW.batch_number
THEN
    -- Only tracking columns changed, skip
    SET @noop = 1;
```

## Code Generation Rules

1. **Use Debezium for production workloads.** It reads the binlog asynchronously without triggers or polling, supports millions of changes per hour, and handles schema evolution. The trigger and polling approaches are for environments without Kafka.

2. **Use the bulk endpoint (`POST /api/v1/records/bulk`) when the batch contains more than 10 records.** The bulk endpoint accepts up to 1000 records per call. For batches exceeding 1000, split into multiple bulk calls.

3. **Verify binlog configuration before deploying Debezium.** `binlog_format` must be `ROW` (not `STATEMENT` or `MIXED`), and `binlog_row_image` must be `FULL`. Run `SHOW VARIABLES LIKE 'binlog%'` and `SHOW VARIABLES LIKE 'log_bin'` to check. On MySQL 8.x these are defaults; on 5.7 they may need explicit configuration.

4. **Always include an `idempotencyKey`.** For Debezium: `dbz-{binlog_file}-{binlog_pos}-{id}`. For queue table: `queue-{queue_id}`. For polling: `poll-{id}-{updated_at}`. Certyo deduplicates and returns `idempotencyReplayed: true`.

5. **Implement exponential backoff on HTTP 429 (Too Many Requests).** Start with 1-second delay, double on each retry up to 3 retries. Never retry immediately on 429.

6. **Exclude Certyo tracking columns from the `recordPayload`.** Columns `certyo_record_hash`, `certyo_anchor_status`, and `certyo_verified_at` are metadata. Including them causes hash mismatches on verification when those columns change.

7. **Filter out tracking-only updates in triggers.** Use the `<=>` (null-safe equality) operator to compare business columns. This prevents the verification worker's write-back from creating an infinite trigger loop.

8. **Timestamp polling cannot detect deletes.** If delete tracking is required, use the trigger or Debezium approach. For prototyping or read-mostly tables, timestamp polling is the simplest option.

9. **Clean up the queue table periodically.** Processed rows should be deleted after 24 hours. Use MySQL's EVENT SCHEDULER or an external cron job: `DELETE FROM certyo_queue WHERE processed = TRUE AND created_at < NOW() - INTERVAL 24 HOUR`.

10. **Store credentials securely.** Use environment variables for API keys, AWS Secrets Manager for RDS passwords. Never hardcode in source files or commit to version control. For MySQL SSL, download the provider's CA certificate and set `ssl_ca` in the connection config.
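
Rule 5's backoff schedule can be wrapped around any of the POST calls in this skill. A sketch: the 429 handling and the 1s/2s/4s delays follow the rule; the function names and timeout are illustrative.

```python
import time

import requests


def backoff_delay(attempt, base=1.0):
    """Delay before retry N (0-indexed): 1s, 2s, 4s, ..."""
    return base * (2 ** attempt)


def post_with_backoff(url, headers, body, max_retries=3):
    """POST, retrying only on HTTP 429 with exponential backoff.
    Returns the last response so the caller can inspect the status."""
    for attempt in range(max_retries + 1):
        resp = requests.post(url, json=body, headers=headers, timeout=60)
        if resp.status_code != 429:
            return resp
        if attempt < max_retries:
            time.sleep(backoff_delay(attempt))
    return resp
```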
