---
name: Certyo + Oracle Database Integration
version: 1.0.0
description: Generate Oracle Database → Certyo integration code with ORDS, AQ, and Debezium LogMiner patterns, field mappings, auth, and working examples
api_base: https://www.certyos.com
auth: X-API-Key header
last_updated: 2026-04-14
---

# Certyo + Oracle Database Integration Skill

This skill generates production-ready Python, PL/SQL, and Java code to integrate Oracle Database with Certyo's blockchain-backed authenticity platform. It supports three change detection approaches: ORDS REST polling, Advanced Queuing (AQ) with triggers, and Debezium with LogMiner CDC.

## Certyo API Reference

All requests require the `X-API-Key` header for authentication.

### Endpoints

| Method | Path | Description | Response |
|--------|------|-------------|----------|
| `POST` | `/api/v1/records` | Ingest a single record | `202 Accepted` |
| `POST` | `/api/v1/records/bulk` | Ingest up to 1000 records | `202 Accepted` |
| `POST` | `/api/v1/verify/record` | Verify blockchain anchoring | `200 OK` |
| `GET` | `/api/v1/records` | Query records | `200 OK` |
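
The choice between the single and bulk endpoints can be sketched as a small planning step. This is a minimal illustration, not part of the Certyo API: the helper names `chunk` and `plan_requests` are hypothetical, the 1000-record limit comes from the table above, and the cut-over at 10 records follows the threshold used elsewhere in this document.

```python
from typing import Iterator, List, Tuple

SINGLE_PATH = "/api/v1/records"
BULK_PATH = "/api/v1/records/bulk"
MAX_BULK_SIZE = 1000  # per-call limit from the endpoint table
BULK_THRESHOLD = 10   # cut-over point used by the examples in this document


def chunk(records: List[dict], size: int = MAX_BULK_SIZE) -> Iterator[List[dict]]:
    """Yield consecutive slices of at most `size` records."""
    if size < 1:
        raise ValueError("size must be positive")
    for start in range(0, len(records), size):
        yield records[start:start + size]


def plan_requests(records: List[dict]) -> List[Tuple[str, List[dict]]]:
    """Return (path, batch) pairs describing the calls to make, without sending them."""
    if len(records) > BULK_THRESHOLD:
        return [(BULK_PATH, batch) for batch in chunk(records)]
    # Small batches go to the single-record endpoint, one call per record
    return [(SINGLE_PATH, [record]) for record in records]
```

Separating the plan from the HTTP calls makes the batching logic easy to unit test before any request is sent.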

### Record Payload

```json
{
  "tenantId": "string (required)",
  "database": "string (required)",
  "collection": "string (required)",
  "recordId": "string (required)",
  "recordPayload": { "...any JSON (required)" },
  "clientId": "string (optional)",
  "recordVersion": "string (optional, default '1')",
  "operationType": "upsert|insert|update|delete (optional)",
  "sourceTimestamp": "ISO 8601 (optional)",
  "idempotencyKey": "string (optional)"
}
```

### Ingestion Response (202 Accepted)

```json
{
  "recordId": "PROD-42",
  "recordHash": "sha256:ab12cd34...",
  "tenantId": "acme-corp",
  "acceptedAt": "2026-04-14T10:30:00Z",
  "idempotencyReplayed": false
}
```
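
A caller will usually want to keep the `recordHash` from this response for later verification and to notice replays. The following is a minimal sketch that assumes the response has exactly the fields shown above; `IngestResult` and `parse_ingest_response` are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IngestResult:
    record_id: str
    record_hash: str
    accepted_at: str
    replayed: bool


def parse_ingest_response(body: dict) -> IngestResult:
    """Extract the fields of a 202 response that later steps rely on."""
    return IngestResult(
        record_id=body["recordId"],
        record_hash=body["recordHash"],
        accepted_at=body["acceptedAt"],
        # Treat a missing flag as "not a replay" (an assumption of this sketch)
        replayed=bool(body.get("idempotencyReplayed", False)),
    )
```

The `record_hash` value is what the write-back step stores in `CERTYO_RECORD_HASH`; a `replayed` result indicates the same `idempotencyKey` was already accepted.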

### Pipeline Timing

Records flow through: Kafka -> Accumulate (1000 records or 60s) -> Merkle tree -> IPFS pin -> Polygon anchor. Total anchoring latency is approximately 60-90 seconds after accumulation flush.

## Integration Patterns

### Approach 1: ORDS REST Polling

```
Oracle table with MODIFIED_AT timestamp column
  -> ORDS exposes the table as a REST endpoint
    -> External poller queries ORDS for rows WHERE MODIFIED_AT > last_checkpoint
      -> Maps rows to Certyo record payloads
        -> If batch > 10: POST /api/v1/records/bulk
        -> If batch <= 10: POST /api/v1/records (per record)
          -> Updates checkpoint to the latest MODIFIED_AT value
```

### Approach 2: Advanced Queuing (AQ)

```
Oracle table with AFTER INSERT/UPDATE/DELETE trigger
  -> Trigger calls ENQUEUE_CERTYO_CHANGE procedure
    -> Message enqueued to CERTYO_CHANGES_Q (transactional, exactly-once within DB)
      -> External consumer (Java or Python) dequeues messages
        -> Maps message payload to Certyo record
          -> POST /api/v1/records or /api/v1/records/bulk
            -> Commits dequeue transaction on success
```

### Approach 3: Debezium + LogMiner

```
Oracle redo logs
  -> Debezium Oracle connector reads via LogMiner
    -> Change events published to Kafka topic (oracle.SCHEMA.TABLE)
      -> Kafka consumer reads events
        -> Maps Debezium envelope to Certyo record
          -> POST /api/v1/records or /api/v1/records/bulk
            -> Commits Kafka offset on success
```

## Authentication

### Oracle Connection (Python oracledb thin mode)

```python
import oracledb

conn = oracledb.connect(
    user="certyo_app",
    password="changeme",
    dsn="oracle-host:1521/ORCLPDB1"
)
```

### Oracle Connection (JDBC)

```java
String jdbcUrl = "jdbc:oracle:thin:@//oracle-host:1521/ORCLPDB1";
Connection conn = DriverManager.getConnection(jdbcUrl, "certyo_app", "changeme");
```

### ORDS Base URL

```
https://oracle-host:8443/ords/myschema
```

For ORDS with OAuth2:

```python
import requests

token_resp = requests.post(
    "https://oracle-host:8443/ords/myschema/oauth/token",
    data={"grant_type": "client_credentials"},
    auth=("client_id", "client_secret"),
)
access_token = token_resp.json()["access_token"]
headers = {"Authorization": f"Bearer {access_token}"}
```

### Certyo API

```python
CERTYO_BASE_URL = "https://www.certyos.com"
headers = {
    "X-API-Key": "cty_xxxxxxxxxxxxxxxxxxxx",
    "Content-Type": "application/json",
}
```

Store the API key in Oracle Wallet, environment variables, or a secrets manager. Never embed credentials in PL/SQL source or commit them to version control.

## Field Mapping

| Oracle Field | Certyo Field | Notes |
|-------------|-------------|-------|
| `PRODUCT_ID` (PK) | `recordId` | Cast to string |
| `PRODUCTS` (table name) | `collection` | Uppercase Oracle convention |
| `MYSCHEMA` (schema/owner) | `database` | Schema name as database identifier |
| `PK + MODIFIED_AT` or `PK + SCN` | `idempotencyKey` | Unique per change event (a `ROWID` alone identifies a row, not a change) |
| `SYSTIMESTAMP` | `sourceTimestamp` | ISO 8601 format |
| `'insert'/'update'/'delete'` | `operationType` | Mapped from trigger or Debezium op |
| Full row as JSON | `recordPayload` | All business columns serialized |
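
When rows are fetched directly with a database driver rather than through ORDS, values such as `Decimal` and `datetime` need converting before they can be serialized into `recordPayload`. The sketch below is one possible approach under stated assumptions: `to_json_safe` and `row_to_payload` are hypothetical helpers, naive timestamps are assumed to be UTC, and the Certyo tracking columns are excluded from the payload as required by the code generation rules.

```python
import json
from datetime import date, datetime, timezone
from decimal import Decimal

# Tracking columns are metadata and stay out of recordPayload
TRACKING_COLUMNS = {"CERTYO_RECORD_HASH", "CERTYO_STATUS", "CERTYO_VERIFIED_AT"}


def to_json_safe(value):
    """Convert common database driver types to JSON-serializable values."""
    if isinstance(value, datetime):
        # Assumption: naive timestamps are UTC; adjust for your database time zone
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        return value.isoformat()
    if isinstance(value, date):
        return value.isoformat()
    if isinstance(value, Decimal):
        # Keep whole numbers exact; fall back to float for fractional values
        return int(value) if value == value.to_integral_value() else float(value)
    if isinstance(value, bytes):
        return value.hex()
    return value


def row_to_payload(columns, row):
    """Zip column names with values, lower-casing names and dropping tracking columns."""
    return {
        name.lower(): to_json_safe(value)
        for name, value in zip(columns, row)
        if name.upper() not in TRACKING_COLUMNS
    }
```

The resulting dictionary can be passed straight to `json.dumps` or to the `json=` argument of an HTTP client.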

### Debezium Operation Mapping

| Debezium `op` | Certyo `operationType` |
|--------------|----------------------|
| `c` (create) | `insert` |
| `u` (update) | `update` |
| `d` (delete) | `delete` |
| `r` (read/snapshot) | `upsert` |

### AQ Trigger Operation Mapping

| PL/SQL Predicate | Certyo `operationType` |
|-----------------|----------------------|
| `INSERTING` | `insert` |
| `UPDATING` | `update` |
| `DELETING` | `delete` |

## Code Examples

### 1. ORDS Setup — Enable Auto-REST and Add Tracking Columns

```sql
-- Step 1: Add a MODIFIED_AT column for change tracking
ALTER TABLE PRODUCTS ADD (
    MODIFIED_AT        TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
    VERSION_NUM        NUMBER DEFAULT 1 NOT NULL,
    CERTYO_RECORD_HASH VARCHAR2(128),
    CERTYO_STATUS      VARCHAR2(20) DEFAULT 'Pending',
    CERTYO_VERIFIED_AT TIMESTAMP
);

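-- Step 2: Create a trigger to update MODIFIED_AT when business columns change.
-- The UPDATE OF list keeps Certyo write-backs (CERTYO_* columns) from bumping
-- MODIFIED_AT, which would make the poller re-ingest the same row in a loop.
-- Column names follow the examples in this document; adjust them to your table.
CREATE OR REPLACE TRIGGER TRG_PRODUCTS_MODIFIED
BEFORE INSERT OR UPDATE OF SKU, PRODUCT_NAME, CATEGORY, PRICE, BATCH_NUMBER, MANUFACTURED_AT
ON PRODUCTS
FOR EACH ROW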
BEGIN
    :NEW.MODIFIED_AT := SYSTIMESTAMP;
    IF UPDATING THEN
        :NEW.VERSION_NUM := :OLD.VERSION_NUM + 1;
    END IF;
END;
/

-- Step 3: Create an index for efficient polling
CREATE INDEX IDX_PRODUCTS_MODIFIED ON PRODUCTS (MODIFIED_AT);

-- Step 4: Enable ORDS auto-REST on the table
BEGIN
    ORDS.ENABLE_OBJECT(
        p_enabled      => TRUE,
        p_schema       => 'MYSCHEMA',
        p_object       => 'PRODUCTS',
        p_object_type  => 'TABLE',
        p_object_alias => 'products'
    );
    COMMIT;
END;
/
```

### 2. Python ORDS Poller

```python
"""
Oracle ORDS polling worker for Certyo integration.
Install: pip install requests

Usage:
    CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy \
    ORDS_BASE_URL=https://oracle-host:8443/ords/myschema \
    python ords_poller.py
"""

import os
import time
import requests
from datetime import datetime, timezone

CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
ORDS_BASE_URL = os.environ["ORDS_BASE_URL"]
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "15"))
BULK_THRESHOLD = 10

last_modified = "1970-01-01T00:00:00Z"


def poll_ords_changes():
    global last_modified
    url = f"{ORDS_BASE_URL}/products/"
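    params = {
        # ORDS filter syntax: timestamp values are wrapped in {"$date": ...}.
        # Ordering by modified_at keeps the checkpoint correct when more than
        # `limit` rows have changed since the last poll.
        "q": (
            f'{{"modified_at":{{"$gt":{{"$date":"{last_modified}"}}}},'
            '"$orderby":{"modified_at":"ASC"}}'
        ),
        "limit": 500,
    }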
    resp = requests.get(url, params=params, timeout=30)
    resp.raise_for_status()
    return resp.json().get("items", [])


def map_to_certyo_record(row):
    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": row.get("owner", "oracle"),
        "collection": "PRODUCTS",
        "recordId": str(row["product_id"]),
        "recordVersion": str(row.get("version_num", 1)),
        "operationType": "upsert",
        "recordPayload": {
            "product_id": row["product_id"],
            "sku": row.get("sku"),
            "name": row.get("product_name"),
            "category": row.get("category"),
            "price": row.get("price"),
            "batch_number": row.get("batch_number"),
            "manufactured_at": row.get("manufactured_at"),
        },
        "sourceTimestamp": row.get(
            "modified_at", datetime.now(timezone.utc).isoformat()
        ),
        "idempotencyKey": f"ords-{row['product_id']}-{row.get('modified_at', '')}",
    }


def ingest_single(record):
    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
    resp = requests.post(
        f"{CERTYO_BASE_URL}/api/v1/records",
        json=record, headers=headers, timeout=30,
    )
    if resp.status_code != 202:
        # Raise so poll_loop skips the checkpoint update and retries these rows
        raise RuntimeError(f"Ingest failed: {resp.status_code} {resp.text}")
    return resp.json()


def ingest_bulk(records):
    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
    # The bulk endpoint accepts at most 1000 records per call
    for i in range(0, len(records), 1000):
        batch = records[i : i + 1000]
        resp = requests.post(
            f"{CERTYO_BASE_URL}/api/v1/records/bulk",
            json={"tenantId": CERTYO_TENANT_ID, "records": batch},
            headers=headers, timeout=60,
        )
        if resp.status_code != 202:
            # Raise so poll_loop skips the checkpoint update and retries these rows
            raise RuntimeError(f"Bulk failed: {resp.status_code} {resp.text}")


def poll_loop():
    global last_modified
    print(f"Starting ORDS poller (interval={POLL_INTERVAL}s)")
    while True:
        try:
            rows = poll_ords_changes()
            if rows:
                records = [map_to_certyo_record(r) for r in rows]
                print(f"Found {len(records)} changed rows")

                if len(records) > BULK_THRESHOLD:
                    ingest_bulk(records)
                else:
                    for rec in records:
                        ingest_single(rec)

                last_modified = max(
                    r.get("modified_at", last_modified) for r in rows
                )
                print(f"Checkpoint updated to {last_modified}")
        except Exception as e:
            print(f"Poll error: {e}")

        time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    poll_loop()
```

### 3. PL/SQL — Advanced Queuing Setup

```sql
-- Step 1: Create the message payload type
CREATE OR REPLACE TYPE CERTYO_CHANGE_MSG AS OBJECT (
    PRODUCT_ID   NUMBER,
    SKU          VARCHAR2(100),
    PRODUCT_NAME VARCHAR2(200),
    CATEGORY     VARCHAR2(100),
    PRICE        NUMBER(10,2),
    BATCH_NUMBER VARCHAR2(100),
    OP_TYPE      VARCHAR2(10),
    CHANGE_TS    TIMESTAMP
);
/

-- Step 2: Create the queue table and queue
BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table        => 'CERTYO_CHANGES_QT',
        queue_payload_type => 'CERTYO_CHANGE_MSG'
    );
    DBMS_AQADM.CREATE_QUEUE(
        queue_name  => 'CERTYO_CHANGES_Q',
        queue_table => 'CERTYO_CHANGES_QT'
    );
    DBMS_AQADM.START_QUEUE(
        queue_name => 'CERTYO_CHANGES_Q'
    );
END;
/

-- Step 3: Create the enqueue procedure
CREATE OR REPLACE PROCEDURE ENQUEUE_CERTYO_CHANGE(
    p_product_id   NUMBER,
    p_sku          VARCHAR2,
    p_name         VARCHAR2,
    p_category     VARCHAR2,
    p_price        NUMBER,
    p_batch_number VARCHAR2,
    p_op_type      VARCHAR2
) AS
    v_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
    v_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
    v_message_handle     RAW(16);
    v_message            CERTYO_CHANGE_MSG;
BEGIN
    v_message := CERTYO_CHANGE_MSG(
        p_product_id, p_sku, p_name, p_category,
        p_price, p_batch_number, p_op_type, SYSTIMESTAMP
    );
    DBMS_AQ.ENQUEUE(
        queue_name         => 'CERTYO_CHANGES_Q',
        enqueue_options    => v_enqueue_options,
        message_properties => v_message_properties,
        payload            => v_message,
        msgid              => v_message_handle
    );
END;
/

-- Step 4: Create the trigger
CREATE OR REPLACE TRIGGER TRG_PRODUCTS_AQ
AFTER INSERT OR UPDATE OR DELETE ON PRODUCTS
FOR EACH ROW
DECLARE
    v_op VARCHAR2(10);
BEGIN
    IF INSERTING THEN v_op := 'insert';
    ELSIF UPDATING THEN v_op := 'update';
    ELSIF DELETING THEN v_op := 'delete';
    END IF;

    IF DELETING THEN
        ENQUEUE_CERTYO_CHANGE(
            :OLD.PRODUCT_ID, :OLD.SKU, :OLD.PRODUCT_NAME,
            :OLD.CATEGORY, :OLD.PRICE, :OLD.BATCH_NUMBER, v_op
        );
    ELSE
        ENQUEUE_CERTYO_CHANGE(
            :NEW.PRODUCT_ID, :NEW.SKU, :NEW.PRODUCT_NAME,
            :NEW.CATEGORY, :NEW.PRICE, :NEW.BATCH_NUMBER, v_op
        );
    END IF;
END;
/
```

### 4. Java AQ Consumer

```java
// AqCertyoConsumer.java
// Requires: ojdbc11, aqapi, jackson-databind
// Usage: java -DCERTYO_API_KEY=xxx -DCERTYO_TENANT_ID=yyy AqCertyoConsumer

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.sql.Connection;
import java.sql.DriverManager;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.aq.AQDequeueOptions;
import oracle.jdbc.aq.AQMessage;

public class AqCertyoConsumer {

    private static final String CERTYO_BASE = "https://www.certyos.com";
    private static final String API_KEY = System.getProperty("CERTYO_API_KEY");
    private static final String TENANT_ID = System.getProperty("CERTYO_TENANT_ID");
    private static final int BULK_THRESHOLD = 10;

    public static void main(String[] args) throws Exception {
        String jdbcUrl = System.getProperty("ORACLE_JDBC_URL",
            "jdbc:oracle:thin:@//localhost:1521/ORCLPDB1");
        String user = System.getProperty("ORACLE_USER", "certyo_app");
        String pass = System.getProperty("ORACLE_PASS", "changeme");

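        Connection conn = DriverManager.getConnection(jdbcUrl, user, pass);
        // JDBC defaults to auto-commit; disable it so the dequeue is only
        // committed after Certyo has accepted the records
        conn.setAutoCommit(false);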
        OracleConnection oraConn = conn.unwrap(OracleConnection.class);
        HttpClient httpClient = HttpClient.newHttpClient();

        System.out.println("AQ Consumer started. Listening on CERTYO_CHANGES_Q...");

        while (true) {
            try {
                List<String> batch = new ArrayList<>();

                AQDequeueOptions deqOpts = new AQDequeueOptions();
                deqOpts.setWait(5);
                deqOpts.setNavigation(
                    AQDequeueOptions.NavigationOption.FIRST_MESSAGE);

                for (int i = 0; i < 100; i++) {
                    try {
                        AQMessage msg = oraConn.dequeue(
                            "CERTYO_CHANGES_Q", deqOpts, "CERTYO_CHANGE_MSG");
                        if (msg == null) break;

                        // Object-type payloads are returned as a STRUCT
                        Object[] attrs = msg.getSTRUCTPayload().getAttributes();
                        String recordJson = buildCertyoRecord(attrs);
                        batch.add(recordJson);
                    } catch (Exception e) {
                        break;
                    }
                }

                if (batch.isEmpty()) continue;

                System.out.println("Dequeued " + batch.size() + " messages");

                if (batch.size() > BULK_THRESHOLD) {
                    ingestBulk(httpClient, batch);
                } else {
                    for (String record : batch) {
                        ingestSingle(httpClient, record);
                    }
                }

                conn.commit();

            } catch (Exception e) {
                System.err.println("Consumer error: " + e.getMessage());
                conn.rollback();
                Thread.sleep(5000);
            }
        }
    }

    private static String buildCertyoRecord(Object[] attrs) {
        String productId = String.valueOf(attrs[0]);
        String sku = String.valueOf(attrs[1]);
        String name = attrs[2] != null ? String.valueOf(attrs[2]) : null;
        String category = attrs[3] != null ? String.valueOf(attrs[3]) : null;
        String price = attrs[4] != null ? String.valueOf(attrs[4]) : null;
        String batchNum = attrs[5] != null ? String.valueOf(attrs[5]) : null;
        String opType = String.valueOf(attrs[6]);
        String ts = Instant.now().toString();

        return String.format("""
            {
              "tenantId": "%s",
              "database": "oracle",
              "collection": "PRODUCTS",
              "recordId": "%s",
              "operationType": "%s",
              "recordPayload": {
                "product_id": %s,
                "sku": "%s",
                "name": %s,
                "category": %s,
                "price": %s,
                "batch_number": %s
              },
              "sourceTimestamp": "%s",
              "idempotencyKey": "aq-%s-%s"
            }""",
            // recordId is the primary key (PRODUCT_ID), per the field mapping
            TENANT_ID, productId, opType, productId,
            sku,
            name != null ? "\"" + name + "\"" : "null",
            category != null ? "\"" + category + "\"" : "null",
            price != null ? price : "null",
            batchNum != null ? "\"" + batchNum + "\"" : "null",
            ts, sku, ts.substring(0, 13)
        );
    }

    private static void ingestSingle(HttpClient client, String recordJson)
            throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(CERTYO_BASE + "/api/v1/records"))
            .header("X-API-Key", API_KEY)
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(recordJson))
            .build();

        HttpResponse<String> resp = client.send(request,
            HttpResponse.BodyHandlers.ofString());
        if (resp.statusCode() != 202) {
            // Throw so main() rolls back the dequeue and the message is redelivered
            throw new IllegalStateException("Ingestion failed: "
                + resp.statusCode() + " " + resp.body());
        }
    }

    private static void ingestBulk(HttpClient client, List<String> records)
            throws Exception {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"tenantId\":\"").append(TENANT_ID)
          .append("\",\"records\":[");
        for (int i = 0; i < records.size(); i++) {
            if (i > 0) sb.append(",");
            sb.append(records.get(i));
        }
        sb.append("]}");

        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(CERTYO_BASE + "/api/v1/records/bulk"))
            .header("X-API-Key", API_KEY)
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(sb.toString()))
            .build();

        HttpResponse<String> resp = client.send(request,
            HttpResponse.BodyHandlers.ofString());
        if (resp.statusCode() != 202) {
            // Throw so main() rolls back the dequeue and the batch is redelivered
            throw new IllegalStateException("Bulk ingestion failed: "
                + resp.statusCode() + " " + resp.body());
        }
    }
}
```

### 5. Debezium Docker Compose (Oracle + LogMiner)

```yaml
version: "3.9"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: debezium/connect:2.6
    depends_on: [kafka]
    ports: ["8083:8083"]
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: certyo-oracle-cdc
      CONFIG_STORAGE_TOPIC: _connect_configs
      OFFSET_STORAGE_TOPIC: _connect_offsets
      STATUS_STORAGE_TOPIC: _connect_statuses
```

Register the connector:

```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" -d '{
  "name": "oracle-certyo-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "database.hostname": "oracle-host",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.dbname": "ORCLCDB",
    "database.pdb.name": "ORCLPDB1",
    "schema.include.list": "MYSCHEMA",
    "table.include.list": "MYSCHEMA.PRODUCTS",
    "topic.prefix": "oracle",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.oracle",
    "log.mining.strategy": "online_catalog",
    "decimal.handling.mode": "double",
    "snapshot.mode": "initial"
  }
}'
```
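
After registering, it helps to confirm that the connector and its tasks are running before starting a consumer. The sketch below reads the Kafka Connect REST status endpoint (`GET /connectors/{name}/status`) and summarizes it. The helper names `summarize_status` and `fetch_status` are hypothetical, and the URL and connector name simply mirror the values used in the registration command above.

```python
import json
from urllib.request import urlopen

CONNECT_URL = "http://localhost:8083"    # same host/port as the curl command
CONNECTOR = "oracle-certyo-connector"    # same name as in the registration body


def summarize_status(status: dict) -> dict:
    """Reduce a Kafka Connect status document to a simple health summary."""
    connector_state = status.get("connector", {}).get("state", "UNKNOWN")
    tasks = status.get("tasks", [])
    not_running = [t.get("id") for t in tasks if t.get("state") != "RUNNING"]
    return {
        "connector_state": connector_state,
        "not_running_task_ids": not_running,
        # Healthy means: connector running, at least one task, all tasks running
        "healthy": connector_state == "RUNNING" and bool(tasks) and not not_running,
    }


def fetch_status(base_url: str = CONNECT_URL, name: str = CONNECTOR) -> dict:
    """Fetch the raw status document from the Kafka Connect REST API."""
    with urlopen(f"{base_url}/connectors/{name}/status", timeout=10) as resp:
        return json.load(resp)
```

Calling `summarize_status(fetch_status())` against a running Connect worker returns the summary; a task in the `FAILED` state usually carries a `trace` field in the raw document that explains the cause.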

### 6. Python Debezium Kafka Consumer

```python
"""
Kafka consumer for Debezium Oracle CDC events.
Install: pip install confluent-kafka requests

Usage:
    CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy \
    KAFKA_BOOTSTRAP=localhost:9092 \
    python debezium_consumer.py
"""

import os
import json
import requests
from datetime import datetime, timezone
from confluent_kafka import Consumer, KafkaError

CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
KAFKA_BOOTSTRAP = os.environ.get("KAFKA_BOOTSTRAP", "localhost:9092")
TOPIC = "oracle.MYSCHEMA.PRODUCTS"
BULK_THRESHOLD = 10


def create_consumer():
    return Consumer({
        "bootstrap.servers": KAFKA_BOOTSTRAP,
        "group.id": "certyo-oracle-consumer",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
    })


def map_debezium_to_certyo(event):
    payload = event.get("payload", {})
    after = payload.get("after", {})
    before = payload.get("before", {})
    source = payload.get("source", {})

    op_map = {"c": "insert", "u": "update", "d": "delete", "r": "upsert"}
    op = op_map.get(payload.get("op", ""), "upsert")

    # Deletes carry the row image in "before"; all other ops use "after"
    data = after if after else before

    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": source.get("schema", "oracle"),
        "collection": source.get("table", "PRODUCTS"),
        "recordId": str(data.get("PRODUCT_ID", "")),
        "operationType": op,
        "recordPayload": {
            "product_id": data.get("PRODUCT_ID"),
            "sku": data.get("SKU"),
            "name": data.get("PRODUCT_NAME"),
            "category": data.get("CATEGORY"),
            "price": data.get("PRICE"),
            "batch_number": data.get("BATCH_NUMBER"),
        },
        "sourceTimestamp": (
            # ts_ms is epoch milliseconds; convert it to ISO 8601
            datetime.fromtimestamp(source["ts_ms"] / 1000, tz=timezone.utc).isoformat()
            if source.get("ts_ms") is not None
            else None
        ),
        "idempotencyKey": (
            f"dbz-{source.get('scn', '')}-"
            f"{data.get('PRODUCT_ID', '')}"
        ),
    }


def ingest_records(records):
    """Send records to Certyo; raise on any non-202 so offsets are not committed."""
    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
    if len(records) > BULK_THRESHOLD:
        for i in range(0, len(records), 1000):
            batch = records[i : i + 1000]
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records/bulk",
                json={"tenantId": CERTYO_TENANT_ID, "records": batch},
                headers=headers, timeout=60,
            )
            if resp.status_code != 202:
                # The batch is re-read from Kafka after a restart
                raise RuntimeError(f"Bulk failed: {resp.status_code} {resp.text}")
    else:
        for rec in records:
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records",
                json=rec, headers=headers, timeout=30,
            )
            if resp.status_code != 202:
                raise RuntimeError(f"Failed: {resp.status_code} {resp.text}")


def main():
    consumer = create_consumer()
    consumer.subscribe([TOPIC])
    print(f"Listening on {TOPIC}...")

    batch = []
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            if batch:
                ingest_records(batch)
                consumer.commit()
                batch = []
            continue
        if msg.error():
            if msg.error().code() != KafkaError._PARTITION_EOF:
                print(f"Kafka error: {msg.error()}")
            continue

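        if msg.value() is None:
            # Debezium emits a tombstone (null value) after each delete; skip it
            continue
        event = json.loads(msg.value().decode("utf-8"))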
        record = map_debezium_to_certyo(event)
        batch.append(record)

        if len(batch) >= 100:
            ingest_records(batch)
            consumer.commit()
            batch = []


if __name__ == "__main__":
    main()
```

## Verification & Write-back

### Verification Flow

1. Verification worker runs every 2 minutes
2. Queries `PRODUCTS` rows where `CERTYO_STATUS = 'Pending'` and `MODIFIED_AT` is older than 2 minutes
3. Calls `POST /api/v1/verify/record` with `tenantId`, `recordId` (PRODUCT_ID), and `recordHash`
4. On success, updates the source row:
   - `CERTYO_STATUS` = `'Anchored'`
   - `CERTYO_VERIFIED_AT` = `SYSTIMESTAMP`
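
The flow above can be sketched as a single verification pass. This is an illustration under stated assumptions: the request body uses only the three fields named in step 3, and because the shape of the verify response is not described here, the HTTP call and the database update are passed in as callables (`verify` and `mark_anchored` are hypothetical parameter names).

```python
from typing import Callable, Iterable, List, Optional, Tuple


def build_verify_request(tenant_id: str, product_id, record_hash: str) -> dict:
    """Body for POST /api/v1/verify/record, using the fields listed in step 3."""
    return {
        "tenantId": tenant_id,
        "recordId": str(product_id),
        "recordHash": record_hash,
    }


def run_verification_pass(
    pending_rows: Iterable[Tuple[object, Optional[str]]],
    tenant_id: str,
    verify: Callable[[dict], bool],
    mark_anchored: Callable[[object], None],
) -> List[str]:
    """Verify each pending (product_id, record_hash) pair once.

    `verify` sends the request and returns True when the record is anchored;
    `mark_anchored` performs the write-back for one PRODUCT_ID.
    """
    anchored = []
    for product_id, record_hash in pending_rows:
        if not record_hash:
            continue  # no hash stored yet, so there is nothing to verify
        if verify(build_verify_request(tenant_id, product_id, record_hash)):
            mark_anchored(product_id)
            anchored.append(str(product_id))
    return anchored
```

A scheduler would call `run_verification_pass` every 2 minutes with the rows selected in step 2; rows that are not yet anchored stay `Pending` and are retried on the next pass.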

### Write-back SQL

```sql
-- After ingestion: store the record hash
UPDATE PRODUCTS
SET CERTYO_RECORD_HASH = :record_hash,
    CERTYO_STATUS      = 'Pending'
WHERE PRODUCT_ID = :product_id;

-- After verification: mark as anchored
UPDATE PRODUCTS
SET CERTYO_STATUS      = 'Anchored',
    CERTYO_VERIFIED_AT = SYSTIMESTAMP
WHERE PRODUCT_ID = :product_id;
```
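
From Python, the two statements above can be issued with bind variables through any DB-API style connection, such as one returned by `oracledb.connect`. The following is a minimal sketch; `store_record_hash` and `mark_anchored` are hypothetical helper names, and the sketch assumes the connection's cursors can be used as context managers, as python-oracledb cursors can.

```python
STORE_HASH_SQL = """
    UPDATE PRODUCTS
    SET CERTYO_RECORD_HASH = :record_hash,
        CERTYO_STATUS      = 'Pending'
    WHERE PRODUCT_ID = :product_id
"""

MARK_ANCHORED_SQL = """
    UPDATE PRODUCTS
    SET CERTYO_STATUS      = 'Anchored',
        CERTYO_VERIFIED_AT = SYSTIMESTAMP
    WHERE PRODUCT_ID = :product_id
"""


def store_record_hash(conn, product_id, record_hash) -> int:
    """Store the hash returned by ingestion; returns the number of rows updated."""
    with conn.cursor() as cur:
        # Named binds avoid string concatenation and SQL injection
        cur.execute(STORE_HASH_SQL, {"record_hash": record_hash, "product_id": product_id})
        updated = cur.rowcount
    conn.commit()
    return updated


def mark_anchored(conn, product_id) -> int:
    """Mark a row as anchored after successful verification."""
    with conn.cursor() as cur:
        cur.execute(MARK_ANCHORED_SQL, {"product_id": product_id})
        updated = cur.rowcount
    conn.commit()
    return updated
```

A return value of 0 means no row matched the `PRODUCT_ID`, which is worth logging because the source row may have been deleted between ingestion and write-back.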

### AQ Trigger and Write-back Interaction

When the verification worker updates `CERTYO_STATUS` and `CERTYO_VERIFIED_AT`, the AQ trigger fires and enqueues a change message, which the consumer ingests again, and the cycle repeats. To avoid this loop, have the trigger skip updates where only Certyo tracking columns changed by comparing the business columns before enqueuing:

```sql
-- Modified trigger with tracking column filter
CREATE OR REPLACE TRIGGER TRG_PRODUCTS_AQ
AFTER INSERT OR UPDATE OR DELETE ON PRODUCTS
FOR EACH ROW
DECLARE
    v_op VARCHAR2(10);
BEGIN
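    -- Skip if only Certyo tracking columns changed.
    -- Comparisons are NULL-safe: a plain "=" evaluates to NULL when either side
    -- is NULL, which would let tracking-only updates through for such rows.
    IF UPDATING AND
       (:OLD.SKU = :NEW.SKU OR (:OLD.SKU IS NULL AND :NEW.SKU IS NULL)) AND
       (:OLD.PRODUCT_NAME = :NEW.PRODUCT_NAME OR (:OLD.PRODUCT_NAME IS NULL AND :NEW.PRODUCT_NAME IS NULL)) AND
       (:OLD.CATEGORY = :NEW.CATEGORY OR (:OLD.CATEGORY IS NULL AND :NEW.CATEGORY IS NULL)) AND
       (:OLD.PRICE = :NEW.PRICE OR (:OLD.PRICE IS NULL AND :NEW.PRICE IS NULL)) AND
       (:OLD.BATCH_NUMBER = :NEW.BATCH_NUMBER OR (:OLD.BATCH_NUMBER IS NULL AND :NEW.BATCH_NUMBER IS NULL))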
    THEN
        RETURN;  -- Only tracking columns changed, skip
    END IF;

    IF INSERTING THEN v_op := 'insert';
    ELSIF UPDATING THEN v_op := 'update';
    ELSIF DELETING THEN v_op := 'delete';
    END IF;

    IF DELETING THEN
        ENQUEUE_CERTYO_CHANGE(
            :OLD.PRODUCT_ID, :OLD.SKU, :OLD.PRODUCT_NAME,
            :OLD.CATEGORY, :OLD.PRICE, :OLD.BATCH_NUMBER, v_op
        );
    ELSE
        ENQUEUE_CERTYO_CHANGE(
            :NEW.PRODUCT_ID, :NEW.SKU, :NEW.PRODUCT_NAME,
            :NEW.CATEGORY, :NEW.PRICE, :NEW.BATCH_NUMBER, v_op
        );
    END IF;
END;
/
```

## Code Generation Rules

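1. **Use the ORDS approach for simplicity.** It requires only a timestamp column (maintained by a lightweight trigger) and an external poller, with no AQ or Kafka infrastructure. Polling cannot see deleted rows, so use AQ or Debezium if deletes must be recorded. Best for tables with < 10,000 changes per hour.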

2. **Use Advanced Queuing for transactional reliability.** AQ enqueue is part of the DML transaction — if the INSERT/UPDATE rolls back, the message is never enqueued. Best when exactly-once semantics within the database are critical.

3. **Use Debezium + LogMiner for production CDC.** LogMiner reads redo logs asynchronously without triggers or polling, supporting millions of changes per hour. Requires ARCHIVELOG mode, supplemental logging, and DBA-level grants.

4. **Use the bulk endpoint (`POST /api/v1/records/bulk`) when the batch contains more than 10 records.** The bulk endpoint accepts up to 1000 records per call. For batches exceeding 1000, split into multiple bulk calls.

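5. **Always include an `idempotencyKey`.** For ORDS: `ords-{PRODUCT_ID}-{MODIFIED_AT}`. For AQ: `aq-{SKU}-{timestamp_hour}`. For Debezium: `dbz-{SCN}-{PRODUCT_ID}`. Certyo deduplicates on this key and returns `idempotencyReplayed: true` for a repeat. Because the AQ key is truncated to the hour, repeated changes to the same SKU within one hour are treated as replays; use a finer-grained component if every change must be recorded.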

6. **Implement exponential backoff on HTTP 429 (Too Many Requests).** Start with 1-second delay, double on each retry up to 3 retries. Never retry immediately on 429.

7. **Exclude Certyo tracking columns from the `recordPayload`.** Columns `CERTYO_RECORD_HASH`, `CERTYO_STATUS`, and `CERTYO_VERIFIED_AT` are metadata. Including them causes hash mismatches on verification when those columns change.

8. **Filter out tracking-only updates in triggers.** When the verification worker updates `CERTYO_STATUS`, the trigger fires again. Compare business columns in the trigger to avoid infinite enqueue loops.

9. **Oracle column names are uppercase.** The Debezium Oracle connector preserves Oracle's uppercase naming. Map `PRODUCT_ID` (not `product_id`) when consuming Debezium events.

10. **Store credentials securely.** Use Oracle Wallet for database passwords, environment variables for API keys. Never embed credentials in PL/SQL packages, triggers, or Java source files committed to version control.
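
Rule 6 can be sketched as a small wrapper around any HTTP call. This is a minimal illustration, not a Certyo client: `post_with_backoff` is a hypothetical helper, and `send` is assumed to be a zero-argument callable that performs the request and returns an object with a `status_code` attribute (for example, a `requests.Response`).

```python
import time
from typing import Callable


def post_with_backoff(
    send: Callable[[], object],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
):
    """Call `send`, retrying on HTTP 429 with exponentially growing delays.

    Delays are base_delay, 2*base_delay, 4*base_delay, ... and at most
    `max_retries` retries are made. The last response is returned either way.
    """
    delay = base_delay
    resp = send()
    for _ in range(max_retries):
        if resp.status_code != 429:
            break
        sleep(delay)   # never retry immediately on 429
        delay *= 2     # double the wait before the next attempt
        resp = send()
    return resp
```

With `requests`, a call might look like `post_with_backoff(lambda: requests.post(url, json=record, headers=headers, timeout=30))`. The `sleep` parameter exists so the delay schedule can be tested without waiting.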
