Skip to content

Commit bc45fc2

Browse files
authored
feat(config): add GET schema/tracking for change tracking counts (#122)
Expose config/{area}/{id}/schema/tracking, returning per configured table the counts of updates, inserts, and deletes from CHANGETABLE(CHANGES …) since the last synced version in sync.TableVersion (NULL when never synced), plus current source and stored target versions. Add GetChangeTrackingOperationCountsSelectStatement in SqlStatementExtensions, and SchemaTrackingTableRow and ChangeOperationCounts models for the API and Dapper mapping.
1 parent 1c8e814 commit bc45fc2

4 files changed

Lines changed: 142 additions & 0 deletions

File tree

src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Linq;
34
using System.Threading.Tasks;
45
using Dapper;
@@ -188,4 +189,93 @@ string id
188189
}
189190
return new NotFoundResult();
190191
}
192+
193+
[Function(nameof(GetSyncJobConfig) + nameof(GetJobConfigSchemaTracking))]
194+
public async Task<IActionResult> GetJobConfigSchemaTracking(
195+
[HttpTrigger(
196+
AuthorizationLevel.Function,
197+
"get",
198+
Route ="config/{area}/{id}/schema/tracking"
199+
)] HttpRequest req,
200+
string area,
201+
string id
202+
)
203+
{
204+
ArgumentNullException.ThrowIfNull(req);
205+
206+
if (!string.IsNullOrWhiteSpace(area) &&
207+
!string.IsNullOrWhiteSpace(id) &&
208+
syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true &&
209+
StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area))
210+
{
211+
var utcNow = DateTimeOffset.UtcNow;
212+
var syncJob = jobConfig.ToSyncJob(
213+
null,
214+
tokenCache: await tokenCacheService.GetTokenCache(jobConfig),
215+
timestamp: utcNow,
216+
expires: utcNow.AddMinutes(4),
217+
id: id,
218+
schedule: nameof(jobConfig.Manual),
219+
seed: false
220+
);
221+
222+
await using SqlConnection
223+
sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken },
224+
targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken };
225+
226+
using IDisposable
227+
from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database),
228+
to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database);
229+
230+
await sourceConn.OpenAsync();
231+
await targetConn.OpenAsync();
232+
233+
targetConn.EnsureSyncSchemaAndTableExists($"config/{id}/{area}/schema/tracking", logger);
234+
235+
var rows = new List<SchemaTrackingTableRow>();
236+
foreach (var table in syncJob.Tables ?? [])
237+
{
238+
var columns = sourceConn.GetColumns(table.Source);
239+
var sourceVersion = sourceConn.GetSourceVersion(table.Source, globalChangeTracking: true, columns);
240+
var targetVersion = targetConn.GetTargetVersion(table.Target);
241+
long? fromVersion = targetVersion.CurrentVersion < 0 ? null : targetVersion.CurrentVersion;
242+
243+
if (sourceVersion == null)
244+
{
245+
rows.Add(
246+
new SchemaTrackingTableRow(
247+
table.Source,
248+
0,
249+
0,
250+
0,
251+
-1,
252+
targetVersion.CurrentVersion,
253+
table.Target
254+
)
255+
);
256+
continue;
257+
}
258+
259+
var counts = await sourceConn.QueryFirstAsync<ChangeOperationCounts>(
260+
SqlStatementExtensions.GetChangeTrackingOperationCountsSelectStatement(table.Source),
261+
new { FromVersion = fromVersion }
262+
);
263+
264+
rows.Add(
265+
new SchemaTrackingTableRow(
266+
table.Source,
267+
counts.Updated,
268+
counts.Inserted,
269+
counts.Deleted,
270+
sourceVersion.CurrentVersion,
271+
targetVersion.CurrentVersion,
272+
table.Target
273+
)
274+
);
275+
}
276+
277+
return new OkObjectResult(rows);
278+
}
279+
return new NotFoundResult();
280+
}
191281
}

