Skip to content

Commit d6460e9

Browse files
committed
feat: add config API, Application Insights, and logging/style updates
- Add GetSyncJobConfig HTTP API with ListIds, ListAreas, GetJobConfig, and GetJobConfigSchema (config/{id}/{area}/schema) using ASP.NET Core HTTP and OpenApi - Introduce SyncJobConfigResponse and SyncJobConfigTableDto with JsonPropertyName for stable JSON casing - Add Application Insights: WorkerService, ApplicationInsights packages, ConfigureFunctionsWebApplication(), and telemetry registration - Address CA1873 with source-generated logging (LoggerMessage) in ProcessGlobalChangeTrackingQueue, ProcessGlobalChangeTrackingSchedule, and ProcessSyncJobService - Use primary constructors and camelCase parameters for functions and services; remove unused logger from QueueGlobalChangeTracking - Apply file-scoped namespaces, raw string literals for SQL, and collection expressions across helpers and models - Fix QueueGlobalChangeTracking early-return so NotFound is returned when job/area invalid and Accepted on success - Add [HttpResult] to QueueGlobalChangeTrackingResult.HttpResponseData - Update Directory.Packages.props and csproj with new package references
1 parent 4c3d237 commit d6460e9

28 files changed

Lines changed: 1518 additions & 1282 deletions

src/Directory.Packages.props

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,16 @@
66
<PackageVersion Include="AsyncEnumerator" Version="4.0.2" />
77
<PackageVersion Include="Azure.Identity" Version="1.17.1" />
88
<PackageVersion Include="Dapper" Version="2.1.66" />
9-
<PackageVersion Include="Microsoft.Azure.Functions.Worker" Version="2.51.0" />
109
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.3.0" />
11-
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Storage" Version="6.8.0" />
10+
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore" Version="2.1.0" />
11+
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.OpenApi" Version="1.6.0" />
12+
<PackageVersion Include="Microsoft.ApplicationInsights.WorkerService" Version="2.23.0" />
13+
<PackageVersion Include="Microsoft.Azure.Functions.Worker.ApplicationInsights" Version="2.50.0" />
1214
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Timer" Version="4.3.1" />
15+
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Storage" Version="6.8.0" />
16+
<PackageVersion Include="Microsoft.Azure.Functions.Worker" Version="2.51.0" />
1317
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.0.7" />
1418
<PackageVersion Include="Microsoft.Data.SqlClient" Version="6.1.4" />
1519
<PackageVersion Include="NuGetizer" Version="1.4.7" />
1620
</ItemGroup>
17-
</Project>
21+
</Project>
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
using System;
2+
using System.Linq;
3+
using System.Net;
4+
using System.Threading.Tasks;
5+
using Microsoft.AspNetCore.Http;
6+
using Microsoft.AspNetCore.Mvc;
7+
using Microsoft.Azure.Functions.Worker;
8+
using Microsoft.Data.SqlClient;
9+
using Microsoft.Extensions.Logging;
10+
using Microsoft.Extensions.Options;
11+
using SqlBulkSyncFunction.Helpers;
12+
using SqlBulkSyncFunction.Models;
13+
using SqlBulkSyncFunction.Models.Job;
14+
using SqlBulkSyncFunction.Models.Schema;
15+
using SqlBulkSyncFunction.Services;
16+
17+
namespace SqlBulkSyncFunction.Functions;
18+
19+
public partial class GetSyncJobConfig(
20+
ILogger<GetSyncJobConfig> logger,
21+
IOptions<SyncJobsConfig> syncJobsConfig,
22+
ITokenCacheService tokenCacheService
23+
)
24+
{
25+
26+
[Function(nameof(GetSyncJobConfig) + nameof(ListIds))]
27+
public async Task<IActionResult> ListIds(
28+
[HttpTrigger(
29+
AuthorizationLevel.Function,
30+
"get",
31+
Route ="config"
32+
)] HttpRequest reg
33+
)
34+
{
35+
var idS = syncJobsConfig.Value.Jobs.Keys;
36+
return new OkObjectResult(idS);
37+
}
38+
39+
[Function(nameof(GetSyncJobConfig) + nameof(ListAreas))]
40+
public async Task<IActionResult> ListAreas(
41+
[HttpTrigger(
42+
AuthorizationLevel.Function,
43+
"get",
44+
Route ="config/{id}"
45+
)] HttpRequest reg,
46+
string id
47+
)
48+
{
49+
if (
50+
!string.IsNullOrWhiteSpace(id) &&
51+
syncJobsConfig.Value.Jobs.TryGetValue(id, out var syncJobConfig)
52+
&& syncJobConfig?.Area is { Length: > 0 } area)
53+
{
54+
return new OkObjectResult(new string[] { area });
55+
}
56+
return new NotFoundResult();
57+
}
58+
59+
[Function(nameof(GetSyncJobConfig) + nameof(GetJobConfig))]
60+
public async Task<IActionResult> GetJobConfig(
61+
[HttpTrigger(
62+
AuthorizationLevel.Function,
63+
"get",
64+
Route ="config/{id}/{area}"
65+
)] HttpRequest reg,
66+
string id,
67+
string area
68+
)
69+
{
70+
if (!string.IsNullOrWhiteSpace(area) &&
71+
!string.IsNullOrWhiteSpace(id) &&
72+
syncJobsConfig.Value.Jobs.TryGetValue(id, out var jobConfig) &&
73+
StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area))
74+
{
75+
var tables = jobConfig.Tables.ToDictionary(
76+
key => key.Key,
77+
value => new SyncJobConfigTableDto(
78+
Source: value.Value,
79+
Target: jobConfig.TargetTables.TryGetValue(value.Key, out var target) && !string.IsNullOrWhiteSpace(target) ? target : value.Value,
80+
DisableTargetIdentityInsert: jobConfig.DisableTargetIdentityInsertTables.TryGetValue(value.Key, out var disableTargetIdentityInsert) && disableTargetIdentityInsert
81+
)
82+
);
83+
return new OkObjectResult(
84+
new SyncJobConfigResponse(
85+
Id: id,
86+
Area: area,
87+
BatchSize: jobConfig.BatchSize,
88+
Manual: jobConfig.Manual,
89+
Schedules: jobConfig.Schedules,
90+
Tables: tables
91+
)
92+
);
93+
}
94+
return new NotFoundResult();
95+
}
96+
97+
[Function(nameof(GetSyncJobConfig) + nameof(GetJobConfigSchema))]
98+
public async Task<IActionResult> GetJobConfigSchema(
99+
[HttpTrigger(
100+
AuthorizationLevel.Function,
101+
"get",
102+
Route ="config/{id}/{area}/schema"
103+
)] HttpRequest reg,
104+
string id,
105+
string area
106+
)
107+
{
108+
if (!string.IsNullOrWhiteSpace(area) &&
109+
!string.IsNullOrWhiteSpace(id) &&
110+
syncJobsConfig.Value.Jobs.TryGetValue(id, out var jobConfig) &&
111+
StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig?.Area))
112+
{
113+
var syncJob = jobConfig.ToSyncJob(
114+
tokenCache: await tokenCacheService.GetTokenCache(jobConfig),
115+
expires: DateTimeOffset.UtcNow.AddMinutes(4),
116+
id: id,
117+
schedule: nameof(jobConfig.Manual),
118+
seed: false
119+
);
120+
121+
await using SqlConnection
122+
sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken },
123+
targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken };
124+
125+
using IDisposable
126+
from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database),
127+
to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database);
128+
129+
await sourceConn.OpenAsync();
130+
await targetConn.OpenAsync();
131+
132+
var tableSchemas = (
133+
syncJob.Tables ?? []
134+
)
135+
.Select(
136+
table => TableSchema.LoadSchema(
137+
sourceConn,
138+
targetConn,
139+
table,
140+
syncJob.BatchSize,
141+
globalChangeTracking: true
142+
)
143+
)
144+
.Select(table => new
145+
{
146+
table.SourceTableName,
147+
table.TargetTableName,
148+
table.SourceVersion,
149+
table.TargetVersion,
150+
table.Columns
151+
})
152+
.ToArray();
153+
154+
return new OkObjectResult(tableSchemas);
155+
}
156+
return new NotFoundResult();
157+
}
158+
}

