Skip to content

Commit 22a09f4

Browse files
authored
feat(config): split GetSyncJobConfig endpoints and add table-level tracking PK details (#123)
- Refactor `GetSyncJobConfig` into partial endpoint files (`ListAreas`, `ListIds`, `GetJobConfig`, `GetJobConfigSchema`, `GetJobConfigSchemaTracking`) to keep handlers focused and maintainable. - Add `GET config/{area}/{id}/schema/tracking/{tableId}` to return primary-key-only `CHANGETABLE(CHANGES ...)` details grouped as `updated`, `inserted`, and `deleted`. - Extend table metadata with configured table `Id` (`SyncJobTable`, `SchemaTrackingTableRow`) and add SQL helper/model support for PK-only tracking detail responses.
1 parent bc45fc2 commit 22a09f4

12 files changed

Lines changed: 501 additions & 267 deletions
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
using System;
2+
using System.Linq;
3+
using Microsoft.AspNetCore.Http;
4+
using Microsoft.AspNetCore.Mvc;
5+
using Microsoft.Azure.Functions.Worker;
6+
using SqlBulkSyncFunction.Helpers;
7+
using SqlBulkSyncFunction.Models.Job;
8+
9+
namespace SqlBulkSyncFunction.Functions;
10+
11+
public partial class GetSyncJobConfig
12+
{
13+
[Function(nameof(GetSyncJobConfig) + nameof(GetJobConfig))]
14+
public IActionResult GetJobConfig(
15+
[HttpTrigger(
16+
AuthorizationLevel.Function,
17+
"get",
18+
Route = "config/{area}/{id}"
19+
)] HttpRequest req,
20+
string area,
21+
string id
22+
)
23+
{
24+
ArgumentNullException.ThrowIfNull(req);
25+
26+
if (!string.IsNullOrWhiteSpace(area) &&
27+
!string.IsNullOrWhiteSpace(id) &&
28+
syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true &&
29+
StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area))
30+
{
31+
var tables = jobConfig.Tables?.ToDictionary(
32+
key => key.Key,
33+
value => new SyncJobConfigTableDto(
34+
Source: value.Value,
35+
Target: jobConfig.TargetTables?.TryGetValue(value.Key, out var target) == true && !string.IsNullOrWhiteSpace(target) ? target : value.Value,
36+
DisableTargetIdentityInsert: jobConfig.DisableTargetIdentityInsertTables.GetValueOrDefault(value.Key),
37+
DisableConstraintCheck: jobConfig.DisableConstraintCheckTables.GetValueOrDefault(value.Key)
38+
)
39+
) ?? [];
40+
return new OkObjectResult(
41+
new SyncJobConfigResponse(
42+
Id: id,
43+
Area: area,
44+
BatchSize: jobConfig.BatchSize,
45+
Manual: jobConfig.Manual,
46+
Schedules: jobConfig.Schedules,
47+
Tables: tables
48+
)
49+
);
50+
}
51+
return new NotFoundResult();
52+
}
53+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Dapper;
5+
using Microsoft.AspNetCore.Http;
6+
using Microsoft.AspNetCore.Mvc;
7+
using Microsoft.Azure.Functions.Worker;
8+
using Microsoft.Data.SqlClient;
9+
using Microsoft.Extensions.Logging;
10+
using SqlBulkSyncFunction.Helpers;
11+
using SqlBulkSyncFunction.Models.Schema;
12+
13+
namespace SqlBulkSyncFunction.Functions;
14+
15+
public partial class GetSyncJobConfig
16+
{
17+
[Function(nameof(GetSyncJobConfig) + nameof(GetJobConfigSchema))]
18+
public async Task<IActionResult> GetJobConfigSchema(
19+
[HttpTrigger(
20+
AuthorizationLevel.Function,
21+
"get",
22+
Route = "config/{area}/{id}/schema"
23+
)] HttpRequest req,
24+
string area,
25+
string id
26+
)
27+
{
28+
ArgumentNullException.ThrowIfNull(req);
29+
30+
if (!string.IsNullOrWhiteSpace(area) &&
31+
!string.IsNullOrWhiteSpace(id) &&
32+
syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true &&
33+
StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area))
34+
{
35+
var utcNow = DateTimeOffset.UtcNow;
36+
var syncJob = jobConfig.ToSyncJob(
37+
null,
38+
tokenCache: await tokenCacheService.GetTokenCache(jobConfig),
39+
timestamp: utcNow,
40+
expires: utcNow.AddMinutes(4),
41+
id: id,
42+
schedule: nameof(jobConfig.Manual),
43+
seed: false
44+
);
45+
46+
await using SqlConnection
47+
sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken },
48+
targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken };
49+
50+
using IDisposable
51+
from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database),
52+
to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database);
53+
54+
await sourceConn.OpenAsync();
55+
await targetConn.OpenAsync();
56+
57+
DbInfo
58+
sourceDbInfo = await sourceConn.QueryFirstAsync<DbInfo>(SchemaExtensions.DbInfoQuery),
59+
targetDbInfo = await targetConn.QueryFirstAsync<DbInfo>(SchemaExtensions.DbInfoQuery);
60+
61+
SourceTableChangeTrackingInfo[]
62+
sourceTableChangeTrackingInfos = [.. await sourceConn.QueryAsync<SourceTableChangeTrackingInfo>(SchemaExtensions.SourceTableChangeTrackingInfoQuery)];
63+
64+
targetConn.EnsureSyncSchemaAndTableExists($"config/{id}/{area}/schema", logger);
65+
66+
var tableSchemas = (
67+
syncJob.Tables ?? []
68+
)
69+
.Select(
70+
table => TableSchema.LoadSchema(
71+
sourceConn,
72+
targetConn,
73+
table,
74+
syncJob.BatchSize,
75+
globalChangeTracking: true
76+
)
77+
)
78+
.Select(table => new
79+
{
80+
table.SourceTableName,
81+
table.TargetTableName,
82+
table.SourceVersion,
83+
table.TargetVersion,
84+
table.Columns
85+
})
86+
.ToArray();
87+
88+
return new OkObjectResult(
89+
new
90+
{
91+
sourceDbInfo,
92+
targetDbInfo,
93+
tableSchemas,
94+
sourceTableChangeTrackingInfos
95+
}
96+
);
97+
}
98+
return new NotFoundResult();
99+
}
100+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
using Dapper;
5+
using Microsoft.AspNetCore.Http;
6+
using Microsoft.AspNetCore.Mvc;
7+
using Microsoft.Azure.Functions.Worker;
8+
using Microsoft.Data.SqlClient;
9+
using Microsoft.Extensions.Logging;
10+
using SqlBulkSyncFunction.Helpers;
11+
using SqlBulkSyncFunction.Models.Schema;
12+
13+
namespace SqlBulkSyncFunction.Functions;
14+
15+
public partial class GetSyncJobConfig
16+
{
17+
[Function(nameof(GetSyncJobConfig) + nameof(GetJobConfigSchemaTracking))]
18+
public async Task<IActionResult> GetJobConfigSchemaTracking(
19+
[HttpTrigger(
20+
AuthorizationLevel.Function,
21+
"get",
22+
Route = "config/{area}/{id}/schema/tracking"
23+
)] HttpRequest req,
24+
string area,
25+
string id
26+
)
27+
{
28+
ArgumentNullException.ThrowIfNull(req);
29+
30+
if (!string.IsNullOrWhiteSpace(area) &&
31+
!string.IsNullOrWhiteSpace(id) &&
32+
syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true &&
33+
StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area))
34+
{
35+
var utcNow = DateTimeOffset.UtcNow;
36+
var syncJob = jobConfig.ToSyncJob(
37+
null,
38+
tokenCache: await tokenCacheService.GetTokenCache(jobConfig),
39+
timestamp: utcNow,
40+
expires: utcNow.AddMinutes(4),
41+
id: id,
42+
schedule: nameof(jobConfig.Manual),
43+
seed: false
44+
);
45+
46+
await using SqlConnection
47+
sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken },
48+
targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken };
49+
50+
using IDisposable
51+
from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database),
52+
to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database);
53+
54+
await sourceConn.OpenAsync();
55+
await targetConn.OpenAsync();
56+
57+
targetConn.EnsureSyncSchemaAndTableExists($"config/{id}/{area}/schema/tracking", logger);
58+
59+
var rows = new List<SchemaTrackingTableRow>();
60+
foreach (var table in syncJob.Tables ?? [])
61+
{
62+
var columns = sourceConn.GetColumns(table.Source);
63+
var sourceVersion = sourceConn.GetSourceVersion(table.Source, globalChangeTracking: true, columns);
64+
var targetVersion = targetConn.GetTargetVersion(table.Target);
65+
var fromVersion = targetVersion.CurrentVersion < 0 ? 0L : targetVersion.CurrentVersion;
66+
67+
if (sourceVersion == null)
68+
{
69+
rows.Add(
70+
new SchemaTrackingTableRow(
71+
table.Id,
72+
table.Source,
73+
0,
74+
0,
75+
0,
76+
-1,
77+
targetVersion.CurrentVersion,
78+
table.Target
79+
)
80+
);
81+
continue;
82+
}
83+
84+
var counts = await sourceConn.QueryFirstAsync<ChangeOperationCounts>(
85+
SqlStatementExtensions.GetChangeTrackingOperationCountsSelectStatement(table.Source),
86+
new { FromVersion = fromVersion }
87+
);
88+
89+
rows.Add(
90+
new SchemaTrackingTableRow(
91+
table.Id,
92+
table.Source,
93+
counts.Updated,
94+
counts.Inserted,
95+
counts.Deleted,
96+
sourceVersion.CurrentVersion,
97+
targetVersion.CurrentVersion,
98+
table.Target
99+
)
100+
);
101+
}
102+
103+
return new OkObjectResult(rows);
104+
}
105+
return new NotFoundResult();
106+
}
107+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using Dapper;
6+
using Microsoft.AspNetCore.Http;
7+
using Microsoft.AspNetCore.Mvc;
8+
using Microsoft.Azure.Functions.Worker;
9+
using Microsoft.Data.SqlClient;
10+
using Microsoft.Extensions.Logging;
11+
using SqlBulkSyncFunction.Helpers;
12+
using SqlBulkSyncFunction.Models.Schema;
13+
14+
namespace SqlBulkSyncFunction.Functions;
15+
16+
public partial class GetSyncJobConfig
17+
{
18+
/// <summary>
19+
/// Returns change tracking primary key details for a specific configured table.
20+
/// </summary>
21+
[Function(nameof(GetSyncJobConfig) + nameof(GetJobConfigSchemaTrackingTable))]
22+
public async Task<IActionResult> GetJobConfigSchemaTrackingTable(
23+
[HttpTrigger(
24+
AuthorizationLevel.Function,
25+
"get",
26+
Route = "config/{area}/{id}/schema/tracking/{tableId}"
27+
)] HttpRequest req,
28+
string area,
29+
string id,
30+
string tableId
31+
)
32+
{
33+
ArgumentNullException.ThrowIfNull(req);
34+
35+
if (!string.IsNullOrWhiteSpace(area) &&
36+
!string.IsNullOrWhiteSpace(id) &&
37+
!string.IsNullOrWhiteSpace(tableId) &&
38+
syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) == true &&
39+
StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area))
40+
{
41+
var utcNow = DateTimeOffset.UtcNow;
42+
var syncJob = jobConfig.ToSyncJob(
43+
null,
44+
tokenCache: await tokenCacheService.GetTokenCache(jobConfig),
45+
timestamp: utcNow,
46+
expires: utcNow.AddMinutes(4),
47+
id: id,
48+
schedule: nameof(jobConfig.Manual),
49+
seed: false
50+
);
51+
52+
var table = (syncJob.Tables ?? [])
53+
.FirstOrDefault(configuredTable => StringComparer.OrdinalIgnoreCase.Equals(configuredTable.Id, tableId));
54+
55+
if (table == null)
56+
{
57+
return new NotFoundResult();
58+
}
59+
60+
await using SqlConnection
61+
sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken },
62+
targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken };
63+
64+
using IDisposable
65+
from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database),
66+
to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database);
67+
68+
await sourceConn.OpenAsync();
69+
await targetConn.OpenAsync();
70+
71+
targetConn.EnsureSyncSchemaAndTableExists($"config/{id}/{area}/schema/tracking/{tableId}", logger);
72+
73+
var columns = sourceConn.GetColumns(table.Source);
74+
var sourceVersion = sourceConn.GetSourceVersion(table.Source, globalChangeTracking: true, columns);
75+
var targetVersion = targetConn.GetTargetVersion(table.Target);
76+
var fromVersion = targetVersion.CurrentVersion < 0 ? 0L : targetVersion.CurrentVersion;
77+
78+
if (sourceVersion == null)
79+
{
80+
return new OkObjectResult(
81+
new SchemaTrackingPrimaryKeyDetailsResponse([], [], [])
82+
);
83+
}
84+
85+
var changedRows = await sourceConn.QueryAsync(
86+
SqlStatementExtensions.GetChangeTrackingPrimaryKeyDetailsSelectStatement(table.Source, columns),
87+
new { FromVersion = fromVersion }
88+
);
89+
90+
var updated = new List<IReadOnlyDictionary<string, object>>();
91+
var inserted = new List<IReadOnlyDictionary<string, object>>();
92+
var deleted = new List<IReadOnlyDictionary<string, object>>();
93+
94+
foreach (var row in changedRows)
95+
{
96+
if (row is not IDictionary<string, object> values ||
97+
!values.TryGetValue("Operation", out var operationValue) ||
98+
operationValue is not string operation ||
99+
string.IsNullOrWhiteSpace(operation))
100+
{
101+
continue;
102+
}
103+
104+
var primaryKeyValues = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
105+
foreach (var value in values)
106+
{
107+
if (!string.Equals(value.Key, "Operation", StringComparison.OrdinalIgnoreCase))
108+
{
109+
primaryKeyValues[value.Key] = value.Value;
110+
}
111+
}
112+
113+
switch (operation)
114+
{
115+
case "U":
116+
updated.Add(primaryKeyValues);
117+
break;
118+
case "I":
119+
inserted.Add(primaryKeyValues);
120+
break;
121+
case "D":
122+
deleted.Add(primaryKeyValues);
123+
break;
124+
}
125+
}
126+
127+
return new OkObjectResult(
128+
new SchemaTrackingPrimaryKeyDetailsResponse(updated, inserted, deleted)
129+
);
130+
}
131+
132+
return new NotFoundResult();
133+
}
134+
}

0 commit comments

Comments
 (0)