Skip to content

Commit 01cf168

Browse files
authored
fix(sync): robust change-tracking version handling (#100)
- Skip table and log when SourceVersion is null (LogUnknownSourceVersion) - Treat "already up to date" by comparing CurrentVersion only - Use GetSourceSelectAllStatement when TargetVersion.CurrentVersion < 0 (was <= 1) - GetTableVersionStatement: derive MinValidVersion from sys.change_tracking_tables for global CT - Use collection expression in BulkCopyData; replace exceptions.Any() with exceptions.Count != 0
1 parent abd2ca5 commit 01cf168

3 files changed

Lines changed: 17 additions & 7 deletions

File tree

src/SqlBulkSyncFunction/Helpers/SqlCommandExtensions.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,7 @@ public static void BulkCopyData(
9292
object scope,
9393
ILogger logger
9494
) => Array.ForEach(
95-
new[]
96-
{
95+
[
9796
new
9897
{
9998
Name = tableSchema.SyncNewOrUpdatedTableName,
@@ -106,7 +105,7 @@ ILogger logger
106105
SelectStatement = tableSchema.SourceDeletedSelectStatement,
107106
Description = "deleted"
108107
}
109-
},
108+
],
110109
table =>
111110
{
112111
using var sourceCmd = new SqlCommand

src/SqlBulkSyncFunction/Helpers/SqlStatementExtensions.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@ Column[] columns
1818
SELECT '{tableName}' AS TableName,
1919
CHANGE_TRACKING_CURRENT_VERSION() AS CurrentVersion,
2020
CHANGE_TRACKING_MIN_VALID_VERSION(
21-
OBJECT_ID('{tableName}')
21+
ctt.object_id
2222
) AS MinValidVersion,
2323
SYSDATETIMEOFFSET() AS Queried
24+
25+
FROM sys.change_tracking_tables AS ctt
26+
WHERE ctt.object_id = OBJECT_ID('{tableName}')
2427
""";
2528
}
2629

@@ -314,7 +317,7 @@ public static string GetNewOrUpdatedAtSourceSelectStatement(this TableSchema tab
314317
);
315318
}
316319

317-
var statement = (tableSchema.TargetVersion.CurrentVersion <= 1)
320+
var statement = (tableSchema.TargetVersion.CurrentVersion < 0)
318321
? tableSchema.GetSourceSelectAllStatement()
319322
: string.Format(
320323
"""

src/SqlBulkSyncFunction/Services/ProcessSyncJobService.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,12 @@ public async Task ProcessSyncJob(SyncJob syncJob, bool globalChangeTracking)
7373
{
7474
SeedTable(targetConn, tableSchema, sourceConn, scope);
7575
}
76-
else if (tableSchema.SourceVersion.Equals(tableSchema.TargetVersion))
76+
else if (tableSchema.SourceVersion == null)
77+
{
78+
LogUnknownSourceVersion(schedule, id, area, tableSchema.Scope);
79+
return;
80+
}
81+
else if (tableSchema.SourceVersion.CurrentVersion.Equals(tableSchema.TargetVersion.CurrentVersion))
7782
{
7883
LogAlreadyUpToDate(schedule, id, area);
7984
}
@@ -97,7 +102,7 @@ public async Task ProcessSyncJob(SyncJob syncJob, bool globalChangeTracking)
97102
}
98103
);
99104

100-
if (exceptions.Any())
105+
if (exceptions.Count != 0)
101106
{
102107
throw new AggregateException($"{scope} sync failed", exceptions);
103108
}
@@ -156,6 +161,9 @@ private void SyncTable(SqlConnection targetConn, TableSchema tableSchema, SqlCon
156161
[LoggerMessage(Level = LogLevel.Information, Message = "{Schedule} {Id} {Area} Already up to date")]
157162
private partial void LogAlreadyUpToDate(string schedule, string id, string area);
158163

164+
[LoggerMessage(Level = LogLevel.Warning, Message = "{Schedule} {Id} {Area} Unknown / failed to fetch source version {Scope}.")]
165+
private partial void LogUnknownSourceVersion(string schedule, string id, string area, string scope);
166+
159167
[LoggerMessage(Level = LogLevel.Information, Message = "{Schedule} {Id} {Area} End {TableSchemaScope}, duration {Elapsed}")]
160168
private partial void LogEndTableSchemaScopeDuration(string schedule, string id, string area, string tableSchemaScope, TimeSpan elapsed);
161169

0 commit comments

Comments
 (0)