diff --git a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs index 94938d4..c3ce734 100644 --- a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs +++ b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Dapper; @@ -188,4 +189,93 @@ string id } return new NotFoundResult(); } + + [Function(nameof(GetSyncJobConfig) + nameof(GetJobConfigSchemaTracking))] + public async Task GetJobConfigSchemaTracking( + [HttpTrigger( + AuthorizationLevel.Function, + "get", + Route ="config/{area}/{id}/schema/tracking" + )] HttpRequest req, + string area, + string id + ) + { + ArgumentNullException.ThrowIfNull(req); + + if (!string.IsNullOrWhiteSpace(area) && + !string.IsNullOrWhiteSpace(id) && + syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true && + StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area)) + { + var utcNow = DateTimeOffset.UtcNow; + var syncJob = jobConfig.ToSyncJob( + null, + tokenCache: await tokenCacheService.GetTokenCache(jobConfig), + timestamp: utcNow, + expires: utcNow.AddMinutes(4), + id: id, + schedule: nameof(jobConfig.Manual), + seed: false + ); + + await using SqlConnection + sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken }, + targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken }; + + using IDisposable + from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database), + to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database); + + await sourceConn.OpenAsync(); + await targetConn.OpenAsync(); + + targetConn.EnsureSyncSchemaAndTableExists($"config/{id}/{area}/schema/tracking", logger); + + var rows = new List(); + foreach (var table in syncJob.Tables ?? []) + { + var columns = sourceConn.GetColumns(table.Source); + var sourceVersion = sourceConn.GetSourceVersion(table.Source, globalChangeTracking: true, columns); + var targetVersion = targetConn.GetTargetVersion(table.Target); + long? fromVersion = targetVersion.CurrentVersion < 0 ? null : targetVersion.CurrentVersion; + + if (sourceVersion == null) + { + rows.Add( + new SchemaTrackingTableRow( + table.Source, + 0, + 0, + 0, + -1, + targetVersion.CurrentVersion, + table.Target + ) + ); + continue; + } + + var counts = await sourceConn.QueryFirstAsync( + SqlStatementExtensions.GetChangeTrackingOperationCountsSelectStatement(table.Source), + new { FromVersion = fromVersion } + ); + + rows.Add( + new SchemaTrackingTableRow( + table.Source, + counts.Updated, + counts.Inserted, + counts.Deleted, + sourceVersion.CurrentVersion, + targetVersion.CurrentVersion, + table.Target + ) + ); + } + + return new OkObjectResult(rows); + } + return new NotFoundResult(); + } } diff --git a/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs b/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs index ee818c7..1639490 100644 --- a/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs +++ b/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs @@ -47,6 +47,20 @@ FROM CHANGETABLE(VERSION {0}, ({1}), ({1})) as t ); } + /// + /// Builds a query that aggregates CHANGETABLE rows by SYS_CHANGE_OPERATION, + /// producing Updated, Inserted, and Deleted counts (one row per table). + /// Pass @FromVersion as NULL when no row exists in sync.TableVersion (never synced). + /// + /// Fully qualified source table name (e.g. [dbo].[MyTable]). + public static string GetChangeTrackingOperationCountsSelectStatement(string sourceTableName) + => $""" + SELECT ISNULL(SUM(CASE WHEN ct.SYS_CHANGE_OPERATION = N'U' THEN 1 ELSE 0 END), 0) AS Updated, + ISNULL(SUM(CASE WHEN ct.SYS_CHANGE_OPERATION = N'I' THEN 1 ELSE 0 END), 0) AS Inserted, + ISNULL(SUM(CASE WHEN ct.SYS_CHANGE_OPERATION = N'D' THEN 1 ELSE 0 END), 0) AS Deleted + FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct + """; + public static string GetDropStatement(this string tableName) => $""" IF OBJECT_ID('{tableName}') IS NOT NULL diff --git a/src/SqlBulkSyncFunction/Models/Schema/ChangeOperationCounts.cs b/src/SqlBulkSyncFunction/Models/Schema/ChangeOperationCounts.cs new file mode 100644 index 0000000..dda8c46 --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Schema/ChangeOperationCounts.cs @@ -0,0 +1,16 @@ +namespace SqlBulkSyncFunction.Models.Schema; + +/// +/// Maps a single row returned by SqlStatementExtensions.GetChangeTrackingOperationCountsSelectStatement for Dapper materialization. +/// +public sealed class ChangeOperationCounts +{ + /// Count of SYS_CHANGE_OPERATION = N'U' rows. + public long Updated { get; set; } + + /// Count of SYS_CHANGE_OPERATION = N'I' rows. + public long Inserted { get; set; } + + /// Count of SYS_CHANGE_OPERATION = N'D' rows. + public long Deleted { get; set; } +} diff --git a/src/SqlBulkSyncFunction/Models/Schema/SchemaTrackingTableRow.cs b/src/SqlBulkSyncFunction/Models/Schema/SchemaTrackingTableRow.cs new file mode 100644 index 0000000..50a5733 --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Schema/SchemaTrackingTableRow.cs @@ -0,0 +1,22 @@ +namespace SqlBulkSyncFunction.Models.Schema; + +/// +/// Per-table change tracking summary: counts of rows per SYS_CHANGE_OPERATION from +/// CHANGETABLE(CHANGES …, last_sync_version), plus current source version and last synced target version. +/// +/// Qualified source table name. +/// Number of changes with SYS_CHANGE_OPERATION = N'U'. +/// Number of changes with SYS_CHANGE_OPERATION = N'I'. +/// Number of changes with SYS_CHANGE_OPERATION = N'D'. +/// Current change tracking version on the source (CHANGE_TRACKING_CURRENT_VERSION() for database-wide tracking). +/// Last version stored in sync.TableVersion for the target table (or -1 if never synced). +/// Qualified target table name. +public record SchemaTrackingTableRow( + string SourceTableName, + long Updated, + long Inserted, + long Deleted, + long SourceVersion, + long TargetVersion, + string TargetTableName +);