diff --git a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.GetJobConfig.cs b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.GetJobConfig.cs new file mode 100644 index 0000000..82d3f70 --- /dev/null +++ b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.GetJobConfig.cs @@ -0,0 +1,53 @@ +using System; +using System.Linq; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Azure.Functions.Worker; +using SqlBulkSyncFunction.Helpers; +using SqlBulkSyncFunction.Models.Job; + +namespace SqlBulkSyncFunction.Functions; + +public partial class GetSyncJobConfig +{ + [Function(nameof(GetSyncJobConfig) + nameof(GetJobConfig))] + public IActionResult GetJobConfig( + [HttpTrigger( + AuthorizationLevel.Function, + "get", + Route = "config/{area}/{id}" + )] HttpRequest req, + string area, + string id + ) + { + ArgumentNullException.ThrowIfNull(req); + + if (!string.IsNullOrWhiteSpace(area) && + !string.IsNullOrWhiteSpace(id) && + syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true && + StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area)) + { + var tables = jobConfig.Tables?.ToDictionary( + key => key.Key, + value => new SyncJobConfigTableDto( + Source: value.Value, + Target: jobConfig.TargetTables?.TryGetValue(value.Key, out var target) == true && !string.IsNullOrWhiteSpace(target) ? target : value.Value, + DisableTargetIdentityInsert: jobConfig.DisableTargetIdentityInsertTables.GetValueOrDefault(value.Key), + DisableConstraintCheck: jobConfig.DisableConstraintCheckTables.GetValueOrDefault(value.Key) + ) + ) ?? []; + return new OkObjectResult( + new SyncJobConfigResponse( + Id: id, + Area: area, + BatchSize: jobConfig.BatchSize, + Manual: jobConfig.Manual, + Schedules: jobConfig.Schedules, + Tables: tables + ) + ); + } + return new NotFoundResult(); + } +} diff --git a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.GetJobConfigSchema.cs b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.GetJobConfigSchema.cs new file mode 100644 index 0000000..ddc1edc --- /dev/null +++ b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.GetJobConfigSchema.cs @@ -0,0 +1,100 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Dapper; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Logging; +using SqlBulkSyncFunction.Helpers; +using SqlBulkSyncFunction.Models.Schema; + +namespace SqlBulkSyncFunction.Functions; + +public partial class GetSyncJobConfig +{ + [Function(nameof(GetSyncJobConfig) + nameof(GetJobConfigSchema))] + public async Task GetJobConfigSchema( + [HttpTrigger( + AuthorizationLevel.Function, + "get", + Route = "config/{area}/{id}/schema" + )] HttpRequest req, + string area, + string id + ) + { + ArgumentNullException.ThrowIfNull(req); + + if (!string.IsNullOrWhiteSpace(area) && + !string.IsNullOrWhiteSpace(id) && + syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true && + StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area)) + { + var utcNow = DateTimeOffset.UtcNow; + var syncJob = jobConfig.ToSyncJob( + null, + tokenCache: await tokenCacheService.GetTokenCache(jobConfig), + timestamp: utcNow, + expires: utcNow.AddMinutes(4), + id: id, + schedule: nameof(jobConfig.Manual), + seed: false + ); + + await using SqlConnection + sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken }, + targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken }; + + using IDisposable + from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database), + to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database); + + await sourceConn.OpenAsync(); + await targetConn.OpenAsync(); + + DbInfo + sourceDbInfo = await sourceConn.QueryFirstAsync(SchemaExtensions.DbInfoQuery), + targetDbInfo = await targetConn.QueryFirstAsync(SchemaExtensions.DbInfoQuery); + + SourceTableChangeTrackingInfo[] + sourceTableChangeTrackingInfos = [.. await sourceConn.QueryAsync(SchemaExtensions.SourceTableChangeTrackingInfoQuery)]; + + targetConn.EnsureSyncSchemaAndTableExists($"config/{id}/{area}/schema", logger); + + var tableSchemas = ( + syncJob.Tables ?? [] + ) + .Select( + table => TableSchema.LoadSchema( + sourceConn, + targetConn, + table, + syncJob.BatchSize, + globalChangeTracking: true + ) + ) + .Select(table => new + { + table.SourceTableName, + table.TargetTableName, + table.SourceVersion, + table.TargetVersion, + table.Columns + }) + .ToArray(); + + return new OkObjectResult( + new + { + sourceDbInfo, + targetDbInfo, + tableSchemas, + sourceTableChangeTrackingInfos + } + ); + } + return new NotFoundResult(); + } +} diff --git a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.GetJobConfigSchemaTracking.cs b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.GetJobConfigSchemaTracking.cs new file mode 100644 index 0000000..df41842 --- /dev/null +++ b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.GetJobConfigSchemaTracking.cs @@ -0,0 +1,107 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Dapper; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Logging; +using SqlBulkSyncFunction.Helpers; +using SqlBulkSyncFunction.Models.Schema; + +namespace SqlBulkSyncFunction.Functions; + +public partial class GetSyncJobConfig +{ + [Function(nameof(GetSyncJobConfig) + nameof(GetJobConfigSchemaTracking))] + public async Task GetJobConfigSchemaTracking( + [HttpTrigger( + AuthorizationLevel.Function, + "get", + Route = "config/{area}/{id}/schema/tracking" + )] HttpRequest req, + string area, + string id + ) + { + ArgumentNullException.ThrowIfNull(req); + + if (!string.IsNullOrWhiteSpace(area) && + !string.IsNullOrWhiteSpace(id) && + syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true && + StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area)) + { + var utcNow = DateTimeOffset.UtcNow; + var syncJob = jobConfig.ToSyncJob( + null, + tokenCache: await tokenCacheService.GetTokenCache(jobConfig), + timestamp: utcNow, + expires: utcNow.AddMinutes(4), + id: id, + schedule: nameof(jobConfig.Manual), + seed: false + ); + + await using SqlConnection + sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken }, + targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken }; + + using IDisposable + from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database), + to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database); + + await sourceConn.OpenAsync(); + await targetConn.OpenAsync(); + + targetConn.EnsureSyncSchemaAndTableExists($"config/{id}/{area}/schema/tracking", logger); + + var rows = new List(); + foreach (var table in syncJob.Tables ?? []) + { + var columns = sourceConn.GetColumns(table.Source); + var sourceVersion = sourceConn.GetSourceVersion(table.Source, globalChangeTracking: true, columns); + var targetVersion = targetConn.GetTargetVersion(table.Target); + var fromVersion = targetVersion.CurrentVersion < 0 ? 0L : targetVersion.CurrentVersion; + + if (sourceVersion == null) + { + rows.Add( + new SchemaTrackingTableRow( + table.Id, + table.Source, + 0, + 0, + 0, + -1, + targetVersion.CurrentVersion, + table.Target + ) + ); + continue; + } + + var counts = await sourceConn.QueryFirstAsync( + SqlStatementExtensions.GetChangeTrackingOperationCountsSelectStatement(table.Source), + new { FromVersion = fromVersion } + ); + + rows.Add( + new SchemaTrackingTableRow( + table.Id, + table.Source, + counts.Updated, + counts.Inserted, + counts.Deleted, + sourceVersion.CurrentVersion, + targetVersion.CurrentVersion, + table.Target + ) + ); + } + + return new OkObjectResult(rows); + } + return new NotFoundResult(); + } +} diff --git a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.GetJobConfigSchemaTrackingTable.cs b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.GetJobConfigSchemaTrackingTable.cs new file mode 100644 index 0000000..8319a62 --- /dev/null +++ b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.GetJobConfigSchemaTrackingTable.cs @@ -0,0 +1,134 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Dapper; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Logging; +using SqlBulkSyncFunction.Helpers; +using SqlBulkSyncFunction.Models.Schema; + +namespace SqlBulkSyncFunction.Functions; + +public partial class GetSyncJobConfig +{ + /// + /// Returns change tracking primary key details for a specific configured table. + /// + [Function(nameof(GetSyncJobConfig) + nameof(GetJobConfigSchemaTrackingTable))] + public async Task GetJobConfigSchemaTrackingTable( + [HttpTrigger( + AuthorizationLevel.Function, + "get", + Route = "config/{area}/{id}/schema/tracking/{tableId}" + )] HttpRequest req, + string area, + string id, + string tableId + ) + { + ArgumentNullException.ThrowIfNull(req); + + if (!string.IsNullOrWhiteSpace(area) && + !string.IsNullOrWhiteSpace(id) && + !string.IsNullOrWhiteSpace(tableId) && + syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true && + StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area)) + { + var utcNow = DateTimeOffset.UtcNow; + var syncJob = jobConfig.ToSyncJob( + null, + tokenCache: await tokenCacheService.GetTokenCache(jobConfig), + timestamp: utcNow, + expires: utcNow.AddMinutes(4), + id: id, + schedule: nameof(jobConfig.Manual), + seed: false + ); + + var table = (syncJob.Tables ?? []) + .FirstOrDefault(configuredTable => StringComparer.OrdinalIgnoreCase.Equals(configuredTable.Id, tableId)); + + if (table == null) + { + return new NotFoundResult(); + } + + await using SqlConnection + sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken }, + targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken }; + + using IDisposable + from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database), + to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database); + + await sourceConn.OpenAsync(); + await targetConn.OpenAsync(); + + targetConn.EnsureSyncSchemaAndTableExists($"config/{id}/{area}/schema/tracking/{tableId}", logger); + + var columns = sourceConn.GetColumns(table.Source); + var sourceVersion = sourceConn.GetSourceVersion(table.Source, globalChangeTracking: true, columns); + var targetVersion = targetConn.GetTargetVersion(table.Target); + var fromVersion = targetVersion.CurrentVersion < 0 ? 0L : targetVersion.CurrentVersion; + + if (sourceVersion == null) + { + return new OkObjectResult( + new SchemaTrackingPrimaryKeyDetailsResponse([], [], []) + ); + } + + var changedRows = await sourceConn.QueryAsync( + SqlStatementExtensions.GetChangeTrackingPrimaryKeyDetailsSelectStatement(table.Source, columns), + new { FromVersion = fromVersion } + ); + + var updated = new List>(); + var inserted = new List>(); + var deleted = new List>(); + + foreach (var row in changedRows) + { + if (row is not IDictionary values || + !values.TryGetValue("Operation", out var operationValue) || + operationValue is not string operation || + string.IsNullOrWhiteSpace(operation)) + { + continue; + } + + var primaryKeyValues = new Dictionary(StringComparer.OrdinalIgnoreCase); + foreach (var value in values) + { + if (!string.Equals(value.Key, "Operation", StringComparison.OrdinalIgnoreCase)) + { + primaryKeyValues[value.Key] = value.Value; + } + } + + switch (operation) + { + case "U": + updated.Add(primaryKeyValues); + break; + case "I": + inserted.Add(primaryKeyValues); + break; + case "D": + deleted.Add(primaryKeyValues); + break; + } + } + + return new OkObjectResult( + new SchemaTrackingPrimaryKeyDetailsResponse(updated, inserted, deleted) + ); + } + + return new NotFoundResult(); + } +} diff --git a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.ListAreas.cs b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.ListAreas.cs new file mode 100644 index 0000000..82c6261 --- /dev/null +++ b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.ListAreas.cs @@ -0,0 +1,30 @@ +using System; +using System.Linq; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Azure.Functions.Worker; + +namespace SqlBulkSyncFunction.Functions; + +public partial class GetSyncJobConfig +{ + [Function(nameof(GetSyncJobConfig) + nameof(ListAreas))] + public IActionResult ListAreas( + [HttpTrigger( + AuthorizationLevel.Function, + "get", + Route = "config" + )] HttpRequest req + ) + { + ArgumentNullException.ThrowIfNull(req); + + return syncJobsConfig?.Value?.Jobs?.Values + ?.Where(job => job.Area is { Length: > 0 }) + .Select(job => job.Area) + .Distinct() + .ToArray() is { Length: > 0 } areas + ? new OkObjectResult(areas) + : new NoContentResult(); + } +} diff --git a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.ListIds.cs b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.ListIds.cs new file mode 100644 index 0000000..5045ade --- /dev/null +++ b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.ListIds.cs @@ -0,0 +1,32 @@ +using System; +using System.Linq; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Azure.Functions.Worker; + +namespace SqlBulkSyncFunction.Functions; + +public partial class GetSyncJobConfig +{ + [Function(nameof(GetSyncJobConfig) + nameof(ListIds))] + public IActionResult ListIds( + [HttpTrigger( + AuthorizationLevel.Function, + "get", + Route = "config/{area}" + )] HttpRequest req, + string area + ) + { + ArgumentNullException.ThrowIfNull(req); + + return + !string.IsNullOrWhiteSpace(area) && + syncJobsConfig?.Value?.Jobs + ?.Where(job => job.Value.Area == area) + .Select(job => job.Key) + .ToArray() is { Length: > 0 } ids + ? new OkObjectResult(ids) + : new NoContentResult(); + } +} diff --git a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs index c3ce734..56a89f4 100644 --- a/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs +++ b/src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs @@ -1,17 +1,6 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Dapper; -using Microsoft.AspNetCore.Http; -using Microsoft.AspNetCore.Mvc; -using Microsoft.Azure.Functions.Worker; -using Microsoft.Data.SqlClient; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using SqlBulkSyncFunction.Helpers; using SqlBulkSyncFunction.Models.Job; -using SqlBulkSyncFunction.Models.Schema; using SqlBulkSyncFunction.Services; namespace SqlBulkSyncFunction.Functions; @@ -22,260 +11,4 @@ public partial class GetSyncJobConfig( ITokenCacheService tokenCacheService ) { - - [Function(nameof(GetSyncJobConfig) + nameof(ListAreas))] - public IActionResult ListAreas( - [HttpTrigger( - AuthorizationLevel.Function, - "get", - Route ="config" - )] HttpRequest req - ) - { - ArgumentNullException.ThrowIfNull(req); - - return syncJobsConfig?.Value?.Jobs?.Values - ?.Where(job => job.Area is { Length: > 0 }) - .Select(job => job.Area) - .Distinct() - .ToArray() is { Length: >0 } areas - ? new OkObjectResult(areas) - : new NoContentResult(); - } - - [Function(nameof(GetSyncJobConfig) + nameof(ListIds))] - public IActionResult ListIds( - [HttpTrigger( - AuthorizationLevel.Function, - "get", - Route ="config/{area}" - )] HttpRequest req, - string area - ) - { - ArgumentNullException.ThrowIfNull(req); - - return - !string.IsNullOrWhiteSpace(area) && - syncJobsConfig?.Value?.Jobs - ?.Where(job => job.Value.Area == area) - .Select(job => job.Key) - .ToArray() is { Length: >0 } ids - ? new OkObjectResult(ids) - : new NoContentResult(); - } - - [Function(nameof(GetSyncJobConfig) + nameof(GetJobConfig))] - public IActionResult GetJobConfig( - [HttpTrigger( - AuthorizationLevel.Function, - "get", - Route ="config/{area}/{id}" - )] HttpRequest req, - string area, - string id - ) - { - ArgumentNullException.ThrowIfNull(req); - - if (!string.IsNullOrWhiteSpace(area) && - !string.IsNullOrWhiteSpace(id) && - syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true && - StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area)) - { - var tables = jobConfig.Tables?.ToDictionary( - key => key.Key, - value => new SyncJobConfigTableDto( - Source: value.Value, - Target: jobConfig.TargetTables?.TryGetValue(value.Key, out var target) == true && !string.IsNullOrWhiteSpace(target) ? target : value.Value, - DisableTargetIdentityInsert: jobConfig.DisableTargetIdentityInsertTables.GetValueOrDefault(value.Key), - DisableConstraintCheck: jobConfig.DisableConstraintCheckTables.GetValueOrDefault(value.Key) - ) - ) ?? []; - return new OkObjectResult( - new SyncJobConfigResponse( - Id: id, - Area: area, - BatchSize: jobConfig.BatchSize, - Manual: jobConfig.Manual, - Schedules: jobConfig.Schedules, - Tables: tables - ) - ); - } - return new NotFoundResult(); - } - - [Function(nameof(GetSyncJobConfig) + nameof(GetJobConfigSchema))] - public async Task GetJobConfigSchema( - [HttpTrigger( - AuthorizationLevel.Function, - "get", - Route ="config/{area}/{id}/schema" - )] HttpRequest req, - string area, - string id - ) - { - ArgumentNullException.ThrowIfNull(req); - - if (!string.IsNullOrWhiteSpace(area) && - !string.IsNullOrWhiteSpace(id) && - syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true && - StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area)) - { - var utcNow = DateTimeOffset.UtcNow; - var syncJob = jobConfig.ToSyncJob( - null, - tokenCache: await tokenCacheService.GetTokenCache(jobConfig), - timestamp: utcNow, - expires: utcNow.AddMinutes(4), - id: id, - schedule: nameof(jobConfig.Manual), - seed: false - ); - - await using SqlConnection - sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken }, - targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken }; - - using IDisposable - from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database), - to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database); - - await sourceConn.OpenAsync(); - await targetConn.OpenAsync(); - - DbInfo - sourceDbInfo = await sourceConn.QueryFirstAsync(SchemaExtensions.DbInfoQuery), - targetDbInfo = await targetConn.QueryFirstAsync(SchemaExtensions.DbInfoQuery); - - SourceTableChangeTrackingInfo[] - sourceTableChangeTrackingInfos = [.. await sourceConn.QueryAsync(SchemaExtensions.SourceTableChangeTrackingInfoQuery)]; - - targetConn.EnsureSyncSchemaAndTableExists($"config/{id}/{area}/schema", logger); - - var tableSchemas = ( - syncJob.Tables ?? [] - ) - .Select( - table => TableSchema.LoadSchema( - sourceConn, - targetConn, - table, - syncJob.BatchSize, - globalChangeTracking: true - ) - ) - .Select(table => new - { - table.SourceTableName, - table.TargetTableName, - table.SourceVersion, - table.TargetVersion, - table.Columns - }) - .ToArray(); - - return new OkObjectResult( - new - { - sourceDbInfo, - targetDbInfo, - tableSchemas, - sourceTableChangeTrackingInfos - } - ); - } - return new NotFoundResult(); - } - - [Function(nameof(GetSyncJobConfig) + nameof(GetJobConfigSchemaTracking))] - public async Task GetJobConfigSchemaTracking( - [HttpTrigger( - AuthorizationLevel.Function, - "get", - Route ="config/{area}/{id}/schema/tracking" - )] HttpRequest req, - string area, - string id - ) - { - ArgumentNullException.ThrowIfNull(req); - - if (!string.IsNullOrWhiteSpace(area) && - !string.IsNullOrWhiteSpace(id) && - syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true && - StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area)) - { - var utcNow = DateTimeOffset.UtcNow; - var syncJob = jobConfig.ToSyncJob( - null, - tokenCache: await tokenCacheService.GetTokenCache(jobConfig), - timestamp: utcNow, - expires: utcNow.AddMinutes(4), - id: id, - schedule: nameof(jobConfig.Manual), - seed: false - ); - - await using SqlConnection - sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken }, - targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken }; - - using IDisposable - from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database), - to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database); - - await sourceConn.OpenAsync(); - await targetConn.OpenAsync(); - - targetConn.EnsureSyncSchemaAndTableExists($"config/{id}/{area}/schema/tracking", logger); - - var rows = new List(); - foreach (var table in syncJob.Tables ?? []) - { - var columns = sourceConn.GetColumns(table.Source); - var sourceVersion = sourceConn.GetSourceVersion(table.Source, globalChangeTracking: true, columns); - var targetVersion = targetConn.GetTargetVersion(table.Target); - long? fromVersion = targetVersion.CurrentVersion < 0 ? null : targetVersion.CurrentVersion; - - if (sourceVersion == null) - { - rows.Add( - new SchemaTrackingTableRow( - table.Source, - 0, - 0, - 0, - -1, - targetVersion.CurrentVersion, - table.Target - ) - ); - continue; - } - - var counts = await sourceConn.QueryFirstAsync( - SqlStatementExtensions.GetChangeTrackingOperationCountsSelectStatement(table.Source), - new { FromVersion = fromVersion } - ); - - rows.Add( - new SchemaTrackingTableRow( - table.Source, - counts.Updated, - counts.Inserted, - counts.Deleted, - sourceVersion.CurrentVersion, - targetVersion.CurrentVersion, - table.Target - ) - ); - } - - return new OkObjectResult(rows); - } - return new NotFoundResult(); - } } diff --git a/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs b/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs index 1639490..ef7f8ce 100644 --- a/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs +++ b/src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs @@ -61,6 +61,31 @@ SELECT ISNULL(SUM(CASE WHEN ct.SYS_CHANGE_OPERATION = N'U' THEN 1 ELSE 0 END), FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct """; + /// + /// Builds a query that returns only primary key values and operation code for each changed row from + /// CHANGETABLE(CHANGES ...). Result includes one row per change with operation in Operation. + /// + /// Fully qualified source table name (e.g. [dbo].[MyTable]). + /// Source table columns used to select primary key fields. + public static string GetChangeTrackingPrimaryKeyDetailsSelectStatement(string sourceTableName, Column[] columns) + { + var primaryColumns = columns + .Where(column => column.IsPrimary) + .Select(column => string.Concat("ct.", column.QuoteName, " AS ", column.QuoteName)) + .ToArray(); + + if (primaryColumns.Length == 0) + { + throw new Exception($"Missing primary key columns for table {sourceTableName}."); + } + + return $""" + SELECT ct.SYS_CHANGE_OPERATION AS Operation, + {string.Join(",\r\n ", primaryColumns)} + FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct + """; + } + public static string GetDropStatement(this string tableName) => $""" IF OBJECT_ID('{tableName}') IS NOT NULL diff --git a/src/SqlBulkSyncFunction/Helpers/SyncJobConfigExtensions.cs b/src/SqlBulkSyncFunction/Helpers/SyncJobConfigExtensions.cs index 8289846..72a9db1 100644 --- a/src/SqlBulkSyncFunction/Helpers/SyncJobConfigExtensions.cs +++ b/src/SqlBulkSyncFunction/Helpers/SyncJobConfigExtensions.cs @@ -56,6 +56,7 @@ private static SyncJobTable[] ToSyncJobTables(this SyncJobConfig job) return [.. job.Tables.Select( sourceTable => new SyncJobTable( + sourceTable.Key, sourceTable.Value, targetTableLookup?[sourceTable.Key].FirstOrDefault() switch { diff --git a/src/SqlBulkSyncFunction/Models/Job/SyncJobTable.cs b/src/SqlBulkSyncFunction/Models/Job/SyncJobTable.cs index da84443..0404a45 100644 --- a/src/SqlBulkSyncFunction/Models/Job/SyncJobTable.cs +++ b/src/SqlBulkSyncFunction/Models/Job/SyncJobTable.cs @@ -3,11 +3,13 @@ namespace SqlBulkSyncFunction.Models.Job; /// /// Represents a table mapping for sync operations. /// +/// Identifier for the table mapping in configuration. /// Source table name. /// Target table name. /// Whether to disable identity insert on target table during sync. /// Whether to disable constraint checking on target table during merge operations. public record SyncJobTable( + string Id, string Source, string Target, bool DisableTargetIdentityInsert, diff --git a/src/SqlBulkSyncFunction/Models/Schema/SchemaTrackingPrimaryKeyDetailsResponse.cs b/src/SqlBulkSyncFunction/Models/Schema/SchemaTrackingPrimaryKeyDetailsResponse.cs new file mode 100644 index 0000000..b92e98e --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Schema/SchemaTrackingPrimaryKeyDetailsResponse.cs @@ -0,0 +1,15 @@ +using System.Collections.Generic; + +namespace SqlBulkSyncFunction.Models.Schema; + +/// +/// Represents primary key-only change tracking details for a single table, grouped by change operation. +/// +/// Changed rows where SYS_CHANGE_OPERATION = N'U'. +/// Changed rows where SYS_CHANGE_OPERATION = N'I'. +/// Changed rows where SYS_CHANGE_OPERATION = N'D'. +public record SchemaTrackingPrimaryKeyDetailsResponse( + IReadOnlyList> Updated, + IReadOnlyList> Inserted, + IReadOnlyList> Deleted +); diff --git a/src/SqlBulkSyncFunction/Models/Schema/SchemaTrackingTableRow.cs b/src/SqlBulkSyncFunction/Models/Schema/SchemaTrackingTableRow.cs index 50a5733..4beb13e 100644 --- a/src/SqlBulkSyncFunction/Models/Schema/SchemaTrackingTableRow.cs +++ b/src/SqlBulkSyncFunction/Models/Schema/SchemaTrackingTableRow.cs @@ -4,6 +4,7 @@ namespace SqlBulkSyncFunction.Models.Schema; /// Per-table change tracking summary: counts of rows per SYS_CHANGE_OPERATION from /// CHANGETABLE(CHANGES …, last_sync_version), plus current source version and last synced target version. /// +/// Configured table identifier from the sync job definition. /// Qualified source table name. /// Number of changes with SYS_CHANGE_OPERATION = N'U'. /// Number of changes with SYS_CHANGE_OPERATION = N'I'. @@ -12,6 +13,7 @@ namespace SqlBulkSyncFunction.Models.Schema; /// Last version stored in sync.TableVersion for the target table (or -1 if never synced). /// Qualified target table name. public record SchemaTrackingTableRow( + string Id, string SourceTableName, long Updated, long Inserted,