Oracle Database Integration
Detect row-level changes in Oracle Database and push them to Certyo for blockchain anchoring. This guide covers three approaches: Oracle REST Data Services (ORDS) polling, Advanced Queuing (AQ) with triggers, and Debezium with LogMiner CDC.
Overview
Oracle Database does not expose a native CDC API like SQL Server. Instead, you have three main strategies for detecting changes and ingesting them into Certyo:
- ORDS (REST Data Services) — Expose tables as REST endpoints. An external poller reads changes via a timestamp or sequence column and calls the Certyo API.
- Advanced Queuing (AQ) — A database trigger enqueues a message on every row change. A consumer (Java or Python) dequeues and calls Certyo.
- Debezium + LogMiner — The Debezium Oracle connector reads redo logs via LogMiner, streams changes to Kafka, and a consumer forwards them to Certyo.
Architecture
Approach 1: ORDS Polling
┌──────────────────────┐           ORDS REST API           ┌───────────────────┐
│ Oracle Database      │ ◄──── GET /ords/schema/products ──│  External Poller  │
│   PRODUCTS table     │                                   │   (Python/Node)   │
│ (MODIFIED_AT col)    │                                   └────────┬──────────┘
└──────────────────────┘                                            │
                                                          POST /api/v1/records
                                                                    v
                                                           ┌─────────────────┐
                                                           │   Certyo API    │
                                                           │ (202 Accepted)  │
                                                           └─────────────────┘
Approach 2: Advanced Queuing (AQ)
┌──────────────────────┐   AFTER INSERT/UPDATE trigger    ┌───────────────────┐
│ Oracle Database      │ ──── DBMS_AQ.ENQUEUE ──────────> │     AQ Queue      │
│  PRODUCTS table      │                                  │ CERTYO_CHANGES_Q  │
└──────────────────────┘                                  └────────┬──────────┘
                                                                   │
                                                           DBMS_AQ.DEQUEUE
                                                      (Java/Python consumer)
                                                                   │
                                                         POST /api/v1/records
                                                                   v
                                                          ┌─────────────────┐
                                                          │   Certyo API    │
                                                          │ (202 Accepted)  │
                                                          └─────────────────┘
Approach 3: Debezium + LogMiner
┌──────────────────────┐          Oracle LogMiner         ┌───────────────────┐
│ Oracle Database      │ ──── redo log mining ──────────> │     Debezium      │
│  PRODUCTS table      │                                  │ Oracle Connector  │
└──────────────────────┘                                  └────────┬──────────┘
                                                                   │  Kafka
                                                                   v
                                                          ┌─────────────────┐
                                                          │ Kafka Consumer  │
                                                          │ (Python/Java)   │
                                                          └────────┬────────┘
                                                                   │
                                                         POST /api/v1/records
                                                                   v
                                                          ┌─────────────────┐
                                                          │   Certyo API    │
                                                          │ (202 Accepted)  │
                                                          └─────────────────┘
Prerequisites
- Oracle Database 12c+ (Enterprise Edition recommended for LogMiner; Standard Edition works for ORDS and AQ)
- Oracle REST Data Services (ORDS) installed and configured (for the ORDS approach)
- Debezium 2.x with the Oracle connector (for the LogMiner approach)
- A Certyo API key (obtain from your Certyo dashboard under Settings → API Keys)
- Python 3.9+ with oracledb (thin mode) or cx_Oracle, or Java 17+ for AQ consumers
- For LogMiner: SELECT on V$LOG, V$LOGMNR_CONTENTS, V$DATABASE, V$THREAD, V$LOGFILE and EXECUTE on DBMS_LOGMNR
Approach 1: ORDS REST Polling
Oracle REST Data Services can automatically expose tables as REST endpoints (AutoREST). You add a MODIFIED_AT timestamp column to your table, then an external poller periodically queries for rows modified since the last checkpoint and forwards the changes to Certyo.
"""
Oracle ORDS polling worker for Certyo integration.
Install: pip install requests oracledb
Usage:
CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy \
ORDS_BASE_URL=https://oracle-host:8443/ords/myschema \
python ords_poller.py
"""
import os
import time
import requests
from datetime import datetime, timezone
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
ORDS_BASE_URL = os.environ["ORDS_BASE_URL"] # e.g. https://host:8443/ords/schema
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "15"))
BULK_THRESHOLD = 10
# Track the last processed timestamp
last_modified = "1970-01-01T00:00:00Z"
def poll_ords_changes():
"""Query ORDS for rows modified since last checkpoint."""
global last_modified
url = f"{ORDS_BASE_URL}/products/"
params = {
"q": f'{{"modified_at":{{"$gt":"{last_modified}"}}}}',
"limit": 500,
}
resp = requests.get(url, params=params, timeout=30)
resp.raise_for_status()
return resp.json().get("items", [])
def map_to_certyo_record(row):
"""Map an ORDS row to a Certyo record payload."""
return {
"tenantId": CERTYO_TENANT_ID,
"database": row.get("owner", "oracle"),
"collection": "PRODUCTS",
"recordId": str(row["product_id"]),
"recordVersion": str(row.get("version_num", 1)),
"operationType": "upsert",
"recordPayload": {
"product_id": row["product_id"],
"sku": row.get("sku"),
"name": row.get("product_name"),
"category": row.get("category"),
"price": row.get("price"),
"batch_number": row.get("batch_number"),
"manufactured_at": row.get("manufactured_at"),
},
"sourceTimestamp": row.get(
"modified_at", datetime.now(timezone.utc).isoformat()
),
"idempotencyKey": f"ords-{row['product_id']}-{row.get('modified_at', '')}",
}
def ingest_single(record):
"""POST a single record to Certyo."""
headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
resp = requests.post(
f"{CERTYO_BASE_URL}/api/v1/records",
json=record, headers=headers, timeout=30,
)
if resp.status_code == 202:
return resp.json()
else:
print(f" Failed: {resp.status_code} {resp.text}")
return None
def ingest_bulk(records):
"""POST up to 1000 records via the bulk endpoint."""
headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
for i in range(0, len(records), 1000):
batch = records[i : i + 1000]
resp = requests.post(
f"{CERTYO_BASE_URL}/api/v1/records/bulk",
json={"tenantId": CERTYO_TENANT_ID, "records": batch},
headers=headers, timeout=60,
)
if resp.status_code != 202:
print(f" Bulk failed: {resp.status_code} {resp.text}")
def poll_loop():
global last_modified
print(f"Starting ORDS poller (interval={POLL_INTERVAL}s)")
while True:
try:
rows = poll_ords_changes()
if rows:
records = [map_to_certyo_record(r) for r in rows]
print(f"Found {len(records)} changed rows")
if len(records) > BULK_THRESHOLD:
ingest_bulk(records)
else:
for rec in records:
ingest_single(rec)
# Update checkpoint to latest modified_at
last_modified = max(
r.get("modified_at", last_modified) for r in rows
)
print(f"Checkpoint updated to {last_modified}")
except Exception as e:
print(f"Poll error: {e}")
time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
poll_loop()
-- Step 1: Add a MODIFIED_AT column for change tracking
ALTER TABLE PRODUCTS ADD (
MODIFIED_AT TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
VERSION_NUM NUMBER DEFAULT 1 NOT NULL
);
-- Step 2: Create a trigger to update MODIFIED_AT on every change
CREATE OR REPLACE TRIGGER TRG_PRODUCTS_MODIFIED
BEFORE INSERT OR UPDATE ON PRODUCTS
FOR EACH ROW
BEGIN
:NEW.MODIFIED_AT := SYSTIMESTAMP;
IF UPDATING THEN
:NEW.VERSION_NUM := :OLD.VERSION_NUM + 1;
END IF;
END;
/
-- Step 3: Create an index for efficient polling
CREATE INDEX IDX_PRODUCTS_MODIFIED ON PRODUCTS (MODIFIED_AT);
-- Step 4: Enable ORDS auto-REST on the table
BEGIN
ORDS.ENABLE_OBJECT(
p_enabled => TRUE,
p_schema => 'MYSCHEMA',
p_object => 'PRODUCTS',
p_object_type => 'TABLE',
p_object_alias => 'products'
);
COMMIT;
END;
/
-- Verify: GET https://oracle-host:8443/ords/myschema/products/
-- should return JSON with the table rows.
Approach 2: Advanced Queuing (AQ)
Oracle Advanced Queuing provides reliable, transactional message delivery within the database. A trigger enqueues a structured message (an Oracle object type) on every row change, and an external consumer dequeues it and calls Certyo. Because the enqueue participates in the DML transaction, a committed row change and its queue message succeed or fail together.
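The consumer examples in this section are shown in Java; if you prefer Python, the same dequeue loop is possible with python-oracledb's AQ support. The sketch below assumes the CERTYO_CHANGE_MSG type and CERTYO_CHANGES_Q queue created in the steps that follow, and thick mode (an Oracle Client install), since the payload is an object type; attrs_to_record is an illustrative helper, not part of any Certyo SDK.

```python
"""Sketch: Python AQ consumer for CERTYO_CHANGES_Q (python-oracledb, thick mode)."""
from datetime import datetime, timezone


def attrs_to_record(tenant_id, attrs):
    """Map a CERTYO_CHANGE_MSG payload to a Certyo record. Attribute order
    follows the type declaration: PRODUCT_ID, SKU, PRODUCT_NAME, CATEGORY,
    PRICE, BATCH_NUMBER, OP_TYPE, CHANGE_TS."""
    product_id, sku, name, category, price, batch, op, ts = attrs
    iso_ts = (ts or datetime.now(timezone.utc)).isoformat()
    return {
        "tenantId": tenant_id,
        "database": "oracle",
        "collection": "PRODUCTS",
        "recordId": str(product_id),
        "operationType": op,
        "recordPayload": {
            "product_id": product_id, "sku": sku, "name": name,
            "category": category, "price": price, "batch_number": batch,
        },
        "sourceTimestamp": iso_ts,
        "idempotencyKey": f"aq-{product_id}-{iso_ts}",
    }


def run_consumer():
    """Dequeue from CERTYO_CHANGES_Q and forward each message to Certyo."""
    import os
    import oracledb
    import requests

    oracledb.init_oracle_client()  # thick mode: object-type queue payloads
    conn = oracledb.connect(
        user=os.environ["ORACLE_USER"], password=os.environ["ORACLE_PASS"],
        dsn=os.environ.get("ORACLE_DSN", "localhost:1521/ORCLPDB1"),
    )
    queue = conn.queue("CERTYO_CHANGES_Q", conn.gettype("CERTYO_CHANGE_MSG"))
    queue.deqoptions.wait = 5  # block up to 5 seconds per dequeue
    while True:
        msg = queue.deqone()
        if msg is None:
            continue
        p = msg.payload
        record = attrs_to_record(
            os.environ["CERTYO_TENANT_ID"],
            (p.PRODUCT_ID, p.SKU, p.PRODUCT_NAME, p.CATEGORY,
             p.PRICE, p.BATCH_NUMBER, p.OP_TYPE, p.CHANGE_TS),
        )
        requests.post(
            "https://www.certyos.com/api/v1/records", json=record,
            headers={"X-API-Key": os.environ["CERTYO_API_KEY"]},
            timeout=30,
        )
        conn.commit()  # commit the dequeue after forwarding
```

Call run_consumer() to start the loop; like the Java version, the dequeue is only committed after the message has been handled, so an unprocessed message stays on the queue.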
-- Step 1: Create the message payload type
CREATE OR REPLACE TYPE CERTYO_CHANGE_MSG AS OBJECT (
PRODUCT_ID NUMBER,
SKU VARCHAR2(100),
PRODUCT_NAME VARCHAR2(200),
CATEGORY VARCHAR2(100),
PRICE NUMBER(10,2),
BATCH_NUMBER VARCHAR2(100),
OP_TYPE VARCHAR2(10),
CHANGE_TS TIMESTAMP
);
/
-- Step 2: Create the queue table and queue
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table => 'CERTYO_CHANGES_QT',
queue_payload_type => 'CERTYO_CHANGE_MSG'
);
DBMS_AQADM.CREATE_QUEUE(
queue_name => 'CERTYO_CHANGES_Q',
queue_table => 'CERTYO_CHANGES_QT'
);
DBMS_AQADM.START_QUEUE(
queue_name => 'CERTYO_CHANGES_Q'
);
END;
/
-- Step 3: Create the enqueue procedure
CREATE OR REPLACE PROCEDURE ENQUEUE_CERTYO_CHANGE(
p_product_id NUMBER,
p_sku VARCHAR2,
p_name VARCHAR2,
p_category VARCHAR2,
p_price NUMBER,
p_batch_number VARCHAR2,
p_op_type VARCHAR2
) AS
v_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
v_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
v_message CERTYO_CHANGE_MSG;
BEGIN
v_message := CERTYO_CHANGE_MSG(
p_product_id, p_sku, p_name, p_category,
p_price, p_batch_number, p_op_type, SYSTIMESTAMP
);
DBMS_AQ.ENQUEUE(
queue_name => 'CERTYO_CHANGES_Q',
enqueue_options => v_enqueue_options,
message_properties => v_message_properties,
payload => v_message,
msgid => v_message_handle
);
END;
/
-- Step 4: Create the trigger that enqueues on row changes
CREATE OR REPLACE TRIGGER TRG_PRODUCTS_AQ
AFTER INSERT OR UPDATE OR DELETE ON PRODUCTS
FOR EACH ROW
DECLARE
v_op VARCHAR2(10);
BEGIN
IF INSERTING THEN v_op := 'insert';
ELSIF UPDATING THEN v_op := 'update';
ELSIF DELETING THEN v_op := 'delete';
END IF;
IF DELETING THEN
ENQUEUE_CERTYO_CHANGE(
:OLD.PRODUCT_ID, :OLD.SKU, :OLD.PRODUCT_NAME,
:OLD.CATEGORY, :OLD.PRICE, :OLD.BATCH_NUMBER, v_op
);
ELSE
ENQUEUE_CERTYO_CHANGE(
:NEW.PRODUCT_ID, :NEW.SKU, :NEW.PRODUCT_NAME,
:NEW.CATEGORY, :NEW.PRICE, :NEW.BATCH_NUMBER, v_op
);
END IF;
END;
/
// AqCertyoConsumer.java
// Requires: ojdbc11, aqapi, jackson-databind
// Usage: java -DCERTYO_API_KEY=xxx -DCERTYO_TENANT_ID=yyy AqCertyoConsumer
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.sql.Connection;
import java.sql.DriverManager;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.aq.AQDequeueOptions;
import oracle.jdbc.aq.AQMessage;
public class AqCertyoConsumer {
private static final String CERTYO_BASE = "https://www.certyos.com";
private static final String API_KEY = System.getProperty("CERTYO_API_KEY");
private static final String TENANT_ID = System.getProperty("CERTYO_TENANT_ID");
private static final int BULK_THRESHOLD = 10;
public static void main(String[] args) throws Exception {
String jdbcUrl = System.getProperty("ORACLE_JDBC_URL",
"jdbc:oracle:thin:@//localhost:1521/ORCLPDB1");
String user = System.getProperty("ORACLE_USER", "certyo_app");
String pass = System.getProperty("ORACLE_PASS", "changeme");
Connection conn = DriverManager.getConnection(jdbcUrl, user, pass);
OracleConnection oraConn = conn.unwrap(OracleConnection.class);
HttpClient httpClient = HttpClient.newHttpClient();
System.out.println("AQ Consumer started. Listening on CERTYO_CHANGES_Q...");
while (true) {
try {
List<String> batch = new ArrayList<>();
// Dequeue up to 100 messages with 5s wait
AQDequeueOptions deqOpts = new AQDequeueOptions();
deqOpts.setWait(5);
deqOpts.setNavigationMode(AQDequeueOptions.NavigationMode.FIRST_MESSAGE);
for (int i = 0; i < 100; i++) {
try {
AQMessage msg = oraConn.dequeue("CERTYO_CHANGES_Q", deqOpts,
"CERTYO_CHANGE_MSG");
if (msg == null) break;
// Extract payload fields from the Oracle object
Object[] attrs = msg.getObjectPayload().getAttributes();
String recordJson = buildCertyoRecord(attrs);
batch.add(recordJson);
} catch (Exception e) {
break; // No more messages or timeout
}
}
if (batch.isEmpty()) continue;
System.out.println("Dequeued " + batch.size() + " messages");
if (batch.size() > BULK_THRESHOLD) {
ingestBulk(httpClient, batch);
} else {
for (String record : batch) {
ingestSingle(httpClient, record);
}
}
conn.commit(); // Commit dequeue transaction
} catch (Exception e) {
System.err.println("Consumer error: " + e.getMessage());
conn.rollback();
Thread.sleep(5000);
}
}
}
private static String buildCertyoRecord(Object[] attrs) {
// attrs: [PRODUCT_ID, SKU, PRODUCT_NAME, CATEGORY, PRICE,
// BATCH_NUMBER, OP_TYPE, CHANGE_TS]
String productId = String.valueOf(attrs[0]);
String sku = String.valueOf(attrs[1]);
String name = attrs[2] != null ? String.valueOf(attrs[2]) : null;
String category = attrs[3] != null ? String.valueOf(attrs[3]) : null;
String price = attrs[4] != null ? String.valueOf(attrs[4]) : null;
String batchNum = attrs[5] != null ? String.valueOf(attrs[5]) : null;
String opType = String.valueOf(attrs[6]);
String ts = Instant.now().toString();
return String.format("""
{
"tenantId": "%s",
"database": "oracle",
"collection": "PRODUCTS",
"recordId": "%s",
"operationType": "%s",
"recordPayload": {
"product_id": %s,
"sku": "%s",
"name": %s,
"category": %s,
"price": %s,
"batch_number": %s
},
"sourceTimestamp": "%s",
"idempotencyKey": "aq-%s-%s"
}""",
TENANT_ID, productId, opType, productId,
sku,
name != null ? "\"" + name + "\"" : "null",
category != null ? "\"" + category + "\"" : "null",
price != null ? price : "null",
batchNum != null ? "\"" + batchNum + "\"" : "null",
ts, productId, ts
);
}
private static void ingestSingle(HttpClient client, String recordJson)
throws Exception {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(CERTYO_BASE + "/api/v1/records"))
.header("X-API-Key", API_KEY)
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(recordJson))
.build();
HttpResponse<String> resp = client.send(request,
HttpResponse.BodyHandlers.ofString());
if (resp.statusCode() != 202) {
System.err.println("Ingestion failed: " + resp.statusCode()
+ " " + resp.body());
}
}
private static void ingestBulk(HttpClient client, List<String> records)
throws Exception {
StringBuilder sb = new StringBuilder();
sb.append("{\"tenantId\":\"").append(TENANT_ID)
.append("\",\"records\":[");
for (int i = 0; i < records.size(); i++) {
if (i > 0) sb.append(",");
sb.append(records.get(i));
}
sb.append("]}");
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(CERTYO_BASE + "/api/v1/records/bulk"))
.header("X-API-Key", API_KEY)
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(sb.toString()))
.build();
HttpResponse<String> resp = client.send(request,
HttpResponse.BodyHandlers.ofString());
if (resp.statusCode() != 202) {
System.err.println("Bulk ingestion failed: " + resp.statusCode()
+ " " + resp.body());
}
}
}
Approach 3: Debezium + LogMiner
For production CDC without triggers, Debezium's Oracle connector reads redo logs via LogMiner. Changes stream to Kafka, where a consumer transforms and forwards them to Certyo. This is the most robust approach for high-volume tables.
version: "3.9"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.6.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.6.0
depends_on: [zookeeper]
ports: ["9092:9092"]
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
connect:
image: debezium/connect:2.6
depends_on: [kafka]
ports: ["8083:8083"]
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: certyo-oracle-cdc
CONFIG_STORAGE_TOPIC: _connect_configs
OFFSET_STORAGE_TOPIC: _connect_offsets
STATUS_STORAGE_TOPIC: _connect_statuses
CONFIG_STORAGE_REPLICATION_FACTOR: 1
OFFSET_STORAGE_REPLICATION_FACTOR: 1
STATUS_STORAGE_REPLICATION_FACTOR: 1
# Register the Oracle connector after Kafka Connect starts:
#
# curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
# "name": "oracle-certyo-connector",
# "config": {
# "connector.class": "io.debezium.connector.oracle.OracleConnector",
# "database.hostname": "oracle-host",
# "database.port": "1521",
# "database.user": "c##dbzuser",
# "database.password": "dbz",
# "database.dbname": "ORCLCDB",
# "database.pdb.name": "ORCLPDB1",
# "schema.include.list": "MYSCHEMA",
# "table.include.list": "MYSCHEMA.PRODUCTS",
# "topic.prefix": "oracle",
# "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
# "schema.history.internal.kafka.topic": "schema-changes.oracle",
# "log.mining.strategy": "online_catalog",
# "decimal.handling.mode": "double",
# "snapshot.mode": "initial"
# }
# }'
"""
Kafka consumer for Debezium Oracle CDC events.
Install: pip install confluent-kafka requests
Usage:
CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy \
KAFKA_BOOTSTRAP=localhost:9092 \
python debezium_consumer.py
"""
import os
import json
import requests
from datetime import datetime, timezone
from confluent_kafka import Consumer, KafkaError
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
KAFKA_BOOTSTRAP = os.environ.get("KAFKA_BOOTSTRAP", "localhost:9092")
TOPIC = "oracle.MYSCHEMA.PRODUCTS" # Debezium topic: prefix.schema.table
BULK_THRESHOLD = 10
def create_consumer():
return Consumer({
"bootstrap.servers": KAFKA_BOOTSTRAP,
"group.id": "certyo-oracle-consumer",
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
})
def map_debezium_to_certyo(event):
"""Map a Debezium Oracle change event to a Certyo record."""
payload = event.get("payload", {})
after = payload.get("after", {})
before = payload.get("before", {})
source = payload.get("source", {})
op_map = {"c": "insert", "u": "update", "d": "delete", "r": "upsert"}
op = op_map.get(payload.get("op", ""), "upsert")
# Use 'after' for insert/update, 'before' for delete
data = after if after else before
return {
"tenantId": CERTYO_TENANT_ID,
"database": source.get("schema", "oracle"),
"collection": source.get("table", "PRODUCTS"),
"recordId": str(data.get("PRODUCT_ID", "")),
"operationType": op,
"recordPayload": {
"product_id": data.get("PRODUCT_ID"),
"sku": data.get("SKU"),
"name": data.get("PRODUCT_NAME"),
"category": data.get("CATEGORY"),
"price": data.get("PRICE"),
"batch_number": data.get("BATCH_NUMBER"),
},
# ts_ms is epoch milliseconds; emit ISO 8601
# (needs: from datetime import datetime, timezone)
"sourceTimestamp": (
datetime.fromtimestamp(source["ts_ms"] / 1000, tz=timezone.utc).isoformat()
if source.get("ts_ms") is not None
else None
),
"idempotencyKey": (
f"dbz-{source.get('scn', '')}-"
f"{data.get('PRODUCT_ID', '')}"
),
}
def ingest_records(records):
headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
if len(records) > BULK_THRESHOLD:
for i in range(0, len(records), 1000):
batch = records[i : i + 1000]
resp = requests.post(
f"{CERTYO_BASE_URL}/api/v1/records/bulk",
json={"tenantId": CERTYO_TENANT_ID, "records": batch},
headers=headers, timeout=60,
)
if resp.status_code != 202:
print(f"Bulk failed: {resp.status_code} {resp.text}")
else:
for rec in records:
resp = requests.post(
f"{CERTYO_BASE_URL}/api/v1/records",
json=rec, headers=headers, timeout=30,
)
if resp.status_code != 202:
print(f"Failed: {resp.status_code} {resp.text}")
def main():
consumer = create_consumer()
consumer.subscribe([TOPIC])
print(f"Listening on {TOPIC}...")
batch = []
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
if batch:
ingest_records(batch)
consumer.commit()
batch = []
continue
if msg.error():
if msg.error().code() != KafkaError._PARTITION_EOF:
print(f"Kafka error: {msg.error()}")
continue
event = json.loads(msg.value().decode("utf-8"))
record = map_debezium_to_certyo(event)
batch.append(record)
if len(batch) >= 100:
ingest_records(batch)
consumer.commit()
batch = []
if __name__ == "__main__":
main()
Field Mapping
Map Oracle columns to Certyo record fields. The mapping is consistent across all three approaches.
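Expressed as code, the mapping summarized in the table below looks roughly like this sketch; map_oracle_row is an illustrative helper (field names follow the examples above), not a Certyo library function:

```python
"""Sketch of the Oracle -> Certyo field mapping, for any of the three approaches."""

# Debezium op codes mapped to Certyo operation types
DEBEZIUM_OPS = {"c": "insert", "u": "update", "d": "delete", "r": "upsert"}


def map_oracle_row(tenant_id, schema, table, row, op, change_marker):
    """Build a Certyo record from a row dict keyed by Oracle column names.
    change_marker is the SCN (Debezium) or a timestamp (ORDS/AQ), making
    the idempotency key unique per change event."""
    return {
        "tenantId": tenant_id,
        "database": schema,                  # schema/owner -> database
        "collection": table,                 # table name -> collection
        "recordId": str(row["PRODUCT_ID"]),  # primary key, cast to string
        "operationType": DEBEZIUM_OPS.get(op, op),
        "recordPayload": {k.lower(): v for k, v in row.items()},
        "sourceTimestamp": row.get("MODIFIED_AT"),
        "idempotencyKey": f"{row['PRODUCT_ID']}-{change_marker}",
    }
```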
Oracle Field                 Certyo Field      Notes
─────────────────────────────────────────────────────────────────────────
PRODUCT_ID (PK)              recordId          Cast to string
PRODUCTS (table name)        collection        Uppercase Oracle convention
MYSCHEMA (schema/owner)      database          Schema name as database
ROWID or PK + SCN            idempotencyKey    Unique per change event
SYSDATE / SYSTIMESTAMP       sourceTimestamp   ISO 8601 format
'insert'/'update'/'delete'   operationType     Mapped from trigger or Debezium op
Full row as JSON             recordPayload     All business columns serialized
Debezium Operation Mapping:
"c" (create) → "insert"
"u" (update) → "update"
"d" (delete) → "delete"
"r" (read/snapshot) → "upsert"
Verification and Write-back
After records are ingested, add tracking columns to the source table and periodically verify anchoring status via the Certyo API.
-- Step 1: Add tracking columns to the source table
ALTER TABLE PRODUCTS ADD (
CERTYO_RECORD_HASH VARCHAR2(128),
CERTYO_STATUS VARCHAR2(20) DEFAULT 'Pending',
CERTYO_VERIFIED_AT TIMESTAMP
);
-- Step 2: After ingestion, write back the record hash
UPDATE PRODUCTS
SET CERTYO_RECORD_HASH = :record_hash,
CERTYO_STATUS = 'Pending'
WHERE PRODUCT_ID = :product_id;
-- Step 3: After verification confirms anchoring
UPDATE PRODUCTS
SET CERTYO_STATUS = 'Anchored',
CERTYO_VERIFIED_AT = SYSTIMESTAMP
WHERE PRODUCT_ID = :product_id;
-- Step 4: Query unverified records for the verification worker
SELECT PRODUCT_ID, CERTYO_RECORD_HASH
FROM PRODUCTS
WHERE CERTYO_STATUS = 'Pending'
AND CERTYO_RECORD_HASH IS NOT NULL
AND MODIFIED_AT < SYSTIMESTAMP - INTERVAL '2' MINUTE;
"""
Verification worker: checks Certyo anchoring status and updates Oracle.
Install: pip install oracledb requests
"""
import os
import time
import oracledb
import requests
CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
ORACLE_DSN = os.environ.get("ORACLE_DSN", "localhost:1521/ORCLPDB1")
ORACLE_USER = os.environ.get("ORACLE_USER", "certyo_app")
ORACLE_PASS = os.environ.get("ORACLE_PASS", "changeme")
def verify_pending_records():
conn = oracledb.connect(user=ORACLE_USER, password=ORACLE_PASS, dsn=ORACLE_DSN)
cursor = conn.cursor()
cursor.execute("""
SELECT PRODUCT_ID, CERTYO_RECORD_HASH
FROM PRODUCTS
WHERE CERTYO_STATUS = 'Pending'
AND CERTYO_RECORD_HASH IS NOT NULL
AND MODIFIED_AT < SYSTIMESTAMP - INTERVAL '2' MINUTE
FETCH FIRST 100 ROWS ONLY
""")
headers = {"X-API-Key": CERTYO_API_KEY, "Content-Type": "application/json"}
verified = 0
failed = 0
for product_id, record_hash in cursor.fetchall():
resp = requests.post(
f"{CERTYO_BASE_URL}/api/v1/verify/record",
json={
"tenantId": CERTYO_TENANT_ID,
"database": "MYSCHEMA",
"collection": "PRODUCTS",
"recordId": str(product_id),
},
headers=headers, timeout=30,
)
if resp.status_code == 200:
result = resp.json()
if result.get("verified") and result.get("anchoredOnChain"):
cursor.execute("""
UPDATE PRODUCTS
SET CERTYO_STATUS = 'Anchored',
CERTYO_VERIFIED_AT = SYSTIMESTAMP
WHERE PRODUCT_ID = :1
""", [product_id])
verified += 1
else:
failed += 1
else:
failed += 1
conn.commit()
cursor.close()
conn.close()
print(f"Verified: {verified}, Failed/Pending: {failed}")
if __name__ == "__main__":
while True:
try:
verify_pending_records()
except Exception as e:
print(f"Verification error: {e}")
time.sleep(120)
Authentication Reference
Oracle connection
The integration connects to Oracle using either oracledb (Python thin mode, no Oracle Client needed) or JDBC (Java). For ORDS polling, use the ORDS base URL with optional OAuth2 client credentials if ORDS authentication is enabled.
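As a minimal sketch of the Python side, the snippet below opens a thin-mode connection from environment variables; the ez_connect_dsn helper and the default host/service values are illustrative:

```python
"""Sketch: thin-mode Oracle connection (python-oracledb, no Oracle Client install)."""
import os


def ez_connect_dsn(host, port, service):
    """Build an EZ Connect DSN string like 'host:port/service'."""
    return f"{host}:{int(port)}/{service}"


def connect_from_env():
    """Open a thin-mode connection using the ORACLE_* environment variables."""
    import oracledb  # pip install oracledb

    return oracledb.connect(
        user=os.environ["ORACLE_USER"],
        password=os.environ["ORACLE_PASS"],
        dsn=os.environ.get(
            "ORACLE_DSN",
            ez_connect_dsn("localhost", 1521, "ORCLPDB1"),
        ),
    )
```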
Certyo API key
Pass your Certyo API key in the X-API-Key header on all Certyo API calls. Store it in Oracle Wallet, environment variables, or a secrets manager — never in PL/SQL source code or version control.
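A small helper can centralize the header and fail fast when the key is missing from the environment; certyo_headers is a hypothetical name, shown as a sketch:

```python
"""Sketch: build Certyo auth headers from the environment."""
import os


def certyo_headers(env=os.environ):
    """Return the headers for Certyo API calls, failing fast if the key is unset."""
    key = env.get("CERTYO_API_KEY")
    if not key:
        raise RuntimeError("CERTYO_API_KEY is not set")
    return {"X-API-Key": key, "Content-Type": "application/json"}
```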
Troubleshooting
- ORDS returns 404 for the table — Verify auto-REST is enabled with SELECT * FROM USER_ORDS_ENABLED_OBJECTS. The schema must be ORDS-enabled first.
- AQ dequeue returns no messages — Check the queue status with SELECT * FROM USER_QUEUE_TABLES. Ensure the queue is started and the trigger is firing (test with a manual INSERT).
- Debezium LogMiner fails to start — Ensure the connector user has the required grants: SELECT on V$LOG and V$LOGMNR_CONTENTS, and EXECUTE on DBMS_LOGMNR. The database must be in ARCHIVELOG mode.
- Duplicate records in Certyo — Use idempotencyKey with a combination of the primary key and the SCN (for Debezium) or timestamp (for ORDS/AQ). Certyo deduplicates automatically and returns idempotencyReplayed: true.
- Trigger slows down DML — AQ enqueue within a trigger is synchronous. For high-throughput tables, prefer the Debezium LogMiner approach, which reads redo logs asynchronously without affecting DML performance.
AI Integration Skill
Download a skill file that enables AI agents to generate working Oracle Database + Certyo integration code for any language or framework.
What's inside
- Authentication — Oracle connection strings, ORDS setup, and AQ grants
- Architecture — Three approaches: ORDS polling, AQ triggers, Debezium LogMiner
- Field mapping — Oracle columns and Debezium operations to Certyo record schema
- Code examples — Python ORDS poller, PL/SQL AQ trigger, Java AQ consumer, Debezium Docker Compose
- Verification — Periodic anchoring verification with Oracle UPDATE write-back
- Oracle patterns — LogMiner configuration, AQ setup, ORDS auto-REST, timestamp tracking
How to use
Claude Code
Place the file in your project's .claude/commands/ directory, then use it as a slash command:
# Download the skill file
mkdir -p .claude/commands
curl -o .claude/commands/certyo-oracledb.md \
https://www.certyos.com/developers/skills/certyo-oracledb-skill.md
# Use it in Claude Code
/certyo-oracledb "Generate a Python worker that uses ORDS to detect Oracle changes and push them to Certyo"
Cursor / Copilot / Any AI Agent
Add the file to your project root or attach it to a conversation. The AI agent will use the Oracle Database-specific patterns, field mappings, and code examples to generate correct integration code.
# Add to your project
curl -o CERTYO_ORACLEDB.md \
https://www.certyos.com/developers/skills/certyo-oracledb-skill.md
# Then in your AI agent:
"Using the Certyo Oracle Database spec in CERTYO_ORACLEDB.md,
generate a python worker that uses ords to detect oracle changes and push them to certyo"
CLAUDE.md Context File
Append the skill file to your project's CLAUDE.md so every Claude conversation has Oracle Database + Certyo context automatically.
# Append to your project's CLAUDE.md
echo "" >> CLAUDE.md
echo "## Certyo Oracle Database Integration" >> CLAUDE.md
cat CERTYO_ORACLEDB.md >> CLAUDE.md