Microsoft SQL Server Integration
Use SQL Server Change Data Capture (CDC) to detect row-level changes and push them to Certyo for blockchain anchoring. This guide covers enabling CDC, building a .NET 8 worker that polls for changes, and optimizing with bulk ingestion.
Overview
Change Data Capture records insert, update, and delete activity on SQL Server tables. A background worker polls the CDC change tables at regular intervals, transforms the rows into Certyo records, and calls the ingestion API. For high-throughput tables, changes are accumulated and sent via the bulk endpoint.
Architecture
┌──────────────────────┐        CDC capture process        ┌───────────────────┐
│ SQL Server           │ ──── (automatic, async) ────────> │ CDC Change Tables │
│ Products table       │                                   │ cdc.dbo_Products  │
│ (INSERT/UPDATE/DEL)  │                                   │ _CT               │
└──────────────────────┘                                   └─────────┬─────────┘
                                                                     │
                                                        .NET Worker polls every 10s
                                                    fn_cdc_get_net_changes_dbo_Products
                                                                     │
                                                                     v
                                                            ┌─────────────────┐
                                                            │ .NET 8 Worker   │
                                                            │ (BackgroundSvc) │
                                                            └────────┬────────┘
                                                                     │
                                              batch <= 10: POST /api/v1/records (per record)
                                              batch > 10: POST /api/v1/records/bulk
                                                                     │
                                                                     v
                                                            ┌─────────────────┐
                                                            │ Certyo API      │
                                                            │ (202 Accepted)  │
                                                            └────────┬────────┘
                                                                     │
                                                             ~60-90s anchoring
                                                                     │
                                                                     v
                                                            ┌─────────────────┐
                                                            │ Polygon         │
                                                            │ (on-chain)      │
                                                            └─────────────────┘

Prerequisites
- SQL Server 2016+ (Enterprise, Developer, or Standard) or Azure SQL Database (Hyperscale or General Purpose)
- db_owner or sysadmin role to enable CDC on the database and tables
- .NET 8 SDK for the worker service (or Python 3.9+ for the alternative example)
- A Certyo API key (obtain from your Certyo dashboard under Settings → API Keys)
- SQL Server Agent must be running (on-premises only — Azure SQL handles this automatically)
Step 1: Enable Change Data Capture
First enable CDC at the database level, then on each table you want to track. The following example enables CDC on a Products table.
-- Step 1: Enable CDC on the database
USE ProductCatalog;
GO
EXEC sys.sp_cdc_enable_db;
GO
-- Step 2: Enable CDC on the Products table
-- This creates the capture instance dbo_Products and its change table cdc.dbo_Products_CT
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'Products',
@role_name = NULL, -- No gating role (access follows SELECT permission on the source columns)
@supports_net_changes = 1; -- Enable net changes (collapsed per PK; requires a primary key or unique index)
GO
-- Verify CDC is active
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = 'ProductCatalog';
SELECT s.name AS schema_name, t.name AS table_name, t.is_tracked_by_cdc
FROM sys.tables t
JOIN sys.schemas s ON t.schema_id = s.schema_id
WHERE t.is_tracked_by_cdc = 1;

-- Azure SQL: CDC is enabled the same way, but SQL Agent is managed for you.
-- No additional configuration needed for the capture/cleanup jobs.
USE ProductCatalog;
GO
EXEC sys.sp_cdc_enable_db;
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'Products',
@role_name = NULL,
@supports_net_changes = 1;
GO
-- Azure SQL automatically creates and manages the CDC capture and cleanup jobs.
-- Default cleanup retention is 3 days. To change it:
EXEC sys.sp_cdc_change_job
@job_type = N'cleanup',
@retention = 4320; -- minutes (3 days = 4320)
GO

Step 2: .NET 8 CDC Worker
The worker service polls CDC at a configurable interval (default: 10 seconds), reads net changes since the last processed LSN, transforms each row into a Certyo record, and calls the API. When the batch exceeds 10 changes, it uses the bulk endpoint for efficiency.
using System.Data;
using System.Net.Http.Json;
using System.Text.Json;
using Microsoft.Data.SqlClient;
namespace Certyo.CdcWorker;
public class CdcPollingWorker : BackgroundService
{
private readonly ILogger<CdcPollingWorker> _logger;
private readonly IHttpClientFactory _httpClientFactory;
private readonly IConfiguration _config;
private byte[]? _lastProcessedLsn;
public CdcPollingWorker(
ILogger<CdcPollingWorker> logger,
IHttpClientFactory httpClientFactory,
IConfiguration config)
{
_logger = logger;
_httpClientFactory = httpClientFactory;
_config = config;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var pollInterval = TimeSpan.FromSeconds(
_config.GetValue("CdcPolling:IntervalSeconds", 10));
var connectionString = _config.GetConnectionString("SqlServer")!;
var tenantId = _config["Certyo:TenantId"]!;
var apiKey = _config["Certyo:ApiKey"]!;
// Load persisted LSN from the tracking table (null on first run, which then starts from the capture instance's minimum LSN)
_lastProcessedLsn = await GetLastProcessedLsnAsync(connectionString);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await PollAndIngestAsync(connectionString, tenantId, apiKey, stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "CDC polling cycle failed");
}
await Task.Delay(pollInterval, stoppingToken);
}
}
private async Task PollAndIngestAsync(
string connectionString, string tenantId, string apiKey,
CancellationToken ct)
{
await using var connection = new SqlConnection(connectionString);
await connection.OpenAsync(ct);
// Get the current maximum LSN
var maxLsn = await GetMaxLsnAsync(connection, ct);
if (maxLsn == null || (_lastProcessedLsn != null &&
CompareBytes(maxLsn, _lastProcessedLsn) <= 0))
{
return; // No new changes
}
// Determine the starting LSN
var fromLsn = _lastProcessedLsn != null
? await IncrementLsnAsync(connection, _lastProcessedLsn, ct)
: await GetMinLsnAsync(connection, "dbo_Products", ct);
if (fromLsn == null || CompareBytes(fromLsn, maxLsn) > 0)
return;
// Read net changes (collapsed by primary key)
var changes = new List<CdcChange>();
await using var cmd = connection.CreateCommand();
cmd.CommandText = @"
SELECT *
FROM cdc.fn_cdc_get_net_changes_dbo_Products(@from_lsn, @to_lsn, 'all with merge')
ORDER BY __$start_lsn";
cmd.Parameters.Add(new SqlParameter("@from_lsn", SqlDbType.Binary, 10)
{ Value = fromLsn });
cmd.Parameters.Add(new SqlParameter("@to_lsn", SqlDbType.Binary, 10)
{ Value = maxLsn });
await using var reader = await cmd.ExecuteReaderAsync(ct);
while (await reader.ReadAsync(ct))
{
var operation = reader.GetInt32(reader.GetOrdinal("__$operation"));
var operationType = operation switch
{
1 => "delete",
2 => "insert", // returned only with the 'all' row filter
4 => "update", // returned only with the 'all' row filter
5 => "upsert", // 'all with merge': insert or update, not distinguished
_ => "upsert"
};
changes.Add(new CdcChange
{
ProductId = reader.GetInt64(reader.GetOrdinal("ProductId")),
Sku = reader.GetString(reader.GetOrdinal("Sku")),
Name = reader.IsDBNull(reader.GetOrdinal("Name"))
? null : reader.GetString(reader.GetOrdinal("Name")),
Category = reader.IsDBNull(reader.GetOrdinal("Category"))
? null : reader.GetString(reader.GetOrdinal("Category")),
Price = reader.IsDBNull(reader.GetOrdinal("Price"))
? null : reader.GetDecimal(reader.GetOrdinal("Price")),
ManufacturedAt = reader.IsDBNull(reader.GetOrdinal("ManufacturedAt"))
? null : reader.GetDateTime(reader.GetOrdinal("ManufacturedAt")),
BatchNumber = reader.IsDBNull(reader.GetOrdinal("BatchNumber"))
? null : reader.GetString(reader.GetOrdinal("BatchNumber")),
OperationType = operationType,
});
}
if (changes.Count == 0)
{
_lastProcessedLsn = maxLsn;
return;
}
_logger.LogInformation("Found {Count} CDC changes to ingest", changes.Count);
// Ingest into Certyo — bulk if > 10 records
var httpClient = _httpClientFactory.CreateClient("Certyo");
httpClient.DefaultRequestHeaders.Add("X-API-Key", apiKey);
if (changes.Count > 10)
{
await IngestBulkAsync(httpClient, tenantId, changes, ct);
}
else
{
foreach (var change in changes)
{
await IngestSingleAsync(httpClient, tenantId, change, ct);
}
}
// Persist the high-water mark
_lastProcessedLsn = maxLsn;
await SaveLastProcessedLsnAsync(connectionString, maxLsn);
}
private async Task IngestSingleAsync(
HttpClient httpClient, string tenantId, CdcChange change,
CancellationToken ct)
{
var record = MapToRecord(tenantId, change);
var response = await httpClient.PostAsJsonAsync(
"https://www.certyos.com/api/v1/records", record, ct);
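if (!response.IsSuccessStatusCode)
{
var body = await response.Content.ReadAsStringAsync(ct);
_logger.LogWarning(
"Certyo ingestion failed for ProductId {Id}: {Status} {Body}",
change.ProductId, response.StatusCode, body);
// Throw so the caller does not advance the LSN past changes that were not ingested;
// the next poll cycle retries the same range.
response.EnsureSuccessStatusCode();
}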
}
private async Task IngestBulkAsync(
HttpClient httpClient, string tenantId, List<CdcChange> changes,
CancellationToken ct)
{
// Bulk endpoint accepts up to 1000 records per request
foreach (var batch in changes.Chunk(1000))
{
var bulkPayload = new
{
tenantId,
records = batch.Select(c => MapToRecord(tenantId, c)).ToArray()
};
var response = await httpClient.PostAsJsonAsync(
"https://www.certyos.com/api/v1/records/bulk", bulkPayload, ct);
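if (!response.IsSuccessStatusCode)
{
var body = await response.Content.ReadAsStringAsync(ct);
_logger.LogWarning("Bulk ingestion failed: {Status} {Body}",
response.StatusCode, body);
// Throw so the caller does not advance the LSN past changes that were not ingested;
// the next poll cycle retries the same range.
response.EnsureSuccessStatusCode();
}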
}
}
private static object MapToRecord(string tenantId, CdcChange change)
{
return new
{
tenantId,
database = "ProductCatalog",
collection = "Products",
recordId = change.Sku,
recordVersion = change.ProductId.ToString(),
operationType = change.OperationType,
recordPayload = new
{
productId = change.ProductId,
sku = change.Sku,
name = change.Name,
category = change.Category,
price = change.Price,
manufacturedAt = change.ManufacturedAt?.ToString("O"),
batchNumber = change.BatchNumber,
},
sourceTimestamp = DateTime.UtcNow.ToString("O"),
idempotencyKey = $"cdc-{change.Sku}-{change.ProductId}-"
+ $"{DateTime.UtcNow:yyyy-MM-dd}",
};
}
// ── LSN helper methods ──
private static async Task<byte[]?> GetMaxLsnAsync(
SqlConnection conn, CancellationToken ct)
{
await using var cmd = conn.CreateCommand();
cmd.CommandText = "SELECT sys.fn_cdc_get_max_lsn()";
var result = await cmd.ExecuteScalarAsync(ct);
return result as byte[];
}
private static async Task<byte[]?> GetMinLsnAsync(
SqlConnection conn, string captureInstance, CancellationToken ct)
{
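await using var cmd = conn.CreateCommand();
// Pass the capture instance as a parameter instead of interpolating it into the SQL text
cmd.CommandText = "SELECT sys.fn_cdc_get_min_lsn(@capture_instance)";
cmd.Parameters.Add(new SqlParameter("@capture_instance", SqlDbType.NVarChar, 128)
{ Value = captureInstance });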
return await cmd.ExecuteScalarAsync(ct) as byte[];
}
private static async Task<byte[]?> IncrementLsnAsync(
SqlConnection conn, byte[] lsn, CancellationToken ct)
{
await using var cmd = conn.CreateCommand();
cmd.CommandText = "SELECT sys.fn_cdc_increment_lsn(@lsn)";
cmd.Parameters.Add(new SqlParameter("@lsn", SqlDbType.Binary, 10)
{ Value = lsn });
return await cmd.ExecuteScalarAsync(ct) as byte[];
}
private async Task<byte[]?> GetLastProcessedLsnAsync(string connectionString)
{
await using var conn = new SqlConnection(connectionString);
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = @"
IF OBJECT_ID('dbo.CdcTracker', 'U') IS NULL
BEGIN
CREATE TABLE dbo.CdcTracker (
Id INT PRIMARY KEY DEFAULT 1,
LastLsn VARBINARY(10) NOT NULL,
UpdatedAt DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
);
END
SELECT LastLsn FROM dbo.CdcTracker WHERE Id = 1";
return await cmd.ExecuteScalarAsync() as byte[];
}
private static async Task SaveLastProcessedLsnAsync(
string connectionString, byte[] lsn)
{
await using var conn = new SqlConnection(connectionString);
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = @"
MERGE dbo.CdcTracker AS target
USING (SELECT 1 AS Id) AS source ON target.Id = source.Id
WHEN MATCHED THEN
UPDATE SET LastLsn = @lsn, UpdatedAt = SYSUTCDATETIME()
WHEN NOT MATCHED THEN
INSERT (Id, LastLsn, UpdatedAt)
VALUES (1, @lsn, SYSUTCDATETIME());";
cmd.Parameters.Add(new SqlParameter("@lsn", SqlDbType.Binary, 10)
{ Value = lsn });
await cmd.ExecuteNonQueryAsync();
}
private static int CompareBytes(byte[] a, byte[] b)
{
for (int i = 0; i < Math.Min(a.Length, b.Length); i++)
{
if (a[i] != b[i]) return a[i].CompareTo(b[i]);
}
return a.Length.CompareTo(b.Length);
}
}
public record CdcChange
{
public long ProductId { get; init; }
public string Sku { get; init; } = "";
public string? Name { get; init; }
public string? Category { get; init; }
public decimal? Price { get; init; }
public DateTime? ManufacturedAt { get; init; }
public string? BatchNumber { get; init; }
public string OperationType { get; init; } = "upsert";
}

using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Extensions.Sql;
using Microsoft.Extensions.Logging;
using System.Net.Http.Json;
namespace Certyo.CdcFunction;
/// <summary>
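/// Azure Function triggered by SQL Server changes via the SQL bindings extension.
/// Requires Microsoft.Azure.Functions.Worker.Extensions.Sql NuGet package.
/// Note: the SQL trigger binding relies on SQL change tracking (not CDC), so change
/// tracking must be enabled on the database and on the Products table.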
/// </summary>
public class CdcTriggerFunction
{
private readonly HttpClient _httpClient;
private readonly ILogger<CdcTriggerFunction> _logger;
public CdcTriggerFunction(
IHttpClientFactory httpClientFactory,
ILogger<CdcTriggerFunction> logger)
{
_httpClient = httpClientFactory.CreateClient("Certyo");
_httpClient.DefaultRequestHeaders.Add("X-API-Key",
Environment.GetEnvironmentVariable("CERTYO_API_KEY")!);
_logger = logger;
}
[Function("ProductChangeTrigger")]
public async Task Run(
[SqlTrigger("[dbo].[Products]", "SqlConnectionString")]
IReadOnlyList<SqlChange<Product>> changes)
{
var tenantId = Environment.GetEnvironmentVariable("CERTYO_TENANT_ID")!;
foreach (var change in changes)
{
var operationType = change.Operation switch
{
SqlChangeOperation.Insert => "insert",
SqlChangeOperation.Update => "update",
SqlChangeOperation.Delete => "delete",
_ => "upsert"
};
var record = new
{
tenantId,
database = "ProductCatalog",
collection = "Products",
recordId = change.Item.Sku,
recordVersion = change.Item.ProductId.ToString(),
operationType,
recordPayload = new
{
productId = change.Item.ProductId,
sku = change.Item.Sku,
name = change.Item.Name,
category = change.Item.Category,
price = change.Item.Price,
},
sourceTimestamp = DateTime.UtcNow.ToString("O"),
idempotencyKey = $"sqltrigger-{change.Item.Sku}-"
+ $"{DateTime.UtcNow:yyyy-MM-dd-HH}",
};
var response = await _httpClient.PostAsJsonAsync(
"https://www.certyos.com/api/v1/records", record);
if (response.IsSuccessStatusCode)
{
_logger.LogInformation("Ingested {Sku}", change.Item.Sku);
}
else
{
_logger.LogWarning("Failed to ingest {Sku}: {Status}",
change.Item.Sku, response.StatusCode);
}
}
}
}
public record Product
{
public long ProductId { get; init; }
public string Sku { get; init; } = "";
public string? Name { get; init; }
public string? Category { get; init; }
public decimal? Price { get; init; }
}

"""
CDC polling worker for SQL Server using pyodbc + requests.
Install: pip install pyodbc requests
Usage:
    CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy python cdc_worker.py
"""
import os
import time
from datetime import datetime, timezone

import pyodbc
import requests

CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
SQL_CONNECTION_STRING = os.environ.get(
    "SQL_CONNECTION_STRING",
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=localhost,1433;"
    "DATABASE=ProductCatalog;"
    "Trusted_Connection=yes;"
    "TrustServerCertificate=yes;"
)
POLL_INTERVAL_SECONDS = int(os.environ.get("CDC_POLL_INTERVAL", "10"))
BULK_THRESHOLD = 10
BULK_MAX_RECORDS = 1000  # the bulk endpoint accepts up to 1000 records per request
HTTP_TIMEOUT_SECONDS = 30


def get_connection():
    return pyodbc.connect(SQL_CONNECTION_STRING)


def get_max_lsn(cursor):
    cursor.execute("SELECT sys.fn_cdc_get_max_lsn()")
    row = cursor.fetchone()
    return row[0] if row and row[0] else None


def get_min_lsn(cursor, capture_instance="dbo_Products"):
    # Parameterized rather than interpolated into the SQL text
    cursor.execute("SELECT sys.fn_cdc_get_min_lsn(?)", capture_instance)
    row = cursor.fetchone()
    return row[0] if row and row[0] else None


def increment_lsn(cursor, lsn):
    cursor.execute("SELECT sys.fn_cdc_increment_lsn(?)", lsn)
    row = cursor.fetchone()
    return row[0] if row and row[0] else None


def load_last_lsn(cursor):
    # Create the tracking table on first run, then read the saved position
    cursor.execute("""
        IF OBJECT_ID('dbo.CdcTrackerPy', 'U') IS NULL
        BEGIN
            CREATE TABLE dbo.CdcTrackerPy (
                Id INT PRIMARY KEY DEFAULT 1,
                LastLsn VARBINARY(10) NOT NULL,
                UpdatedAt DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
            );
        END
    """)
    cursor.commit()
    cursor.execute("SELECT LastLsn FROM dbo.CdcTrackerPy WHERE Id = 1")
    row = cursor.fetchone()
    return row[0] if row else None


def save_last_lsn(cursor, lsn):
    cursor.execute("""
        MERGE dbo.CdcTrackerPy AS target
        USING (SELECT 1 AS Id) AS source ON target.Id = source.Id
        WHEN MATCHED THEN
            UPDATE SET LastLsn = ?, UpdatedAt = SYSUTCDATETIME()
        WHEN NOT MATCHED THEN
            INSERT (Id, LastLsn, UpdatedAt)
            VALUES (1, ?, SYSUTCDATETIME());
    """, lsn, lsn)
    cursor.commit()


def map_to_record(row):
    # With 'all with merge', only 1 (delete) and 5 (insert or update) are returned
    op_map = {1: "delete", 2: "insert", 4: "update", 5: "upsert"}
    price = row.get("Price")
    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": "ProductCatalog",
        "collection": "Products",
        "recordId": row["Sku"],
        "recordVersion": str(row["ProductId"]),
        "operationType": op_map.get(row["__$operation"], "upsert"),
        "recordPayload": {
            "productId": row["ProductId"],
            "sku": row["Sku"],
            "name": row.get("Name"),
            "category": row.get("Category"),
            # Compare against None so a price of 0 is preserved
            "price": float(price) if price is not None else None,
            "batchNumber": row.get("BatchNumber"),
        },
        "sourceTimestamp": datetime.now(timezone.utc).isoformat(),
        "idempotencyKey": (
            f"cdc-py-{row['Sku']}-{row['ProductId']}-"
            f"{datetime.now(timezone.utc).strftime('%Y-%m-%d')}"
        ),
    }


def ingest_records(records):
    """Send records to Certyo; raises on HTTP failure so the LSN is not advanced."""
    headers = {
        "X-API-Key": CERTYO_API_KEY,
        "Content-Type": "application/json",
    }
    if len(records) > BULK_THRESHOLD:
        for start in range(0, len(records), BULK_MAX_RECORDS):
            batch = records[start:start + BULK_MAX_RECORDS]
            payload = {"tenantId": CERTYO_TENANT_ID, "records": batch}
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records/bulk",
                headers=headers, json=payload, timeout=HTTP_TIMEOUT_SECONDS,
            )
            if resp.ok:
                print(f"Bulk ingested {len(batch)} records")
            else:
                print(f"Bulk ingestion failed: {resp.status_code} {resp.text}")
                resp.raise_for_status()
    else:
        for record in records:
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records",
                headers=headers, json=record, timeout=HTTP_TIMEOUT_SECONDS,
            )
            if resp.ok:
                result = resp.json()
                print(f"  Ingested {record['recordId']}: {result['recordHash']}")
            else:
                print(f"  Failed {record['recordId']}: {resp.status_code}")
                resp.raise_for_status()


def poll_cycle():
    conn = get_connection()
    try:
        cursor = conn.cursor()
        max_lsn = get_max_lsn(cursor)
        if max_lsn is None:
            return
        last_lsn = load_last_lsn(cursor)
        if last_lsn and max_lsn <= last_lsn:
            return
        from_lsn = increment_lsn(cursor, last_lsn) if last_lsn else get_min_lsn(cursor)
        if from_lsn is None or from_lsn > max_lsn:
            return
        cursor.execute("""
            SELECT *
            FROM cdc.fn_cdc_get_net_changes_dbo_Products(?, ?, 'all with merge')
            ORDER BY __$start_lsn
        """, from_lsn, max_lsn)
        columns = [desc[0] for desc in cursor.description]
        changes = [dict(zip(columns, row)) for row in cursor.fetchall()]
        if changes:
            print(f"Found {len(changes)} CDC changes")
            ingest_records([map_to_record(row) for row in changes])
        # Persist the high-water mark only after ingestion succeeded
        save_last_lsn(cursor, max_lsn)
    finally:
        # Always release the connection, including on early return or error
        conn.close()


if __name__ == "__main__":
    print(f"CDC worker started (poll every {POLL_INTERVAL_SECONDS}s)")
    while True:
        try:
            poll_cycle()
        except Exception as e:
            print(f"Poll cycle error: {e}")
        time.sleep(POLL_INTERVAL_SECONDS)

Worker service registration
Register the worker in your Program.cs for the .NET 8 project:
using Certyo.CdcWorker;
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddHttpClient("Certyo", client =>
{
client.Timeout = TimeSpan.FromSeconds(30);
});
builder.Services.AddHostedService<CdcPollingWorker>();
var host = builder.Build();
host.Run();

{
"ConnectionStrings": {
"SqlServer": "Server=localhost,1433;Database=ProductCatalog;Trusted_Connection=True;TrustServerCertificate=True;"
},
"Certyo": {
"TenantId": "your-tenant-id",
"ApiKey": "your-api-key"
},
"CdcPolling": {
"IntervalSeconds": 10
}
}

CDC polling strategy
The worker uses a high-water mark pattern with Log Sequence Numbers (LSNs):
- On each poll cycle, call sys.fn_cdc_get_max_lsn() to get the latest available LSN
- Compare it to the last processed LSN (stored in the CdcTracker table). If they are equal, there are no new changes
- Use sys.fn_cdc_increment_lsn() to get the starting point for the next read; this avoids reprocessing the last batch
- Read net changes with fn_cdc_get_net_changes (collapses multiple operations per primary key into one row) using the 'all with merge' filter
- After successful ingestion, persist the new high-water mark LSN
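
The steps above can be sketched without a database. This is a minimal illustration, assuming LSNs are handled as 10-byte big-endian values (matching `binary(10)`), so plain byte-string comparison orders them correctly. The helpers `make_lsn`, `increment_lsn`, and `next_window` are hypothetical names introduced here; `increment_lsn` only mimics what `sys.fn_cdc_increment_lsn()` does on the server.

```python
LSN_SIZE = 10  # CDC LSNs are binary(10)


def make_lsn(value: int) -> bytes:
    """Build a 10-byte big-endian LSN from an integer (illustration only)."""
    return value.to_bytes(LSN_SIZE, "big")


def increment_lsn(lsn: bytes) -> bytes:
    """Return the smallest LSN value greater than `lsn` (mimics fn_cdc_increment_lsn)."""
    return (int.from_bytes(lsn, "big") + 1).to_bytes(LSN_SIZE, "big")


def next_window(last_lsn, min_lsn, max_lsn):
    """Return the (from_lsn, to_lsn) range to read, or None when nothing is new."""
    if max_lsn is None:
        return None  # CDC has not captured anything yet
    if last_lsn is not None and max_lsn <= last_lsn:
        return None  # high-water mark already at the latest LSN
    # First run starts at the capture instance's minimum LSN
    from_lsn = increment_lsn(last_lsn) if last_lsn is not None else min_lsn
    if from_lsn is None or from_lsn > max_lsn:
        return None
    return from_lsn, max_lsn
```

After a successful ingestion the caller would persist the second element of the returned tuple as the new high-water mark.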
Handling gaps and restarts
If the worker crashes, it resumes from the last persisted LSN. CDC retains change data for a configurable period (default: 3 days). If the worker is down longer than the retention window, the minimum LSN will have advanced past your saved position. Detect this by comparing your saved LSN against sys.fn_cdc_get_min_lsn() — if the min LSN is greater, you need a full table resync.
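
The gap check described above can be expressed as a small pure function. This is a sketch under the rule stated in this section (resync when the minimum LSN is greater than the saved LSN); `needs_full_resync` is a hypothetical helper name, and both arguments are assumed to be the raw 10-byte values returned by the tracking table and by `sys.fn_cdc_get_min_lsn()`.

```python
def needs_full_resync(saved_lsn, min_lsn):
    """Return True when cleanup has moved the minimum LSN past the saved position."""
    if saved_lsn is None:
        # First run: nothing has been processed yet, so there is no gap to detect
        return False
    # 10-byte big-endian LSNs compare correctly as byte strings
    return min_lsn > saved_lsn


saved = bytes.fromhex("0000002a000001b00003")
older_min = bytes.fromhex("0000001f000000a00001")
newer_min = bytes.fromhex("00000031000002c00001")

print(needs_full_resync(saved, older_min))  # saved position is still inside the retention window
print(needs_full_resync(saved, newer_min))  # retention window has moved past the saved position
```

Running this check at worker startup, before the first poll, lets the worker stop and request a snapshot instead of silently skipping changes.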
Bulk optimization
For tables with high write throughput, sending individual records is inefficient. The worker above automatically switches to the bulk endpoint when a poll cycle returns more than 10 changes:
- POST /api/v1/records/bulk accepts up to 1000 records per request
- The .Chunk(1000) call in the C# worker splits larger batches into 1000-record pages
- Bulk responses include per-record status; check for partial failures and retry only the failed records
- For tables producing thousands of changes per poll cycle, consider reducing the poll interval to flush smaller batches more frequently
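
The paging and partial-failure handling listed above might look like the following sketch. The shape of the bulk response is not documented in this guide, so the per-record `success` flag used by `failed_records` is an assumption made for illustration, as are the helper names `chunked` and `failed_records`.

```python
from typing import Iterator

BULK_LIMIT = 1000  # per-request cap of the bulk endpoint, as stated above


def chunked(records: list, size: int = BULK_LIMIT) -> Iterator[list]:
    """Yield consecutive pages of at most `size` records."""
    for start in range(0, len(records), size):
        yield records[start:start + size]


def failed_records(batch: list, results: list) -> list:
    """Select the records to retry.

    ASSUMPTION: results[i] is a dict describing batch[i] and carries a boolean
    'success' field. Adjust this to the real bulk response schema.
    """
    return [rec for rec, res in zip(batch, results) if not res.get("success", False)]


records = [{"recordId": str(i)} for i in range(2500)]
page_sizes = [len(page) for page in chunked(records)]
print(page_sizes)
```

Retrying only the output of `failed_records` keeps a single bad row from forcing a resend of the whole page.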
Periodic verification
Run a scheduled verification job (e.g., daily) that samples a percentage of anchored records and re-verifies them against the blockchain. This provides ongoing assurance that records have not been tampered with.
public async Task VerifySampleAsync(
HttpClient httpClient, string tenantId, string apiKey,
int sampleSize = 50, CancellationToken ct = default)
{
httpClient.DefaultRequestHeaders.Add("X-API-Key", apiKey);
// Query a random sample of recently anchored records
var queryUrl = $"https://www.certyos.com/api/v1/records" +
$"?tenantId={tenantId}&status=Anchored&limit={sampleSize}";
var queryResponse = await httpClient.GetFromJsonAsync<RecordQueryResult>(queryUrl, ct);
var verified = 0;
var failed = 0;
foreach (var record in queryResponse?.Items ?? [])
{
var verifyPayload = new
{
tenantId,
database = record.Database,
collection = record.Collection,
recordId = record.RecordId,
payload = record.RecordPayload,
};
var response = await httpClient.PostAsJsonAsync(
"https://www.certyos.com/api/v1/verify/record", verifyPayload, ct);
var result = await response.Content.ReadFromJsonAsync<VerifyResult>(
cancellationToken: ct);
if (result?.Verified == true && result.AnchoredOnChain)
verified++;
else
failed++;
}
Console.WriteLine($"Verification complete: {verified} passed, {failed} failed " +
$"out of {queryResponse?.Items?.Count ?? 0} sampled");
}

Authentication reference
SQL Server connection
The worker connects to SQL Server using a standard ADO.NET connection string. Both Windows Authentication (Trusted_Connection=True) and SQL Authentication (User Id=...;Password=...) are supported. For Azure SQL, use Azure AD authentication with managed identities where possible.
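
For the Python alternative, the same two authentication modes map onto ODBC keywords. The sketch below assumes the ODBC Driver 18 for SQL Server used elsewhere in this guide; `build_odbc_connection_string` is a hypothetical helper, not part of pyodbc.

```python
def build_odbc_connection_string(server, database, user=None, password=None):
    """Build an ODBC connection string for Windows or SQL authentication."""
    parts = [
        "DRIVER={ODBC Driver 18 for SQL Server}",
        f"SERVER={server}",
        f"DATABASE={database}",
    ]
    if user is None:
        # Windows Authentication
        parts.append("Trusted_Connection=yes")
    else:
        # SQL Authentication; braces protect special characters such as ';'
        escaped = (password or "").replace("}", "}}")
        parts.append(f"UID={user}")
        parts.append(f"PWD={{{escaped}}}")
    return ";".join(parts) + ";"


print(build_odbc_connection_string("localhost,1433", "ProductCatalog"))
```

The resulting string can be passed to `pyodbc.connect()`; in production the password would come from a secrets manager rather than source code.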
Certyo API key
Pass your Certyo API key in the X-API-Key header on all Certyo API calls. The .NET worker reads it from configuration — in production, bind this to Azure Key Vault or a secrets manager rather than appsettings.json.
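
A minimal sketch of the same idea for the Python worker: read the key from the environment, fail fast when it is missing, and build the headers once. The function name `certyo_headers` is hypothetical; the `CERTYO_API_KEY` variable and `X-API-Key` header are the ones used in the examples above.

```python
import os


def certyo_headers(env=os.environ):
    """Return request headers for Certyo calls, or raise if the key is not configured."""
    api_key = env.get("CERTYO_API_KEY", "").strip()
    if not api_key:
        raise RuntimeError("CERTYO_API_KEY is not set")
    return {"X-API-Key": api_key, "Content-Type": "application/json"}


headers = certyo_headers({"CERTYO_API_KEY": "example-key"})
print(sorted(headers))
```

Failing at startup rather than on the first request makes a missing secret visible before any LSN state is touched.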
Troubleshooting
- CDC tables are empty: Verify SQL Server Agent is running (on-premises). For Azure SQL, CDC capture runs automatically but may have a short delay.
- "Invalid object name cdc.fn_cdc_get_net_changes": Ensure CDC is enabled on both the database and the specific table. The function name includes the capture instance (e.g., cdc.fn_cdc_get_net_changes_dbo_Products).
- LSN gap detected: Your worker was offline longer than the CDC retention period. Perform a full table snapshot to backfill, then resume CDC polling.
- Duplicate records in Certyo: The idempotencyKey includes a date component. If you restart the worker on the same day, duplicates are deduplicated automatically. Across days, ensure your LSN tracking is persisted correctly.
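
The behaviour of the date-based key can be checked in isolation. This sketch reproduces the key format from the Python worker above; the explicit `now` parameter is an addition made here so the function can be exercised with fixed timestamps.

```python
from datetime import datetime, timezone


def idempotency_key(sku, product_id, now):
    """Build the worker's idempotency key; the date part is the UTC calendar day."""
    day = now.astimezone(timezone.utc).strftime("%Y-%m-%d")
    return f"cdc-py-{sku}-{product_id}-{day}"


morning = idempotency_key("SKU-1", 42, datetime(2024, 5, 1, 8, 0, tzinfo=timezone.utc))
evening = idempotency_key("SKU-1", 42, datetime(2024, 5, 1, 23, 59, tzinfo=timezone.utc))
next_day = idempotency_key("SKU-1", 42, datetime(2024, 5, 2, 0, 1, tzinfo=timezone.utc))
print(morning, evening, next_day)
```

Because the key is constant within a UTC day, a same-day replay of a change maps to the same key. The same property means two distinct changes to one product on the same day also share a key, so if you need each of them recorded separately, consider including a change-specific value in the key.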
AI Integration Skill
Download a skill file that enables AI agents to generate working SQL Server + Certyo integration code for any language or framework.
What's inside
- Authentication — SQL Server connection strings and CDC permissions setup
- Architecture — CDC capture tables → .NET 8 BackgroundService → Certyo API
- Field mapping — CDC operation codes and table columns to Certyo record schema
- Code examples — C# BackgroundService, Azure Function SQL trigger, Python with pyodbc
- Verification — Periodic sampling verification with SQL UPDATE write-back
- CDC patterns — LSN high-water mark tracking, bulk optimization, gap handling
How to use
Claude Code
Place the file in your project's .claude/commands/ directory, then use it as a slash command:
# Download the skill file
mkdir -p .claude/commands
curl -o .claude/commands/certyo-sqlserver.md \
https://www.certyos.com/developers/skills/certyo-sqlserver-skill.md
# Use it in Claude Code
/certyo-sqlserver "Generate a .NET worker that uses CDC to push Product changes to Certyo"

Cursor / Copilot / Any AI Agent
Add the file to your project root or attach it to a conversation. The AI agent will use the SQL Server-specific patterns, field mappings, and code examples to generate correct integration code.
# Add to your project
curl -o CERTYO_SQLSERVER.md \
https://www.certyos.com/developers/skills/certyo-sqlserver-skill.md
# Then in your AI agent:
"Using the Certyo SQL Server spec in CERTYO_SQLSERVER.md,
generate a .NET worker that uses CDC to push Product changes to Certyo"

CLAUDE.md Context File
Append the skill file to your project's CLAUDE.md so every Claude conversation has SQL Server + Certyo context automatically.
# Append to your project's CLAUDE.md
echo "" >> CLAUDE.md
echo "## Certyo SQL Server Integration" >> CLAUDE.md
cat CERTYO_SQLSERVER.md >> CLAUDE.md