
Oracle Database Integration

Detect row-level changes in Oracle Database and push them to Certyo for blockchain anchoring. This guide covers three approaches: Oracle REST Data Services (ORDS) polling, Advanced Queuing (AQ) with triggers, and Debezium with LogMiner CDC.

DBA privileges may be required
The Debezium Oracle connector uses LogMiner, which requires SELECT on V$LOG, V$LOGMNR_CONTENTS, and EXECUTE on DBMS_LOGMNR. ORDS and AQ approaches require DBA-level grants to configure. Coordinate with your Oracle DBA before proceeding.

Overview

Unlike SQL Server, Oracle Database does not expose a native CDC API. Instead, you have three main strategies for detecting changes and ingesting them into Certyo:

  • ORDS (REST Data Services) — Expose tables as REST endpoints. An external poller reads changes via a timestamp or sequence column and calls the Certyo API.
  • Advanced Queuing (AQ) — A database trigger enqueues a message on every row change. A consumer (Java or Python) dequeues and calls Certyo.
  • Debezium + LogMiner — The Debezium Oracle connector reads redo logs via LogMiner, streams changes to Kafka, and a consumer forwards them to Certyo.

Architecture

Data flow: three approaches
Approach 1: ORDS Polling
┌──────────────────────┐       ORDS REST API              ┌───────────────────┐
│  Oracle Database     │ ◄──── GET /ords/schema/products ──│  External Poller  │
│  PRODUCTS table      │                                   │  (Python/Node)    │
│  (MODIFIED_AT col)   │                                   └────────┬──────────┘
└──────────────────────┘                                            │
                                                     POST /api/v1/records
                                                                    v
                                                           ┌─────────────────┐
                                                           │  Certyo API     │
                                                           │  (202 Accepted) │
                                                           └─────────────────┘

Approach 2: Advanced Queuing (AQ)
┌──────────────────────┐  AFTER INSERT/UPDATE trigger      ┌───────────────────┐
│  Oracle Database     │ ──── DBMS_AQ.ENQUEUE ──────────> │  AQ Queue         │
│  PRODUCTS table      │                                   │  CERTYO_CHANGES_Q │
└──────────────────────┘                                   └────────┬──────────┘
                                                      DBMS_AQ.DEQUEUE
                                                      (Java/Python consumer)
                                                                    │
                                                     POST /api/v1/records
                                                                    v
                                                           ┌─────────────────┐
                                                           │  Certyo API     │
                                                           │  (202 Accepted) │
                                                           └─────────────────┘

Approach 3: Debezium + LogMiner
┌──────────────────────┐       Oracle LogMiner             ┌───────────────────┐
│  Oracle Database     │ ──── redo log mining ───────────> │  Debezium         │
│  PRODUCTS table      │                                   │  Oracle Connector │
└──────────────────────┘                                   └────────┬──────────┘
                                                                    │ Kafka
                                                                    v
                                                           ┌─────────────────┐
                                                           │  Kafka Consumer │
                                                           │  (Python/Java)  │
                                                           └────────┬────────┘
                                                                    │
                                                     POST /api/v1/records
                                                                    v
                                                           ┌─────────────────┐
                                                           │  Certyo API     │
                                                           │  (202 Accepted) │
                                                           └─────────────────┘

Prerequisites

  • Oracle Database 12c+ (Enterprise Edition recommended for LogMiner; Standard Edition works for ORDS and AQ)
  • Oracle REST Data Services (ORDS) installed and configured (for the ORDS approach)
  • Debezium 2.x with the Oracle connector (for the LogMiner approach)
  • A Certyo API key (obtain from your Certyo dashboard under Settings → API Keys)
  • Python 3.9+ with oracledb (thin mode) or cx_Oracle, or Java 17+ for AQ consumers
  • For LogMiner: SELECT on V$LOG, V$LOGMNR_CONTENTS, V$DATABASE, V$THREAD, V$LOGFILE and EXECUTE on DBMS_LOGMNR

Approach 1: ORDS REST Polling

Oracle REST Data Services can expose tables as REST endpoints through its AutoREST feature. You add a MODIFIED_AT timestamp column to your table, then an external poller periodically queries for rows modified since the last checkpoint.

ords_poller.py: poll the ORDS REST API and ingest into Certyo (Python)
"""
Oracle ORDS polling worker for Certyo integration.
Install: pip install requests

Usage:
    CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy \
    ORDS_BASE_URL=https://oracle-host:8443/ords/myschema \
    python ords_poller.py
"""

import json
import os
import time
import requests
from datetime import datetime, timezone

CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
ORDS_BASE_URL = os.environ["ORDS_BASE_URL"]  # e.g. https://host:8443/ords/schema
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "15"))
BULK_THRESHOLD = 10

# Track the last processed timestamp (held in memory only; resets on restart)
last_modified = "1970-01-01T00:00:00Z"


def poll_ords_changes():
    """Query ORDS for rows modified since the last checkpoint, oldest first."""
    url = f"{ORDS_BASE_URL}/products/"
    # ORDS filter syntax compares timestamps via {"$date": ...}. Ordering by
    # modified_at means a page truncated by "limit" never skips older rows.
    query = {
        "modified_at": {"$gt": {"$date": last_modified}},
        "$orderby": {"modified_at": "ASC"},
    }
    params = {"q": json.dumps(query), "limit": 500}
    resp = requests.get(url, params=params, timeout=30)
    resp.raise_for_status()
    return resp.json().get("items", [])


def map_to_certyo_record(row):
    """Map an ORDS row to a Certyo record payload."""
    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": row.get("owner", "oracle"),
        "collection": "PRODUCTS",
        "recordId": str(row["product_id"]),
        "recordVersion": str(row.get("version_num", 1)),
        "operationType": "upsert",
        "recordPayload": {
            "product_id": row["product_id"],
            "sku": row.get("sku"),
            "name": row.get("product_name"),
            "category": row.get("category"),
            "price": row.get("price"),
            "batch_number": row.get("batch_number"),
            "manufactured_at": row.get("manufactured_at"),
        },
        "sourceTimestamp": row.get(
            "modified_at", datetime.now(timezone.utc).isoformat()
        ),
        "idempotencyKey": f"ords-{row['product_id']}-{row.get('modified_at', '')}",
    }


def ingest_single(record):
    """POST a single record to Certyo."""
    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
    resp = requests.post(
        f"{CERTYO_BASE_URL}/api/v1/records",
        json=record, headers=headers, timeout=30,
    )
    if resp.status_code == 202:
        return resp.json()
    else:
        print(f"  Failed: {resp.status_code} {resp.text}")
        return None


def ingest_bulk(records):
    """POST records via the bulk endpoint in batches of up to 1000.

    Returns True only if every batch was accepted.
    """
    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
    ok = True
    for i in range(0, len(records), 1000):
        batch = records[i : i + 1000]
        resp = requests.post(
            f"{CERTYO_BASE_URL}/api/v1/records/bulk",
            json={"tenantId": CERTYO_TENANT_ID, "records": batch},
            headers=headers, timeout=60,
        )
        if resp.status_code != 202:
            print(f"  Bulk failed: {resp.status_code} {resp.text}")
            ok = False
    return ok


def poll_loop():
    global last_modified
    print(f"Starting ORDS poller (interval={POLL_INTERVAL}s)")
    while True:
        try:
            rows = poll_ords_changes()
            if rows:
                records = [map_to_certyo_record(r) for r in rows]
                print(f"Found {len(records)} changed rows")

                if len(records) > BULK_THRESHOLD:
                    ok = ingest_bulk(records)
                else:
                    # ingest_single returns None when Certyo rejects a record
                    results = [ingest_single(rec) for rec in records]
                    ok = all(r is not None for r in results)

                if ok:
                    # Advance the checkpoint only after every record was accepted;
                    # otherwise the same rows are retried on the next poll and
                    # deduplicated by their idempotencyKey.
                    last_modified = max(
                        r.get("modified_at", last_modified) for r in rows
                    )
                    print(f"Checkpoint updated to {last_modified}")
                else:
                    print("Ingestion incomplete; checkpoint not advanced")
        except Exception as e:
            print(f"Poll error: {e}")

        time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    poll_loop()
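
The poller keeps its checkpoint only in memory, so a restart would start again from 1970-01-01 and re-read every row. One way to avoid that is to persist the checkpoint between runs. The following is a minimal sketch, assuming a local JSON file is an acceptable store; the helper names and the file layout are hypothetical and not part of the Certyo or ORDS APIs.

```python
import json
import os
import tempfile

DEFAULT_CHECKPOINT = "1970-01-01T00:00:00Z"


def load_checkpoint(path):
    """Return the saved modified_at checkpoint, or the default if none exists."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f).get("last_modified", DEFAULT_CHECKPOINT)
    except (FileNotFoundError, json.JSONDecodeError):
        return DEFAULT_CHECKPOINT


def save_checkpoint(path, last_modified):
    """Write the checkpoint to a temp file, then rename it over the target.

    The rename is atomic, so a crash mid-write never leaves a half-written file.
    """
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump({"last_modified": last_modified}, f)
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)
        raise
```

In the poller, load_checkpoint would seed last_modified at startup and save_checkpoint would run each time the checkpoint advances.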

Approach 2: Advanced Queuing (AQ)

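Oracle Advanced Queuing provides reliable, transactional message delivery within the database. A trigger enqueues a typed message (the CERTYO_CHANGE_MSG object defined below) on every row change, and an external consumer dequeues it and calls Certyo. Because the enqueue runs inside the same transaction as the row change, a message becomes visible only if that change commits. Delivery from the queue to Certyo is at-least-once, so set an idempotencyKey on every record.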

Create AQ queue and trigger for change capture (SQL)
-- Step 1: Create the message payload type
CREATE OR REPLACE TYPE CERTYO_CHANGE_MSG AS OBJECT (
    PRODUCT_ID   NUMBER,
    SKU          VARCHAR2(100),
    PRODUCT_NAME VARCHAR2(200),
    CATEGORY     VARCHAR2(100),
    PRICE        NUMBER(10,2),
    BATCH_NUMBER VARCHAR2(100),
    OP_TYPE      VARCHAR2(10),
    CHANGE_TS    TIMESTAMP
);
/

-- Step 2: Create the queue table and queue
BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        queue_table        => 'CERTYO_CHANGES_QT',
        queue_payload_type => 'CERTYO_CHANGE_MSG'
    );
    DBMS_AQADM.CREATE_QUEUE(
        queue_name  => 'CERTYO_CHANGES_Q',
        queue_table => 'CERTYO_CHANGES_QT'
    );
    DBMS_AQADM.START_QUEUE(
        queue_name => 'CERTYO_CHANGES_Q'
    );
END;
/

-- Step 3: Create the enqueue procedure
CREATE OR REPLACE PROCEDURE ENQUEUE_CERTYO_CHANGE(
    p_product_id   NUMBER,
    p_sku          VARCHAR2,
    p_name         VARCHAR2,
    p_category     VARCHAR2,
    p_price        NUMBER,
    p_batch_number VARCHAR2,
    p_op_type      VARCHAR2
) AS
    v_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
    v_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
    v_message_handle     RAW(16);
    v_message            CERTYO_CHANGE_MSG;
BEGIN
    v_message := CERTYO_CHANGE_MSG(
        p_product_id, p_sku, p_name, p_category,
        p_price, p_batch_number, p_op_type, SYSTIMESTAMP
    );
    DBMS_AQ.ENQUEUE(
        queue_name         => 'CERTYO_CHANGES_Q',
        enqueue_options    => v_enqueue_options,
        message_properties => v_message_properties,
        payload            => v_message,
        msgid              => v_message_handle
    );
END;
/

-- Step 4: Create the trigger that enqueues on row changes
CREATE OR REPLACE TRIGGER TRG_PRODUCTS_AQ
AFTER INSERT OR UPDATE OR DELETE ON PRODUCTS
FOR EACH ROW
DECLARE
    v_op VARCHAR2(10);
BEGIN
    IF INSERTING THEN v_op := 'insert';
    ELSIF UPDATING THEN v_op := 'update';
    ELSIF DELETING THEN v_op := 'delete';
    END IF;

    IF DELETING THEN
        ENQUEUE_CERTYO_CHANGE(
            :OLD.PRODUCT_ID, :OLD.SKU, :OLD.PRODUCT_NAME,
            :OLD.CATEGORY, :OLD.PRICE, :OLD.BATCH_NUMBER, v_op
        );
    ELSE
        ENQUEUE_CERTYO_CHANGE(
            :NEW.PRODUCT_ID, :NEW.SKU, :NEW.PRODUCT_NAME,
            :NEW.CATEGORY, :NEW.PRICE, :NEW.BATCH_NUMBER, v_op
        );
    END IF;
END;
/
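
The consumer side of this approach could look roughly like the sketch below. It assumes a python-oracledb connection (AQ support for object payloads depends on the driver version and may require thick mode) and uses the queue and type names from the SQL above. The `send` callable is hypothetical: it stands for whatever function posts one record to Certyo and returns True on success. The default schema name MYSCHEMA is also an assumption.

```python
def aq_message_to_record(msg, tenant_id, schema="MYSCHEMA"):
    """Map a dequeued CERTYO_CHANGE_MSG object to a Certyo record payload."""
    change_ts = msg.CHANGE_TS.isoformat()
    return {
        "tenantId": tenant_id,
        "database": schema,
        "collection": "PRODUCTS",
        "recordId": str(msg.PRODUCT_ID),
        "operationType": msg.OP_TYPE,
        "recordPayload": {
            "product_id": msg.PRODUCT_ID,
            "sku": msg.SKU,
            "name": msg.PRODUCT_NAME,
            "category": msg.CATEGORY,
            "price": msg.PRICE,
            "batch_number": msg.BATCH_NUMBER,
        },
        "sourceTimestamp": change_ts,
        # Primary key plus change timestamp, so retries deduplicate
        "idempotencyKey": f"aq-{msg.PRODUCT_ID}-{change_ts}",
    }


def consume(conn, tenant_id, send, wait_seconds=5):
    """Dequeue until the queue is empty; commit only after send() succeeds."""
    payload_type = conn.gettype("CERTYO_CHANGE_MSG")
    queue = conn.queue("CERTYO_CHANGES_Q", payload_type)
    queue.deqoptions.wait = wait_seconds  # seconds to block before giving up
    while True:
        props = queue.deqone()
        if props is None:  # nothing arrived within wait_seconds
            break
        if send(aq_message_to_record(props.payload, tenant_id)):
            conn.commit()  # removes the message from the queue
        else:
            conn.rollback()  # message returns to the queue for a later retry
            break
```

Committing only after a successful send keeps the dequeue and the Certyo call aligned: a failed call leaves the message in the queue rather than dropping it.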

Approach 3: Debezium + LogMiner

For production CDC without triggers, Debezium's Oracle connector reads redo logs via LogMiner. Changes stream to Kafka, where a consumer transforms and forwards them to Certyo. This is the most robust approach for high-volume tables.

docker-compose.yml: Debezium + Kafka + Oracle connector (YAML)
version: "3.9"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: debezium/connect:2.6
    depends_on: [kafka]
    ports: ["8083:8083"]
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: certyo-oracle-cdc
      CONFIG_STORAGE_TOPIC: _connect_configs
      OFFSET_STORAGE_TOPIC: _connect_offsets
      STATUS_STORAGE_TOPIC: _connect_statuses
      CONFIG_STORAGE_REPLICATION_FACTOR: 1
      OFFSET_STORAGE_REPLICATION_FACTOR: 1
      STATUS_STORAGE_REPLICATION_FACTOR: 1

# Register the Oracle connector after Kafka Connect starts:
#
# curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
#   "name": "oracle-certyo-connector",
#   "config": {
#     "connector.class": "io.debezium.connector.oracle.OracleConnector",
#     "database.hostname": "oracle-host",
#     "database.port": "1521",
#     "database.user": "c##dbzuser",
#     "database.password": "dbz",
#     "database.dbname": "ORCLCDB",
#     "database.pdb.name": "ORCLPDB1",
#     "schema.include.list": "MYSCHEMA",
#     "table.include.list": "MYSCHEMA.PRODUCTS",
#     "topic.prefix": "oracle",
#     "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
#     "schema.history.internal.kafka.topic": "schema-changes.oracle",
#     "log.mining.strategy": "online_catalog",
#     "decimal.handling.mode": "double",
#     "snapshot.mode": "initial"
#   }
# }'
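
After registering the connector, it can help to confirm that it is actually running before relying on it. The sketch below queries the Kafka Connect REST API status endpoint (`GET /connectors/{name}/status`). It assumes the connector name and port from the example above; the function names are hypothetical.

```python
import json
import urllib.request


def connector_is_healthy(status):
    """True when the connector and every one of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and len(tasks) > 0
        and all(t.get("state") == "RUNNING" for t in tasks)
    )


def fetch_connector_status(name, connect_url="http://localhost:8083"):
    """Fetch the status document for one connector from Kafka Connect."""
    url = f"{connect_url}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


if __name__ == "__main__":
    status = fetch_connector_status("oracle-certyo-connector")
    print("healthy" if connector_is_healthy(status) else f"not healthy: {status}")
```

A task in the FAILED state usually carries a stack trace in the status document, which is the first place to look when LogMiner fails to start.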

Field Mapping

Map Oracle columns to Certyo record fields. The mapping is consistent across all three approaches.

Oracle → Certyo field mapping
Oracle Field                  Certyo Field        Notes
─────────────────────────────────────────────────────────────────────────
PRODUCT_ID (PK)              recordId            Cast to string
PRODUCTS (table name)        collection          Uppercase Oracle convention
MYSCHEMA (schema/owner)      database            Schema name as database
ROWID or PK + SCN            idempotencyKey      Unique per change event
SYSDATE / SYSTIMESTAMP       sourceTimestamp     ISO 8601 format
'insert'/'update'/'delete'   operationType       Mapped from trigger or Debezium op
Full row as JSON             recordPayload       All business columns serialized

Debezium Operation Mapping:
  "c"  (create)  → "insert"
  "u"  (update)  → "update"
  "d"  (delete)  → "delete"
  "r"  (read/snapshot) → "upsert"

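For the Debezium approach, the mapping above could be applied to each change event roughly as follows. This is a sketch, assuming the standard Debezium envelope (`before`, `after`, `op`, and a `source` block with `schema`, `table`, `scn`, and `ts_ms`) and PRODUCT_ID as the primary key; the function name is hypothetical.

```python
from datetime import datetime, timezone

# Debezium operation codes mapped to Certyo operation types (see table above)
OP_MAP = {"c": "insert", "u": "update", "d": "delete", "r": "upsert"}


def debezium_to_certyo(event, tenant_id):
    """Convert one Debezium change event (a dict) into a Certyo record."""
    # With schemas enabled in the JSON converter, the envelope sits under "payload"
    payload = event.get("payload", event)
    op = payload["op"]
    # Deletes carry the row image in "before"; every other op uses "after"
    row = payload["before"] if op == "d" else payload["after"]
    source = payload["source"]
    changed_at = datetime.fromtimestamp(source["ts_ms"] / 1000, tz=timezone.utc)
    return {
        "tenantId": tenant_id,
        "database": source["schema"],
        "collection": source["table"],
        "recordId": str(row["PRODUCT_ID"]),
        "operationType": OP_MAP[op],
        "recordPayload": row,
        "sourceTimestamp": changed_at.isoformat(),
        # Primary key plus SCN is unique per change event
        "idempotencyKey": f"dbz-{row['PRODUCT_ID']}-{source['scn']}",
    }
```

A Kafka consumer would call this function on each deserialized message value and pass the result to the single or bulk ingestion endpoint.
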
Verification and Write-back

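After records are ingested, add tracking columns to the source table and periodically verify anchoring status via the Certyo API. Note that these write-back updates are themselves row changes on PRODUCTS: with the AQ trigger or Debezium in place they produce new change events, so filter them out in the consumer or limit change capture to the business columns.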

Add Certyo tracking columns and verify (SQL)
-- Step 1: Add tracking columns to the source table
ALTER TABLE PRODUCTS ADD (
    CERTYO_RECORD_HASH   VARCHAR2(128),
    CERTYO_STATUS        VARCHAR2(20)  DEFAULT 'Pending',
    CERTYO_VERIFIED_AT   TIMESTAMP
);

-- Step 2: After ingestion, write back the record hash
UPDATE PRODUCTS
SET CERTYO_RECORD_HASH  = :record_hash,
    CERTYO_STATUS        = 'Pending'
WHERE PRODUCT_ID = :product_id;

-- Step 3: After verification confirms anchoring
UPDATE PRODUCTS
SET CERTYO_STATUS       = 'Anchored',
    CERTYO_VERIFIED_AT  = SYSTIMESTAMP
WHERE PRODUCT_ID = :product_id;

-- Step 4: Query unverified records for the verification worker
SELECT PRODUCT_ID, CERTYO_RECORD_HASH
FROM PRODUCTS
WHERE CERTYO_STATUS = 'Pending'
  AND CERTYO_RECORD_HASH IS NOT NULL
  AND MODIFIED_AT < SYSTIMESTAMP - INTERVAL '2' MINUTE;

verification_worker.py: verify anchoring status (Python)
"""
Verification worker: checks Certyo anchoring status and updates Oracle.
Install: pip install oracledb requests
"""

import os
import time
import oracledb
import requests

CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
ORACLE_DSN = os.environ.get("ORACLE_DSN", "localhost:1521/ORCLPDB1")
ORACLE_USER = os.environ.get("ORACLE_USER", "certyo_app")
ORACLE_PASS = os.environ.get("ORACLE_PASS", "changeme")


def verify_pending_records():
    """Check pending rows against Certyo and mark anchored ones in Oracle."""
    headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
    verified = 0
    failed = 0

    # Context managers close the cursor and connection even if a request raises.
    with oracledb.connect(
        user=ORACLE_USER, password=ORACLE_PASS, dsn=ORACLE_DSN
    ) as conn, conn.cursor() as cursor:
        cursor.execute("""
            SELECT PRODUCT_ID, CERTYO_RECORD_HASH
            FROM PRODUCTS
            WHERE CERTYO_STATUS = 'Pending'
              AND CERTYO_RECORD_HASH IS NOT NULL
              AND MODIFIED_AT < SYSTIMESTAMP - INTERVAL '2' MINUTE
            FETCH FIRST 100 ROWS ONLY
        """)

        # The hash is selected for reference; the verify call looks the record up by ID.
        for product_id, _record_hash in cursor.fetchall():
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/verify/record",
                json={
                    "tenantId": CERTYO_TENANT_ID,
                    "database": "MYSCHEMA",
                    "collection": "PRODUCTS",
                    "recordId": str(product_id),
                },
                headers=headers, timeout=30,
            )

            if resp.status_code == 200:
                result = resp.json()
                if result.get("verified") and result.get("anchoredOnChain"):
                    cursor.execute("""
                        UPDATE PRODUCTS
                        SET CERTYO_STATUS = 'Anchored',
                            CERTYO_VERIFIED_AT = SYSTIMESTAMP
                        WHERE PRODUCT_ID = :1
                    """, [product_id])
                    verified += 1
                else:
                    failed += 1
            else:
                failed += 1

        conn.commit()

    print(f"Verified: {verified}, Failed/Pending: {failed}")


if __name__ == "__main__":
    while True:
        try:
            verify_pending_records()
        except Exception as e:
            print(f"Verification error: {e}")
        time.sleep(120)

Authentication Reference

Oracle connection

The integration connects to Oracle using either oracledb (Python thin mode, no Oracle Client needed) or JDBC (Java). For ORDS polling, use the ORDS base URL with optional OAuth2 client credentials if ORDS authentication is enabled.

Certyo API key

Pass your Certyo API key in the X-API-Key header on all Certyo API calls. Store it in Oracle Wallet, environment variables, or a secrets manager — never in PL/SQL source code or version control.
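
As an illustration of the advice above, the key can be read from the environment at startup and attached to each request. This is a minimal sketch using only the standard library; the helper names are hypothetical, and the endpoint path is the one used elsewhere in this guide.

```python
import json
import os
import urllib.request


def load_api_key(env=os.environ):
    """Read the Certyo API key from the environment and fail fast if it is missing."""
    key = env.get("CERTYO_API_KEY", "").strip()
    if not key:
        raise RuntimeError("CERTYO_API_KEY is not set")
    return key


def build_record_request(record, api_key, base_url="https://www.certyos.com"):
    """Build (but do not send) a POST request carrying the X-API-Key header."""
    return urllib.request.Request(
        f"{base_url}/api/v1/records",
        data=json.dumps(record).encode("utf-8"),
        headers={"X-API-Key": api_key, "Content-Type": "application/json"},
        method="POST",
    )
```

Failing at startup when the variable is missing avoids a worker that runs but has every call rejected.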


Troubleshooting

  • ORDS returns 404 for the table — Verify auto-REST is enabled with SELECT * FROM USER_ORDS_ENABLED_OBJECTS. The schema must be ORDS-enabled first.
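  • AQ dequeue returns no messages — Check that the queue is started with SELECT NAME, ENQUEUE_ENABLED, DEQUEUE_ENABLED FROM USER_QUEUES. Ensure the trigger is firing (test with a manual INSERT) and that the inserting transaction has committed, since messages become visible only on commit.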
  • Debezium LogMiner fails to start — Ensure the connector user has the required grants: SELECT on V$LOG and V$LOGMNR_CONTENTS, EXECUTE on DBMS_LOGMNR, and the other views listed under Prerequisites. The database must be in ARCHIVELOG mode with supplemental logging enabled.
  • Duplicate records in Certyo — Use idempotencyKey with a combination of the primary key and the SCN (for Debezium) or timestamp (for ORDS/AQ). Certyo deduplicates automatically and returns idempotencyReplayed: true.
  • Trigger slows down DML — AQ enqueue within a trigger is synchronous. For high-throughput tables, prefer the Debezium LogMiner approach, which reads redo logs asynchronously without affecting DML performance.

AI Integration · v1.0.0

AI Integration Skill

Download a skill file that enables AI agents to generate working Oracle Database + Certyo integration code for any language or framework.

What is this?
A markdown file containing Oracle Database-specific field mappings, authentication setup, code examples, and integration patterns for Certyo. Drop it into your AI agent's context and ask it to generate integration code.

What's inside

  • Authentication: Oracle connection strings, ORDS setup, and AQ grants
  • Architecture: Three approaches: ORDS polling, AQ triggers, Debezium LogMiner
  • Field mapping: Oracle columns and Debezium operations to Certyo record schema
  • Code examples: Python ORDS poller, PL/SQL AQ trigger, Java AQ consumer, Debezium Docker Compose
  • Verification: Periodic anchoring verification with Oracle UPDATE write-back
  • Oracle patterns: LogMiner configuration, AQ setup, ORDS auto-REST, timestamp tracking

How to use

Claude Code

Place the file in your project's .claude/commands/ directory, then use it as a slash command:

# Download the skill file
mkdir -p .claude/commands
curl -o .claude/commands/certyo-oracledb.md \
  https://www.certyos.com/developers/skills/certyo-oracledb-skill.md

# Use it in Claude Code
/certyo-oracledb "Generate a Python worker that uses ORDS to detect Oracle changes and push them to Certyo"

Cursor / Copilot / Any AI Agent

Add the file to your project root or attach it to a conversation. The AI agent will use the Oracle Database-specific patterns, field mappings, and code examples to generate correct integration code.

# Add to your project
curl -o CERTYO_ORACLEDB.md \
  https://www.certyos.com/developers/skills/certyo-oracledb-skill.md

# Then in your AI agent:
"Using the Certyo Oracle Database spec in CERTYO_ORACLEDB.md,
 generate a Python worker that uses ORDS to detect Oracle changes and push them to Certyo"

CLAUDE.md Context File

Append the skill file to your project's CLAUDE.md so every Claude conversation has Oracle Database + Certyo context automatically.

# Append to your project's CLAUDE.md
echo "" >> CLAUDE.md
echo "## Certyo Oracle Database Integration" >> CLAUDE.md
cat CERTYO_ORACLEDB.md >> CLAUDE.md