src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,20 @@ FROM CHANGETABLE(VERSION {0}, ({1}), ({1})) as t
4747
);
4848
}
4949

50+
/// <summary>
51+
/// Builds a query that aggregates <c>CHANGETABLE</c> rows by <c>SYS_CHANGE_OPERATION</c>,
52+
/// producing <c>Updated</c>, <c>Inserted</c>, and <c>Deleted</c> counts (one row per table).
53+
/// Pass <c>@FromVersion</c> as <c>NULL</c> when no row exists in <c>sync.TableVersion</c> (never synced).
54+
/// </summary>
55+
/// <param name="sourceTableName">Fully qualified source table name (e.g. <c>[dbo].[MyTable]</c>).</param>
56+
public static string GetChangeTrackingOperationCountsSelectStatement(string sourceTableName)
57+
=> $"""
58+
SELECT ISNULL(SUM(CASE WHEN ct.SYS_CHANGE_OPERATION = N'U' THEN 1 ELSE 0 END), 0) AS Updated,
59+
ISNULL(SUM(CASE WHEN ct.SYS_CHANGE_OPERATION = N'I' THEN 1 ELSE 0 END), 0) AS Inserted,
60+
ISNULL(SUM(CASE WHEN ct.SYS_CHANGE_OPERATION = N'D' THEN 1 ELSE 0 END), 0) AS Deleted
61+
FROM CHANGETABLE(CHANGES {sourceTableName}, @FromVersion) AS ct
62+
""";
63+
5064
public static string GetDropStatement(this string tableName)
5165
=> $"""
5266
IF OBJECT_ID('{tableName}') IS NOT NULL
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
namespace SqlBulkSyncFunction.Models.Schema;
2+
3+
/// <summary>
4+
/// Maps a single row returned by <c>SqlStatementExtensions.GetChangeTrackingOperationCountsSelectStatement</c> for Dapper materialization.
5+
/// </summary>
6+
public sealed class ChangeOperationCounts
7+
{
8+
/// <summary>Count of <c>SYS_CHANGE_OPERATION = N'U'</c> rows.</summary>
9+
public long Updated { get; set; }
10+
11+
/// <summary>Count of <c>SYS_CHANGE_OPERATION = N'I'</c> rows.</summary>
12+
public long Inserted { get; set; }
13+
14+
/// <summary>Count of <c>SYS_CHANGE_OPERATION = N'D'</c> rows.</summary>
15+
public long Deleted { get; set; }
16+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
namespace SqlBulkSyncFunction.Models.Schema;
2+
3+
/// <summary>
4+
/// Per-table change tracking summary: counts of rows per <c>SYS_CHANGE_OPERATION</c> from
5+
/// <c>CHANGETABLE(CHANGES …, last_sync_version)</c>, plus current source version and last synced target version.
6+
/// </summary>
7+
/// <param name="SourceTableName">Qualified source table name.</param>
8+
/// <param name="Updated">Number of changes with <c>SYS_CHANGE_OPERATION = N'U'</c>.</param>
9+
/// <param name="Inserted">Number of changes with <c>SYS_CHANGE_OPERATION = N'I'</c>.</param>
10+
/// <param name="Deleted">Number of changes with <c>SYS_CHANGE_OPERATION = N'D'</c>.</param>
11+
/// <param name="SourceVersion">Current change tracking version on the source (<c>CHANGE_TRACKING_CURRENT_VERSION()</c> for database-wide tracking).</param>
12+
/// <param name="TargetVersion">Last version stored in <c>sync.TableVersion</c> for the target table (or <c>-1</c> if never synced).</param>
13+
/// <param name="TargetTableName">Qualified target table name.</param>
14+
public record SchemaTrackingTableRow(
15+
string SourceTableName,
16+
long Updated,
17+
long Inserted,
18+
long Deleted,
19+
long SourceVersion,
20+
long TargetVersion,
21+
string TargetTableName
22+
);

0 commit comments

Comments
 (0)