From 227570287ea0cef46c27c933ae7ddb7cfb35b237 Mon Sep 17 00:00:00 2001 From: Mattias Karlsson Date: Tue, 24 Mar 2026 08:36:54 +0100 Subject: [PATCH] feat(config): add GET schema/tracking for change tracking counts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Expose config/{area}/{id}/schema/tracking, returning per configured table the counts of updates, inserts, and deletes from CHANGETABLE(CHANGES …) since the last synced version in sync.TableVersion (NULL when never synced), plus current source and stored target versions. Add GetChangeTrackingOperationCountsSelectStatement in SqlStatementExtensions, and SchemaTrackingTableRow and ChangeOperationCounts models for the API and Dapper mapping. --- .../Functions/GetSyncJobConfig.cs | 90 +++++++++++++++++++ .../Helpers/SqlStatementExtensions.cs | 14 +++ .../Models/Schema/ChangeOperationCounts.cs | 16 ++++ .../Models/Schema/SchemaTrackingTableRow.cs | 22 +++++ 4 files changed, 142 insertions(+) create mode 100644 src/SqlBulkSyncFunction/Models/Schema/ChangeOperationCounts.cs create mode 100644 src/SqlBulkSyncFunction/Models/Schema/SchemaTrackingTableRow.cs diff --git a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs index 94938d4..c3ce734 100644 --- a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs +++ b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Dapper; @@ -188,4 +189,93 @@ string id } return new NotFoundResult(); } + + [Function(nameof(GetSyncJobConfig) + nameof(GetJobConfigSchemaTracking))] + public async Task GetJobConfigSchemaTracking( + [HttpTrigger( + AuthorizationLevel.Function, + "get", + Route ="config/{area}/{id}/schema/tracking" + )] HttpRequest req, + string area, + string id + ) + { + ArgumentNullException.ThrowIfNull(req); + + if (!string.IsNullOrWhiteSpace(area) && + !string.IsNullOrWhiteSpace(id) && + syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true && + StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area)) + { + var utcNow = DateTimeOffset.UtcNow; + var syncJob = jobConfig.ToSyncJob( + null, + tokenCache: await tokenCacheService.GetTokenCache(jobConfig), + timestamp: utcNow, + expires: utcNow.AddMinutes(4), + id: id, + schedule: nameof(jobConfig.Manual), + seed: false + ); + + await using SqlConnection + sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken }, + targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken }; + + using IDisposable + from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database), + to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database); + + await sourceConn.OpenAsync(); + await targetConn.OpenAsync(); + + targetConn.EnsureSyncSchemaAndTableExists($"config/{id}/{area}/schema/tracking", logger); + + var rows = new List(); + foreach (var table in syncJob.Tables ?? []) + { + var columns = sourceConn.GetColumns(table.Source); + var sourceVersion = sourceConn.GetSourceVersion(table.Source, globalChangeTracking: true, columns); + var targetVersion = targetConn.GetTargetVersion(table.Target); + long? fromVersion = targetVersion.CurrentVersion < 0 ? null : targetVersion.CurrentVersion; + + if (sourceVersion == null) + { + rows.Add( + new SchemaTrackingTableRow( + table.Source, + 0, + 0, + 0, + -1, + targetVersion.CurrentVersion, + table.Target + ) + ); + continue; + } + + var counts = await sourceConn.QueryFirstAsync( + SqlStatementExtensions.GetChangeTrackingOperationCountsSelectStatement(table.Source), + new { FromVersion = fromVersion } + ); + + rows.Add( + new SchemaTrackingTableRow( + table.Source, + counts.Updated, + counts.Inserted, + counts.Deleted, + sourceVersion.CurrentVersion, + targetVersion.CurrentVersion, + table.Target + ) + ); + } + + return new OkObjectResult(rows); + } + return new NotFoundResult(); + } } diff --git a/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs b/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs index ee818c7..1639490 100644 --- a/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs +++ b/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs @@ -47,6 +47,20 @@ FROM CHANGETABLE(VERSION {0}, ({1}), ({1})) as t ); } + /// + /// Builds a query that aggregates CHANGETABLE rows by SYS_CHANGE_OPERATION, + /// producing Updated, Inserted, and Deleted counts (one row per table). + /// Pass @FromVersion as NULL when no row exists in sync.TableVersion (never synced). + /// + /// Fully qualified source table name (e.g. [dbo].[MyTable]). + public static string GetChangeTrackingOperationCountsSelectStatement(string sourceTableName) + => $""" + SELECT ISNULL(SUM(CASE WHEN ct.SYS_CHANGE_OPERATION = N'U' THEN 1 ELSE 0 END), 0) AS Updated, + ISNULL(SUM(CASE WHEN ct.SYS_CHANGE_OPERATION = N'I' THEN 1 ELSE 0 END), 0) AS Inserted, + ISNULL(SUM(CASE WHEN ct.SYS_CHANGE_OPERATION = N'D' THEN 1 ELSE 0 END), 0) AS Deleted + FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct + """; + public static string GetDropStatement(this string tableName) => $""" IF OBJECT_ID('{tableName}') IS NOT NULL diff --git a/src/SqlBulkSyncFunction/Models/Schema/ChangeOperationCounts.cs b/src/SqlBulkSyncFunction/Models/Schema/ChangeOperationCounts.cs new file mode 100644 index 0000000..dda8c46 --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Schema/ChangeOperationCounts.cs @@ -0,0 +1,16 @@ +namespace SqlBulkSyncFunction.Models.Schema; + +/// +/// Maps a single row returned by SqlStatementExtensions.GetChangeTrackingOperationCountsSelectStatement for Dapper materialization. +/// +public sealed class ChangeOperationCounts +{ + /// Count of SYS_CHANGE_OPERATION = N'U' rows. + public long Updated { get; set; } + + /// Count of SYS_CHANGE_OPERATION = N'I' rows. + public long Inserted { get; set; } + + /// Count of SYS_CHANGE_OPERATION = N'D' rows. + public long Deleted { get; set; } +} diff --git a/src/SqlBulkSyncFunction/Models/Schema/SchemaTrackingTableRow.cs b/src/SqlBulkSyncFunction/Models/Schema/SchemaTrackingTableRow.cs new file mode 100644 index 0000000..50a5733 --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Schema/SchemaTrackingTableRow.cs @@ -0,0 +1,22 @@ +namespace SqlBulkSyncFunction.Models.Schema; + +/// +/// Per-table change tracking summary: counts of rows per SYS_CHANGE_OPERATION from +/// CHANGETABLE(CHANGES …, last_sync_version), plus current source version and last synced target version. +/// +/// Qualified source table name. +/// Number of changes with SYS_CHANGE_OPERATION = N'U'. +/// Number of changes with SYS_CHANGE_OPERATION = N'I'. +/// Number of changes with SYS_CHANGE_OPERATION = N'D'. +/// Current change tracking version on the source (CHANGE_TRACKING_CURRENT_VERSION() for database-wide tracking). +/// Last version stored in sync.TableVersion for the target table (or -1 if never synced). +/// Qualified target table name. +public record SchemaTrackingTableRow( + string SourceTableName, + long Updated, + long Inserted, + long Deleted, + long SourceVersion, + long TargetVersion, + string TargetTableName +);