src/SqlBulkSyncFunction/Functions/ProcessGlobalChangeTrackingQueue.cs

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,38 +5,37 @@
55
using SqlBulkSyncFunction.Models.Job;
66
using SqlBulkSyncFunction.Services;
77

8-
namespace SqlBulkSyncFunction.Functions
8+
namespace SqlBulkSyncFunction.Functions;
9+
10+
public partial class ProcessGlobalChangeTrackingQueue(
11+
ILogger<ProcessGlobalChangeTrackingQueue> logger,
12+
IProcessSyncJobService processSyncJobService
13+
)
914
{
10-
// ReSharper disable once UnusedMember.Global
11-
public record ProcessGlobalChangeTrackingQueue(
12-
ILogger<ProcessGlobalChangeTrackingQueue> Logger,
13-
IProcessSyncJobService ProcessSyncJobService
14-
)
15-
{
15+
[Function(nameof(ProcessGlobalChangeTrackingQueue))]
1616

17-
[Function(nameof(ProcessGlobalChangeTrackingQueue))]
17+
public async Task Run([QueueTrigger(Constants.ProcessGlobalChangeTrackingQueue)] SyncJob syncJob)
18+
{
19+
if (syncJob == null)
20+
{
21+
return;
22+
}
1823

19-
public async Task Run([QueueTrigger(SqlBulkSyncFunction.Constants.ProcessGlobalChangeTrackingQueue)] SyncJob syncJob)
24+
using (logger.BeginScope("Schedule={Schedule}, Id={Id}, Area={Area}", syncJob.Schedule, syncJob.Id, syncJob.Area))
2025
{
21-
if (syncJob == null)
26+
if (syncJob.Expires < DateTimeOffset.UtcNow)
2227
{
28+
LogScheduleExpired(syncJob.Schedule, syncJob.Id, syncJob.Area, syncJob.Expires);
2329
return;
2430
}
2531

26-
var scope = new { syncJob.Schedule, syncJob.Id, syncJob.Area };
27-
using (Logger.BeginScope(scope))
28-
{
29-
if (syncJob.Expires < DateTimeOffset.UtcNow)
30-
{
31-
Logger.LogWarning("Sync job {Scope} Schedule expired: {Expires}", scope, syncJob.Expires);
32-
return;
33-
}
34-
35-
await ProcessSyncJobService.ProcessSyncJob(
36-
globalChangeTracking: true,
37-
syncJob: syncJob
38-
);
39-
}
32+
await processSyncJobService.ProcessSyncJob(
33+
globalChangeTracking: true,
34+
syncJob: syncJob
35+
);
4036
}
4137
}
38+
39+
[LoggerMessage(Level = LogLevel.Warning, Message = "Sync job Schedule={Schedule}, Id={Id}, Area={Area} expired: {Expires}")]
40+
private partial void LogScheduleExpired(string schedule, string id, string area, DateTimeOffset expires);
4241
}

0 commit comments

Comments
 (0)