Microsoft SQL Server Integration
Use SQL Server Change Data Capture (CDC) to detect row-level changes and send them to Certyo for blockchain anchoring. This guide covers enabling CDC, building a .NET 8 worker that polls for changes, and optimizing throughput with bulk ingestion.
Overview
Change Data Capture records insert, update, and delete activity on SQL Server tables. A background worker polls the CDC change tables at regular intervals, transforms the rows into Certyo records, and calls the ingestion API. For high-throughput tables, changes are accumulated and sent via the bulk endpoint.
Architecture
┌──────────────────────┐     CDC capture process      ┌───────────────────┐
│ SQL Server           │ ──── (automatic, async) ───> │ CDC Change Tables │
│ Products table       │                              │ cdc.dbo_Products  │
│ (INSERT/UPDATE/DEL)  │                              │ _CT               │
└──────────────────────┘                              └─────────┬─────────┘
                                                                │
                                                   .NET Worker polls every 10s
                                               fn_cdc_get_net_changes_dbo_Products
                                                                │
                                                                v
                                                       ┌─────────────────┐
                                                       │  .NET 8 Worker  │
                                                       │ (BackgroundSvc) │
                                                       └────────┬────────┘
                                                                │
                                         batch <= 10: POST /api/v1/records (per record)
                                         batch >  10: POST /api/v1/records/bulk
                                                                │
                                                                v
                                                       ┌─────────────────┐
                                                       │   Certyo API    │
                                                       │ (202 Accepted)  │
                                                       └────────┬────────┘
                                                                │
                                                        ~60-90s anchoring
                                                                │
                                                                v
                                                       ┌─────────────────┐
                                                       │     Polygon     │
                                                       │   (on-chain)    │
                                                       └─────────────────┘
Prerequisites
- SQL Server 2016+ (Enterprise, Developer, or Standard) or Azure SQL Database (Hyperscale or General Purpose)
- db_owner or sysadmin role to enable CDC on the database and tables
- .NET 8 SDK for the worker service (or Python 3.9+ for the alternative example)
- A Certyo API key (obtain from your Certyo dashboard under Settings → API Keys)
- SQL Server Agent must be running (on-premises only — Azure SQL handles this automatically)
Step 1: Enable Change Data Capture
First enable CDC at the database level, then on each table you want to track. The following example enables CDC on a Products table.
-- Step 1: Enable CDC on the database
USE ProductCatalog;
GO
EXEC sys.sp_cdc_enable_db;
GO

-- Step 2: Enable CDC on the Products table
-- This creates the capture instance dbo_Products and its change table cdc.dbo_Products_CT
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'Products',
    @role_name = NULL,           -- No gating role (access is governed by SELECT permission on the source table)
    @supports_net_changes = 1;   -- Enable net changes (collapsed per PK; requires a primary key or unique index)
GO

-- Verify CDC is active
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = 'ProductCatalog';

SELECT s.name AS schema_name, t.name AS table_name, t.is_tracked_by_cdc
FROM sys.tables t
JOIN sys.schemas s ON t.schema_id = s.schema_id
WHERE t.is_tracked_by_cdc = 1;

-- Azure SQL: CDC is enabled the same way, but SQL Agent is managed for you.
-- No additional configuration needed for the capture/cleanup jobs.
USE ProductCatalog;
GO
EXEC sys.sp_cdc_enable_db;
GO
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'Products',
    @role_name = NULL,
    @supports_net_changes = 1;
GO

-- Azure SQL automatically creates and manages the CDC capture and cleanup jobs.
-- Default cleanup retention is 3 days. To change it:
EXEC sys.sp_cdc_change_job
    @job_type = N'cleanup',
    @retention = 4320; -- minutes (3 days = 4320)
GO
Step 2: .NET 8 CDC Worker
The worker service polls CDC at a configurable interval (default: 10 seconds), reads net changes since the last processed LSN, transforms each row into a Certyo record, and calls the API. When the batch exceeds 10 changes, it uses the bulk endpoint for efficiency.
using System.Data;
using System.Net.Http.Json;
using System.Text.Json;
using Microsoft.Data.SqlClient;

namespace Certyo.CdcWorker;

public class CdcPollingWorker : BackgroundService
{
    private readonly ILogger<CdcPollingWorker> _logger;
    private readonly IHttpClientFactory _httpClientFactory;
    private readonly IConfiguration _config;
    private byte[]? _lastProcessedLsn;

    public CdcPollingWorker(
        ILogger<CdcPollingWorker> logger,
        IHttpClientFactory httpClientFactory,
        IConfiguration config)
    {
        _logger = logger;
        _httpClientFactory = httpClientFactory;
        _config = config;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var pollInterval = TimeSpan.FromSeconds(
            _config.GetValue("CdcPolling:IntervalSeconds", 10));
        var connectionString = _config.GetConnectionString("SqlServer")!;
        var tenantId = _config["Certyo:TenantId"]!;
        var apiKey = _config["Certyo:ApiKey"]!;

        // Load the persisted LSN from the tracking table. It is null on the first run,
        // in which case polling starts from the capture instance's minimum LSN.
        _lastProcessedLsn = await GetLastProcessedLsnAsync(connectionString);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await PollAndIngestAsync(connectionString, tenantId, apiKey, stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "CDC polling cycle failed");
            }
            await Task.Delay(pollInterval, stoppingToken);
        }
    }

    private async Task PollAndIngestAsync(
        string connectionString, string tenantId, string apiKey,
        CancellationToken ct)
    {
        await using var connection = new SqlConnection(connectionString);
        await connection.OpenAsync(ct);

        // Get the current maximum LSN
        var maxLsn = await GetMaxLsnAsync(connection, ct);
        if (maxLsn == null || (_lastProcessedLsn != null &&
            CompareBytes(maxLsn, _lastProcessedLsn) <= 0))
        {
            return; // No new changes
        }

        // Determine the starting LSN
        var fromLsn = _lastProcessedLsn != null
            ? await IncrementLsnAsync(connection, _lastProcessedLsn, ct)
            : await GetMinLsnAsync(connection, "dbo_Products", ct);
        if (fromLsn == null || CompareBytes(fromLsn, maxLsn) > 0)
            return;

        // Read net changes (collapsed by primary key)
        var changes = new List<CdcChange>();
        await using var cmd = connection.CreateCommand();
        cmd.CommandText = @"
            SELECT *
            FROM cdc.fn_cdc_get_net_changes_dbo_Products(@from_lsn, @to_lsn, 'all with merge')
            ORDER BY __$start_lsn";
        cmd.Parameters.Add(new SqlParameter("@from_lsn", SqlDbType.Binary, 10)
            { Value = fromLsn });
        cmd.Parameters.Add(new SqlParameter("@to_lsn", SqlDbType.Binary, 10)
            { Value = maxLsn });

        await using var reader = await cmd.ExecuteReaderAsync(ct);
        while (await reader.ReadAsync(ct))
        {
            var operation = reader.GetInt32(reader.GetOrdinal("__$operation"));
            // With 'all with merge' only codes 1 and 5 are returned;
            // 2 and 4 appear when the 'all' filter is used instead.
            var operationType = operation switch
            {
                1 => "delete",
                2 => "insert",
                4 => "update",
                5 => "upsert", // merge: insert or update
                _ => "upsert"
            };
            changes.Add(new CdcChange
            {
                ProductId = reader.GetInt64(reader.GetOrdinal("ProductId")),
                Sku = reader.GetString(reader.GetOrdinal("Sku")),
                Name = reader.IsDBNull(reader.GetOrdinal("Name"))
                    ? null : reader.GetString(reader.GetOrdinal("Name")),
                Category = reader.IsDBNull(reader.GetOrdinal("Category"))
                    ? null : reader.GetString(reader.GetOrdinal("Category")),
                Price = reader.IsDBNull(reader.GetOrdinal("Price"))
                    ? null : reader.GetDecimal(reader.GetOrdinal("Price")),
                ManufacturedAt = reader.IsDBNull(reader.GetOrdinal("ManufacturedAt"))
                    ? null : reader.GetDateTime(reader.GetOrdinal("ManufacturedAt")),
                BatchNumber = reader.IsDBNull(reader.GetOrdinal("BatchNumber"))
                    ? null : reader.GetString(reader.GetOrdinal("BatchNumber")),
                OperationType = operationType,
            });
        }

        if (changes.Count == 0)
        {
            _lastProcessedLsn = maxLsn;
            return;
        }

        _logger.LogInformation("Found {Count} CDC changes to ingest", changes.Count);

        // Ingest into Certyo: bulk if > 10 records
        var httpClient = _httpClientFactory.CreateClient("Certyo");
        httpClient.DefaultRequestHeaders.Add("X-API-Key", apiKey);
        if (changes.Count > 10)
        {
            await IngestBulkAsync(httpClient, tenantId, changes, ct);
        }
        else
        {
            foreach (var change in changes)
            {
                await IngestSingleAsync(httpClient, tenantId, change, ct);
            }
        }

        // Persist the high-water mark
        _lastProcessedLsn = maxLsn;
        await SaveLastProcessedLsnAsync(connectionString, maxLsn);
    }

    private async Task IngestSingleAsync(
        HttpClient httpClient, string tenantId, CdcChange change,
        CancellationToken ct)
    {
        var record = MapToRecord(tenantId, change);
        var response = await httpClient.PostAsJsonAsync(
            "https://www.certyos.com/api/v1/records", record, ct);
        if (!response.IsSuccessStatusCode)
        {
            var body = await response.Content.ReadAsStringAsync(ct);
            _logger.LogWarning(
                "Certyo ingestion failed for ProductId {Id}: {Status} {Body}",
                change.ProductId, response.StatusCode, body);
            // Throw so the poll cycle stops before the LSN high-water mark advances;
            // the same changes are read again on the next cycle.
            response.EnsureSuccessStatusCode();
        }
    }

    private async Task IngestBulkAsync(
        HttpClient httpClient, string tenantId, List<CdcChange> changes,
        CancellationToken ct)
    {
        // Bulk endpoint accepts up to 1000 records per request
        foreach (var batch in changes.Chunk(1000))
        {
            var bulkPayload = new
            {
                tenantId,
                records = batch.Select(c => MapToRecord(tenantId, c)).ToArray()
            };
            var response = await httpClient.PostAsJsonAsync(
                "https://www.certyos.com/api/v1/records/bulk", bulkPayload, ct);
            if (!response.IsSuccessStatusCode)
            {
                var body = await response.Content.ReadAsStringAsync(ct);
                _logger.LogWarning("Bulk ingestion failed: {Status} {Body}",
                    response.StatusCode, body);
                // Throw so the LSN high-water mark is not advanced past unsent changes.
                response.EnsureSuccessStatusCode();
            }
        }
    }

    private static object MapToRecord(string tenantId, CdcChange change)
    {
        return new
        {
            tenantId,
            database = "ProductCatalog",
            collection = "Products",
            recordId = change.Sku,
            recordVersion = change.ProductId.ToString(),
            operationType = change.OperationType,
            recordPayload = new
            {
                productId = change.ProductId,
                sku = change.Sku,
                name = change.Name,
                category = change.Category,
                price = change.Price,
                manufacturedAt = change.ManufacturedAt?.ToString("O"),
                batchNumber = change.BatchNumber,
            },
            sourceTimestamp = DateTime.UtcNow.ToString("O"),
            idempotencyKey = $"cdc-{change.Sku}-{change.ProductId}-"
                + $"{DateTime.UtcNow:yyyy-MM-dd}",
        };
    }

    // ── LSN helper methods ──
    private static async Task<byte[]?> GetMaxLsnAsync(
        SqlConnection conn, CancellationToken ct)
    {
        await using var cmd = conn.CreateCommand();
        cmd.CommandText = "SELECT sys.fn_cdc_get_max_lsn()";
        var result = await cmd.ExecuteScalarAsync(ct);
        return result as byte[];
    }

    private static async Task<byte[]?> GetMinLsnAsync(
        SqlConnection conn, string captureInstance, CancellationToken ct)
    {
        await using var cmd = conn.CreateCommand();
        // Pass the capture instance as a parameter instead of interpolating it into the SQL text
        cmd.CommandText = "SELECT sys.fn_cdc_get_min_lsn(@capture_instance)";
        cmd.Parameters.Add(new SqlParameter("@capture_instance", SqlDbType.NVarChar, 128)
            { Value = captureInstance });
        return await cmd.ExecuteScalarAsync(ct) as byte[];
    }

    private static async Task<byte[]?> IncrementLsnAsync(
        SqlConnection conn, byte[] lsn, CancellationToken ct)
    {
        await using var cmd = conn.CreateCommand();
        cmd.CommandText = "SELECT sys.fn_cdc_increment_lsn(@lsn)";
        cmd.Parameters.Add(new SqlParameter("@lsn", SqlDbType.Binary, 10)
            { Value = lsn });
        return await cmd.ExecuteScalarAsync(ct) as byte[];
    }

    private async Task<byte[]?> GetLastProcessedLsnAsync(string connectionString)
    {
        await using var conn = new SqlConnection(connectionString);
        await conn.OpenAsync();
        await using var cmd = conn.CreateCommand();
        // Create the tracking table on first use, then read the saved high-water mark
        cmd.CommandText = @"
            IF OBJECT_ID('dbo.CdcTracker', 'U') IS NULL
            BEGIN
                CREATE TABLE dbo.CdcTracker (
                    Id INT PRIMARY KEY DEFAULT 1,
                    LastLsn VARBINARY(10) NOT NULL,
                    UpdatedAt DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
                );
            END
            SELECT LastLsn FROM dbo.CdcTracker WHERE Id = 1";
        return await cmd.ExecuteScalarAsync() as byte[];
    }

    private static async Task SaveLastProcessedLsnAsync(
        string connectionString, byte[] lsn)
    {
        await using var conn = new SqlConnection(connectionString);
        await conn.OpenAsync();
        await using var cmd = conn.CreateCommand();
        cmd.CommandText = @"
            MERGE dbo.CdcTracker AS target
            USING (SELECT 1 AS Id) AS source ON target.Id = source.Id
            WHEN MATCHED THEN
                UPDATE SET LastLsn = @lsn, UpdatedAt = SYSUTCDATETIME()
            WHEN NOT MATCHED THEN
                INSERT (Id, LastLsn, UpdatedAt)
                VALUES (1, @lsn, SYSUTCDATETIME());";
        cmd.Parameters.Add(new SqlParameter("@lsn", SqlDbType.Binary, 10)
            { Value = lsn });
        await cmd.ExecuteNonQueryAsync();
    }

    // Byte-wise comparison; LSNs are fixed-length big-endian binary values
    private static int CompareBytes(byte[] a, byte[] b)
    {
        for (int i = 0; i < Math.Min(a.Length, b.Length); i++)
        {
            if (a[i] != b[i]) return a[i].CompareTo(b[i]);
        }
        return a.Length.CompareTo(b.Length);
    }
}

public record CdcChange
{
    public long ProductId { get; init; }
    public string Sku { get; init; } = "";
    public string? Name { get; init; }
    public string? Category { get; init; }
    public decimal? Price { get; init; }
    public DateTime? ManufacturedAt { get; init; }
    public string? BatchNumber { get; init; }
    public string OperationType { get; init; } = "upsert";
}

using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Extensions.Sql;
using Microsoft.Extensions.Logging;
using System.Net.Http.Json;

namespace Certyo.CdcFunction;

/// <summary>
/// Azure Function triggered by SQL Server changes via the SQL bindings extension.
/// Requires Microsoft.Azure.Functions.Worker.Extensions.Sql NuGet package.
/// Note: the SQL trigger binding relies on SQL change tracking rather than CDC,
/// so change tracking must be enabled on the database and on the Products table.
/// </summary>
public class CdcTriggerFunction
{
    private readonly HttpClient _httpClient;
    private readonly ILogger<CdcTriggerFunction> _logger;

    public CdcTriggerFunction(
        IHttpClientFactory httpClientFactory,
        ILogger<CdcTriggerFunction> logger)
    {
        _httpClient = httpClientFactory.CreateClient("Certyo");
        _httpClient.DefaultRequestHeaders.Add("X-API-Key",
            Environment.GetEnvironmentVariable("CERTYO_API_KEY")!);
        _logger = logger;
    }

    [Function("ProductChangeTrigger")]
    public async Task Run(
        [SqlTrigger("[dbo].[Products]", "SqlConnectionString")]
        IReadOnlyList<SqlChange<Product>> changes)
    {
        var tenantId = Environment.GetEnvironmentVariable("CERTYO_TENANT_ID")!;
        foreach (var change in changes)
        {
            var operationType = change.Operation switch
            {
                SqlChangeOperation.Insert => "insert",
                SqlChangeOperation.Update => "update",
                SqlChangeOperation.Delete => "delete",
                _ => "upsert"
            };
            var record = new
            {
                tenantId,
                database = "ProductCatalog",
                collection = "Products",
                recordId = change.Item.Sku,
                recordVersion = change.Item.ProductId.ToString(),
                operationType,
                recordPayload = new
                {
                    productId = change.Item.ProductId,
                    sku = change.Item.Sku,
                    name = change.Item.Name,
                    category = change.Item.Category,
                    price = change.Item.Price,
                },
                sourceTimestamp = DateTime.UtcNow.ToString("O"),
                idempotencyKey = $"sqltrigger-{change.Item.Sku}-"
                    + $"{DateTime.UtcNow:yyyy-MM-dd-HH}",
            };
            var response = await _httpClient.PostAsJsonAsync(
                "https://www.certyos.com/api/v1/records", record);
            if (response.IsSuccessStatusCode)
            {
                _logger.LogInformation("Ingested {Sku}", change.Item.Sku);
            }
            else
            {
                _logger.LogWarning("Failed to ingest {Sku}: {Status}",
                    change.Item.Sku, response.StatusCode);
            }
        }
    }
}

public record Product
{
    public long ProductId { get; init; }
    public string Sku { get; init; } = "";
    public string? Name { get; init; }
    public string? Category { get; init; }
    public decimal? Price { get; init; }
}

"""
CDC polling worker for SQL Server using pyodbc + requests.

Install: pip install pyodbc requests
Usage:
    CERTYO_API_KEY=xxx CERTYO_TENANT_ID=yyy python cdc_worker.py
"""
import os
import time
import json
import pyodbc
import requests
from datetime import datetime, timezone

CERTYO_API_KEY = os.environ["CERTYO_API_KEY"]
CERTYO_TENANT_ID = os.environ["CERTYO_TENANT_ID"]
CERTYO_BASE_URL = "https://www.certyos.com"
SQL_CONNECTION_STRING = os.environ.get(
    "SQL_CONNECTION_STRING",
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=localhost,1433;"
    "DATABASE=ProductCatalog;"
    "Trusted_Connection=yes;"
    "TrustServerCertificate=yes;"
)
POLL_INTERVAL_SECONDS = int(os.environ.get("CDC_POLL_INTERVAL", "10"))
BULK_THRESHOLD = 10


def get_connection():
    return pyodbc.connect(SQL_CONNECTION_STRING)


def get_max_lsn(cursor):
    cursor.execute("SELECT sys.fn_cdc_get_max_lsn()")
    row = cursor.fetchone()
    return row[0] if row and row[0] else None


def get_min_lsn(cursor, capture_instance="dbo_Products"):
    # Pass the capture instance as a parameter instead of formatting it into the SQL text
    cursor.execute("SELECT sys.fn_cdc_get_min_lsn(?)", capture_instance)
    row = cursor.fetchone()
    return row[0] if row and row[0] else None


def increment_lsn(cursor, lsn):
    cursor.execute("SELECT sys.fn_cdc_increment_lsn(?)", lsn)
    row = cursor.fetchone()
    return row[0] if row and row[0] else None


def load_last_lsn(cursor):
    # Create the tracking table on first use, then read the saved high-water mark
    cursor.execute("""
        IF OBJECT_ID('dbo.CdcTrackerPy', 'U') IS NULL
        BEGIN
            CREATE TABLE dbo.CdcTrackerPy (
                Id INT PRIMARY KEY DEFAULT 1,
                LastLsn VARBINARY(10) NOT NULL,
                UpdatedAt DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
            );
        END
        SELECT LastLsn FROM dbo.CdcTrackerPy WHERE Id = 1
    """)
    row = cursor.fetchone()
    return row[0] if row else None


def save_last_lsn(cursor, lsn):
    cursor.execute("""
        MERGE dbo.CdcTrackerPy AS target
        USING (SELECT 1 AS Id) AS source ON target.Id = source.Id
        WHEN MATCHED THEN
            UPDATE SET LastLsn = ?, UpdatedAt = SYSUTCDATETIME()
        WHEN NOT MATCHED THEN
            INSERT (Id, LastLsn, UpdatedAt)
            VALUES (1, ?, SYSUTCDATETIME());
    """, lsn, lsn)
    cursor.commit()


def map_to_record(row):
    op_map = {1: "delete", 2: "insert", 4: "update", 5: "upsert"}
    return {
        "tenantId": CERTYO_TENANT_ID,
        "database": "ProductCatalog",
        "collection": "Products",
        "recordId": row["Sku"],
        "recordVersion": str(row["ProductId"]),
        "operationType": op_map.get(row["__$operation"], "upsert"),
        "recordPayload": {
            "productId": row["ProductId"],
            "sku": row["Sku"],
            "name": row.get("Name"),
            "category": row.get("Category"),
            # Compare against None so that a price of 0 is kept rather than dropped
            "price": float(row["Price"]) if row.get("Price") is not None else None,
            "batchNumber": row.get("BatchNumber"),
        },
        "sourceTimestamp": datetime.now(timezone.utc).isoformat(),
        "idempotencyKey": (
            f"cdc-py-{row['Sku']}-{row['ProductId']}-"
            f"{datetime.now(timezone.utc).strftime('%Y-%m-%d')}"
        ),
    }


def ingest_records(records):
    headers = {
        "X-API-Key": CERTYO_API_KEY,
        "Content-Type": "application/json",
    }
    if len(records) > BULK_THRESHOLD:
        payload = {"tenantId": CERTYO_TENANT_ID, "records": records}
        resp = requests.post(
            f"{CERTYO_BASE_URL}/api/v1/records/bulk",
            headers=headers, json=payload, timeout=30,
        )
        if resp.ok:
            print(f"Bulk ingested {len(records)} records")
        else:
            print(f"Bulk ingestion failed: {resp.status_code} {resp.text}")
            # Raise so the caller does not advance the LSN past unsent changes
            resp.raise_for_status()
    else:
        for record in records:
            resp = requests.post(
                f"{CERTYO_BASE_URL}/api/v1/records",
                headers=headers, json=record, timeout=30,
            )
            if resp.ok:
                result = resp.json()
                print(f"  Ingested {record['recordId']}: {result['recordHash']}")
            else:
                print(f"  Failed {record['recordId']}: {resp.status_code}")
                resp.raise_for_status()


def poll_cycle():
    conn = get_connection()
    try:
        cursor = conn.cursor()
        max_lsn = get_max_lsn(cursor)
        if max_lsn is None:
            return
        last_lsn = load_last_lsn(cursor)
        if last_lsn and max_lsn <= last_lsn:
            return
        from_lsn = increment_lsn(cursor, last_lsn) if last_lsn else get_min_lsn(cursor)
        if from_lsn is None or from_lsn > max_lsn:
            return
        cursor.execute("""
            SELECT *
            FROM cdc.fn_cdc_get_net_changes_dbo_Products(?, ?, 'all with merge')
            ORDER BY __$start_lsn
        """, from_lsn, max_lsn)
        columns = [desc[0] for desc in cursor.description]
        changes = [dict(zip(columns, row)) for row in cursor.fetchall()]
        if changes:
            print(f"Found {len(changes)} CDC changes")
            records = [map_to_record(row) for row in changes]
            ingest_records(records)
        # Persist the high-water mark, also when the range held no changes
        save_last_lsn(cursor, max_lsn)
    finally:
        # Always release the connection, including on the early returns above
        conn.close()


if __name__ == "__main__":
    print(f"CDC worker started (poll every {POLL_INTERVAL_SECONDS}s)")
    while True:
        try:
            poll_cycle()
        except Exception as e:
            print(f"Poll cycle error: {e}")
        time.sleep(POLL_INTERVAL_SECONDS)
Worker Service Registration
Register the worker in your Program.cs for the .NET 8 project:
using Certyo.CdcWorker;

var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddHttpClient("Certyo", client =>
{
    client.Timeout = TimeSpan.FromSeconds(30);
});
builder.Services.AddHostedService<CdcPollingWorker>();

var host = builder.Build();
host.Run();

Matching appsettings.json:
{
  "ConnectionStrings": {
    "SqlServer": "Server=localhost,1433;Database=ProductCatalog;Trusted_Connection=True;TrustServerCertificate=True;"
  },
  "Certyo": {
    "TenantId": "your-tenant-id",
    "ApiKey": "your-api-key"
  },
  "CdcPolling": {
    "IntervalSeconds": 10
  }
}
CDC Polling Strategy
The worker uses a high-water mark pattern with Log Sequence Numbers (LSNs):
- On each poll cycle, call sys.fn_cdc_get_max_lsn() to get the latest available LSN
- Compare it to the last processed LSN (stored in the CdcTracker table). If they are equal, there are no new changes
- Use sys.fn_cdc_increment_lsn() to get the starting point for the next read; this avoids reprocessing the last batch
- Read net changes with fn_cdc_get_net_changes (collapses multiple operations per primary key into one row) using the 'all with merge' filter
- After successful ingestion, persist the new high-water mark LSN
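The window selection in the steps above can be sketched as a pure function, which makes the high-water mark logic easy to test without a database. This is a minimal sketch under stated assumptions: `next_lsn` and `next_window` are hypothetical names, and `next_lsn` only imitates sys.fn_cdc_increment_lsn() by adding one to the 10-byte big-endian value. A real worker would call the SQL function instead.

```python
from typing import Optional, Tuple

LSN_BYTES = 10  # CDC LSNs are binary(10) values


def next_lsn(lsn: bytes) -> bytes:
    """Local stand-in (assumption) for sys.fn_cdc_increment_lsn()."""
    value = int.from_bytes(lsn, "big") + 1
    return value.to_bytes(LSN_BYTES, "big")


def next_window(last_lsn: Optional[bytes], min_lsn: bytes,
                max_lsn: Optional[bytes]) -> Optional[Tuple[bytes, bytes]]:
    """Return the (from_lsn, to_lsn) range to read, or None if nothing is new."""
    if max_lsn is None:
        return None  # no LSN is available yet
    if last_lsn is not None and max_lsn <= last_lsn:
        return None  # high-water mark already matches the latest LSN
    # First run: start at the capture instance's minimum LSN.
    # Later runs: start just after the last processed LSN.
    from_lsn = next_lsn(last_lsn) if last_lsn is not None else min_lsn
    if from_lsn > max_lsn:
        return None
    return from_lsn, max_lsn
```

Equal-length `bytes` values compare in the same order as the LSNs they encode, which is why plain `<=` and `>` are enough here.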
Handling Gaps and Restarts
If the worker crashes, it resumes from the last persisted LSN. CDC retains change data for a configurable period (default: 3 days). If the worker is down longer than the retention window, the minimum LSN will have advanced past your saved position. Detect this by comparing your saved LSN against sys.fn_cdc_get_min_lsn() — if the min LSN is greater, you need a full table resync.
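The comparison described above could look like the following sketch. `needs_full_resync` is a hypothetical helper, not part of the workers in this guide; it assumes both LSNs have already been read as 10-byte values (the saved one from the tracking table, the minimum one from sys.fn_cdc_get_min_lsn()).

```python
from typing import Optional


def needs_full_resync(saved_lsn: Optional[bytes],
                      min_lsn: Optional[bytes]) -> bool:
    """True when cleanup has moved the minimum LSN past the saved position."""
    if saved_lsn is None or min_lsn is None:
        # Nothing persisted yet, or the minimum LSN could not be read:
        # there is no saved position to compare against.
        return False
    # Equal-length byte strings compare in LSN order.
    return min_lsn > saved_lsn
```

A worker could run this check once at startup and, when it returns True, snapshot the table before resuming CDC polling from the current minimum LSN.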
Bulk Optimization
For tables with high write throughput, sending individual records is inefficient. The worker above automatically switches to the bulk endpoint when a poll cycle returns more than 10 changes:
- POST /api/v1/records/bulk accepts up to 1000 records per request
- The .Chunk(1000) call in the C# worker splits larger batches into 1000-record pages
- Bulk responses include per-record status; check for partial failures and retry only the failed records
- For tables producing thousands of changes per poll cycle, consider reducing the poll interval to flush smaller batches more frequently
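The threshold and paging rules above can be sketched in Python as a planning step that decides which endpoint each record goes to. `chunk` and `plan_requests` are hypothetical helpers for illustration; the endpoint paths and the limits of 10 and 1000 come from this guide.

```python
from typing import Dict, Iterator, List, Tuple

BULK_THRESHOLD = 10    # switch to the bulk endpoint above this many changes
BULK_PAGE_SIZE = 1000  # maximum records per bulk request

Record = Dict[str, object]


def chunk(records: List[Record], size: int) -> Iterator[List[Record]]:
    """Yield consecutive slices of at most `size` records."""
    for start in range(0, len(records), size):
        yield records[start:start + size]


def plan_requests(records: List[Record]) -> List[Tuple[str, List[Record]]]:
    """Return (endpoint_path, records) pairs for one poll cycle."""
    if len(records) <= BULK_THRESHOLD:
        # Small batch: one request per record on the single-record endpoint
        return [("/api/v1/records", [r]) for r in records]
    # Large batch: pages of at most BULK_PAGE_SIZE on the bulk endpoint
    return [("/api/v1/records/bulk", page)
            for page in chunk(records, BULK_PAGE_SIZE)]
```

The Python worker shown earlier sends a large batch in a single bulk request; paging it this way would keep each request within the 1000-record limit.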
Periodic Verification
Run a scheduled verification job (e.g., daily) that samples a percentage of anchored records and re-verifies them against the blockchain. This provides ongoing assurance that records have not been tampered with.
public async Task VerifySampleAsync(
    HttpClient httpClient, string tenantId, string apiKey,
    int sampleSize = 50, CancellationToken ct = default)
{
    // Add the API key only once, in case the same client is reused across runs
    if (!httpClient.DefaultRequestHeaders.Contains("X-API-Key"))
        httpClient.DefaultRequestHeaders.Add("X-API-Key", apiKey);

    // Query up to sampleSize anchored records (the query itself does not randomize)
    var queryUrl = $"https://www.certyos.com/api/v1/records" +
        $"?tenantId={Uri.EscapeDataString(tenantId)}&status=Anchored&limit={sampleSize}";
    var queryResponse = await httpClient.GetFromJsonAsync<RecordQueryResult>(queryUrl, ct);

    var verified = 0;
    var failed = 0;
    foreach (var record in queryResponse?.Items ?? [])
    {
        var verifyPayload = new
        {
            tenantId,
            database = record.Database,
            collection = record.Collection,
            recordId = record.RecordId,
            payload = record.RecordPayload,
        };
        var response = await httpClient.PostAsJsonAsync(
            "https://www.certyos.com/api/v1/verify/record", verifyPayload, ct);
        var result = await response.Content.ReadFromJsonAsync<VerifyResult>(
            cancellationToken: ct);
        if (result?.Verified == true && result.AnchoredOnChain)
            verified++;
        else
            failed++;
    }

    Console.WriteLine($"Verification complete: {verified} passed, {failed} failed " +
        $"out of {queryResponse?.Items?.Count ?? 0} sampled");
}
Authentication Reference
SQL Server Connection
The worker connects to SQL Server using a standard ADO.NET connection string. Both Windows Authentication (Trusted_Connection=True) and SQL Authentication (User Id=...;Password=...) are supported. For Azure SQL, use Azure AD authentication with managed identities where possible.
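For the Python alternative worker, the same two authentication modes map onto ODBC connection string keywords. The sketch below is illustrative only: `build_connection_string` is a hypothetical helper, and it assumes the ODBC Driver 18 name used in the Python example earlier in this guide.

```python
from typing import Optional


def build_connection_string(server: str, database: str,
                            user: Optional[str] = None,
                            password: Optional[str] = None) -> str:
    """Build a pyodbc connection string for Windows or SQL authentication."""
    parts = [
        "DRIVER={ODBC Driver 18 for SQL Server}",
        f"SERVER={server}",
        f"DATABASE={database}",
    ]
    if user is None:
        # Windows Authentication
        parts.append("Trusted_Connection=yes")
    else:
        # SQL Authentication. Values containing ';' or braces would need
        # ODBC escaping, which this sketch does not attempt.
        parts.append(f"UID={user}")
        parts.append(f"PWD={password or ''}")
    return ";".join(parts) + ";"
```

In production the password would come from a secrets manager or environment variable rather than source code, in line with the API key guidance in this section.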
Certyo API Key
Pass your Certyo API key in the X-API-Key header on all Certyo API calls. The .NET worker reads it from configuration — in production, bind this to Azure Key Vault or a secrets manager rather than appsettings.json.
Troubleshooting
- CDC tables are empty: Verify SQL Server Agent is running (on-premises). For Azure SQL, CDC capture runs automatically but may have a short delay.
- "Invalid object name cdc.fn_cdc_get_net_changes": Ensure CDC is enabled on both the database and the specific table, and that the table was enabled with @supports_net_changes = 1. The function name includes the capture instance (e.g., cdc.fn_cdc_get_net_changes_dbo_Products).
- LSN gap detected: Your worker was offline longer than the CDC retention period. Perform a full table snapshot to backfill, then resume CDC polling.
- Duplicate records in Certyo: The idempotencyKey includes a date component, so records re-sent after a same-day restart are deduplicated automatically. Across days, ensure your LSN tracking is persisted correctly.
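Because the date-based key is the same for every change to a given SKU on one day, a second genuine update that day would carry the same key as the first. One alternative, offered here as an assumption rather than Certyo guidance, is to derive the key from the change's LSN, which the net changes function returns in the `__$start_lsn` column. `make_idempotency_key` is a hypothetical helper.

```python
def make_idempotency_key(sku: str, start_lsn: bytes, prefix: str = "cdc") -> str:
    """Build a key that is stable across retries of the same change.

    The same change always yields the same key, while a later change to the
    same SKU has a different LSN and therefore a different key.
    """
    return f"{prefix}-{sku}-{start_lsn.hex()}"
```

Whether this fits depends on how Certyo scopes idempotency keys, so it is worth confirming against the API reference before replacing the date-based scheme.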
AI Integration Skill
Download a skill file that enables AI agents to generate working SQL Server + Certyo integration code for any language or framework.
What's inside
- Authentication: SQL Server connection strings and CDC permission setup
- Architecture: CDC change tables → .NET 8 BackgroundService → Certyo API
- Field mapping: CDC operation codes and table columns to the Certyo record schema
- Code examples: C# BackgroundService, Azure Function SQL trigger, Python with pyodbc
- Verification: Periodic sample verification with SQL UPDATE write-back
- CDC patterns: LSN high-water mark tracking, bulk optimization, gap handling
How to use
Claude Code
Place the file in your project's .claude/commands/ directory, then use it as a slash command:
# Download the skill file
mkdir -p .claude/commands
curl -o .claude/commands/certyo-sqlserver.md \
  https://www.certyos.com/developers/skills/certyo-sqlserver-skill.md

# Use it in Claude Code
/certyo-sqlserver "Generate a .NET worker that uses CDC to send product changes to Certyo"
Cursor / Copilot / Any AI Agent
Add the file to your project root or attach it to a conversation. The AI agent will use the SQL Server-specific patterns, field mappings, and code examples to generate correct integration code.
# Add to your project
curl -o CERTYO_SQLSERVER.md \
  https://www.certyos.com/developers/skills/certyo-sqlserver-skill.md

# Then in your AI agent:
"Using the Certyo SQL Server spec in CERTYO_SQLSERVER.md,
generate a .NET worker that uses CDC to send product changes to Certyo"
CLAUDE.md Context File
Append the skill file to your project's CLAUDE.md so every Claude conversation has SQL Server + Certyo context automatically.
# Append to your project's CLAUDE.md
echo "" >> CLAUDE.md
echo "## Certyo SQL Server Integration" >> CLAUDE.md
cat CERTYO_SQLSERVER.md >> CLAUDE.md