Skip to content

Commit cadc930

Browse files
authored
feat: add asynchronous schema tracking export with ZIP segments and job status API (#140)
* feat: add asynchronous schema tracking export with ZIP segments and job status API Introduce end-to-end export of change-tracking data for a configured sync table: full rows for inserts/updates and primary keys for deletes, written as streamed JSON-in-ZIP to blob storage with Azure Table coordination and queue-based orchestration. HTTP (Function auth): - POST .../schema/tracking/{tableId}/export with author, referenceId, purpose; persists blobs and exportjobs row, returns 202 with Location to status. - GET .../export/status/{*correlationId} for job detail (optional result SAS when result.json exists) or list when correlation segment is empty. Workers and storage: - Queues: exportjob dispatch; exportjob-{updated,inserted,deleted} segments; matching -done completion queues and -error dead-letter style correlation messages on segment failure. - Blobs: export container (request.json, job.json, response/*.zip, result.json). - Tables: exportjobs partition area_jobId_tableId (sanitized), row key v7 GUID. Implementation: - SchemaTrackingExportService for create, list, status, dispatch, segment processing, optimistic ETag merges, finalize with 7-day read SAS URIs. - SchemaTrackingExportStreamingZip streams SqlDataReader into one ZIP entry. - SqlStatementExtensions: CHANGETABLE segment SELECTs; Guid.CreateVersion7 for new identifiers where applicable. Also: README schema tracking export section; Azure.Data.Tables dependency; Directory.Packages.props bumps (Functions worker, OpenTelemetry, SqlClient, Cronos, etc.). * Update .NET SDK to 10.0.203
1 parent 85cfbd0 commit cadc930

23 files changed

Lines changed: 1767 additions & 11 deletions

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,15 @@ The function is configured through Azure App Settings / Environment variables, y
6161
>
6262
> `Custom` schedule is the default if no schedule is specified in the sync job configuration.
6363
64+
## Schema tracking export
65+
66+
Asynchronous export of change-tracking data (full rows for inserts/updates, primary keys for deletes) uses the same storage account as `AzureWebJobsStorage`: blob container `export`, table `exportjobs`.
67+
68+
HTTP routes (Function authorization level):
69+
70+
- `POST /api/config/{area}/{id}/schema/tracking/{tableId}/export` — JSON body `{ "author", "referenceId", "purpose" }`; returns `202 Accepted` with `Location` pointing at status.
71+
- `GET /api/config/{area}/{id}/schema/tracking/{tableId}/export/status/{*correlationId}` — returns a single job in the same item shape as the list route; includes an optional `result` when `response/result.json` exists.
72+
- `GET /api/config/{area}/{id}/schema/tracking/{tableId}/export/status` — list jobs for that table partition.
6473

6574
## Development resources
6675

global.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"sdk": {
3-
"version": "10.0.201",
3+
"version": "10.0.203",
44
"rollForward": "latestFeature",
55
"allowPrerelease": false
66
}

src/Directory.Packages.props

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,17 @@
99
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.3.0" />
1010
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore" Version="2.1.0" />
1111
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.OpenApi" Version="1.6.0" />
12-
<PackageVersion Include="Azure.Monitor.OpenTelemetry.Exporter" Version="1.7.0" />
13-
<PackageVersion Include="Microsoft.Azure.Functions.Worker.OpenTelemetry" Version="1.1.0" />
14-
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="1.15.2" />
15-
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.15.2" />
12+
<PackageVersion Include="Azure.Monitor.OpenTelemetry.Exporter" Version="1.8.0" />
13+
<PackageVersion Include="Microsoft.Azure.Functions.Worker.OpenTelemetry" Version="1.2.0" />
14+
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="1.15.3" />
15+
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.15.3" />
1616
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Timer" Version="4.3.1" />
1717
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Storage" Version="6.8.1" />
18-
<PackageVersion Include="Microsoft.Azure.Functions.Worker" Version="2.51.0" />
18+
<PackageVersion Include="Microsoft.Azure.Functions.Worker" Version="2.52.0" />
1919
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.0.7" />
20-
<PackageVersion Include="Microsoft.Data.SqlClient" Version="7.0.0" />
20+
<PackageVersion Include="Microsoft.Data.SqlClient" Version="7.0.1" />
21+
<PackageVersion Include="Azure.Data.Tables" Version="12.11.0" />
2122
<PackageVersion Include="NuGetizer" Version="1.4.7" />
22-
<PackageVersion Include="Cronos" Version="0.12.0" />
23+
<PackageVersion Include="Cronos" Version="0.13.0" />
2324
</ItemGroup>
2425
</Project>

src/SqlBulkSyncFunction/Constants.cs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,58 @@ public static class Queues
2121
/// Sync progress queue name.
2222
/// </summary>
2323
public const string SyncJobProgressQueue = "syncjobprogress";
24+
25+
/// <summary>
26+
/// Main queue for schema tracking data export jobs (message body: full correlation id path).
27+
/// </summary>
28+
public const string ExportJob = "exportjob";
29+
30+
/// <summary>
31+
/// Queue for export workers that build the updated-rows ZIP from change tracking.
32+
/// </summary>
33+
public const string ExportJobUpdated = "exportjob-updated";
34+
35+
/// <summary>
36+
/// Queue for export workers that build the inserted-rows ZIP from change tracking.
37+
/// </summary>
38+
public const string ExportJobInserted = "exportjob-inserted";
39+
40+
/// <summary>
41+
/// Queue for export workers that build the deleted-rows ZIP from change tracking.
42+
/// </summary>
43+
public const string ExportJobDeleted = "exportjob-deleted";
44+
45+
/// <summary>
46+
/// Signaled when the updated ZIP has been written for an export job.
47+
/// </summary>
48+
public const string ExportJobUpdatedDone = ExportJobUpdated + "-done";
49+
50+
/// <summary>
51+
/// Signaled when the inserted ZIP has been written for an export job.
52+
/// </summary>
53+
public const string ExportJobInsertedDone = ExportJobInserted + "-done";
54+
55+
/// <summary>
56+
/// Signaled when the deleted ZIP has been written for an export job.
57+
/// </summary>
58+
public const string ExportJobDeletedDone = ExportJobDeleted + "-done";
59+
60+
/// <summary>
61+
/// Updated segment only: message body is the correlation id when <c>ProcessExportSegmentAsync</c> throws during processing (SQL/ZIP/blob). Invalid queue bodies are rejected at the trigger with <c>ArgumentException.ThrowIfNullOrEmpty</c>.
62+
/// </summary>
63+
public const string ExportJobUpdatedError = ExportJobUpdated + "-error";
64+
65+
/// <summary>
66+
/// Inserted segment only: correlation id when segment processing throws.
67+
/// </summary>
68+
public const string ExportJobInsertedError = ExportJobInserted + "-error";
69+
70+
/// <summary>
71+
/// Deleted segment only: correlation id when segment processing throws.
72+
/// </summary>
73+
public const string ExportJobDeletedError = ExportJobDeleted + "-error";
2474
}
75+
2576
/// <summary>
2677
/// NCRONTAB expressions and configuration keys for <see cref="Functions.ProcessGlobalChangeTrackingSchedule"/> timer triggers.
2778
/// </summary>
@@ -63,6 +114,22 @@ public static class Containers
63114
/// Blob container for per-job monitoring aggregates (written by the aggregation timer).
64115
/// </summary>
65116
public const string Monitor = "monitor";
117+
118+
/// <summary>
119+
/// Blob container for schema tracking export requests, jobs, response ZIPs, and result metadata.
120+
/// </summary>
121+
public const string Export = "export";
122+
}
123+
124+
/// <summary>
/// Names of the Azure Table Storage tables the function app works with.
/// </summary>
public static class Tables
{
    /// <summary>
    /// Holds the state of schema tracking export jobs.
    /// Partition key: <c>area_id_tableId</c>; row key: the export job guid.
    /// </summary>
    public const string ExportJobs = "exportjobs";
}
67134

68135
/// <summary>
@@ -72,5 +139,8 @@ public static class BlobContentTypes
72139
{
73140
/// <summary>JSON documents (UTF-8).</summary>
74141
public const string Json = "application/json; charset=utf-8";
142+
143+
/// <summary>ZIP archives.</summary>
144+
public const string Zip = "application/zip";
75145
}
76146
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
using System;
2+
using System.Globalization;
3+
using System.Linq;
4+
using System.Text.Json;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Microsoft.AspNetCore.Http;
8+
using Microsoft.AspNetCore.Mvc;
9+
using Microsoft.Azure.Functions.Worker;
10+
using SqlBulkSyncFunction.Models.Schema.Export;
11+
12+
namespace SqlBulkSyncFunction.Functions;
13+
14+
#nullable enable
15+
16+
public partial class GetSyncJobConfig
{
    /// <summary>
    /// Serializer options for the export request body: property names are matched
    /// case-insensitively and written using camelCase.
    /// </summary>
    private static readonly JsonSerializerOptions ExportJsonOptions = new()
    {
        PropertyNameCaseInsensitive = true,
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };

    /// <summary>
    /// Accepts a schema tracking data export request, persists metadata to blob and table storage, and enqueues processing.
    /// </summary>
    /// <param name="req">Incoming HTTP request; its body is read as a JSON <see cref="SchemaTrackingExportRequestBody"/>.</param>
    /// <param name="area">Area segment of the route.</param>
    /// <param name="id">Sync job id segment of the route.</param>
    /// <param name="tableId">Table id segment of the route.</param>
    /// <param name="cancellationToken">Token flowed to body deserialization and to the export service.</param>
    /// <returns>
    /// <c>400</c> when the body is not valid JSON, is a JSON <c>null</c>, or fails service validation;
    /// <c>404</c> when the service reports <see cref="ExportJobCreateResultCode.NotFound"/> or returns no job;
    /// otherwise <c>202 Accepted</c> with the created job and a <c>Location</c> pointing at the status route.
    /// </returns>
    [Function(nameof(GetSyncJobConfig) + nameof(PostSchemaTrackingExport))]
    public async Task<IActionResult> PostSchemaTrackingExport(
        [HttpTrigger(
            AuthorizationLevel.Function,
            "post",
            Route = "config/{area}/{id}/schema/tracking/{tableId}/export"
        )]
        HttpRequest req,
        string area,
        string id,
        string tableId,
        CancellationToken cancellationToken
    )
    {
        ArgumentNullException.ThrowIfNull(req);

        SchemaTrackingExportRequestBody? body;
        try
        {
            body = await JsonSerializer
                .DeserializeAsync<SchemaTrackingExportRequestBody>(req.Body, ExportJsonOptions, cancellationToken)
                .ConfigureAwait(false);
        }
        catch (JsonException)
        {
            // Malformed JSON is a client error, not a server fault.
            return new BadRequestObjectResult("Invalid JSON body.");
        }

        // A literal JSON "null" deserializes successfully to null.
        if (body is null)
        {
            return new BadRequestObjectResult("Body is required.");
        }

        var result = await schemaTrackingExportService
            .TryCreateExportJobAsync(area, id, tableId, body, cancellationToken)
            .ConfigureAwait(false);

        if (result.Code == ExportJobCreateResultCode.ValidationFailed)
        {
            return new BadRequestObjectResult("author, referenceId, and purpose must be non-empty strings.");
        }

        if (result.Code == ExportJobCreateResultCode.NotFound || result.Job is null)
        {
            return new NotFoundResult();
        }

        var location = BuildExportStatusLocation(req, area, id, tableId, result.Job.CorrelationId);
        return new AcceptedResult(location: location, value: result.Job);
    }

    /// <summary>
    /// Returns detailed status for a single export job when <paramref name="correlationId"/> is present in the path,
    /// or lists export jobs for the table when the URL ends at <c>.../export/status</c> (catch-all binds empty).
    /// </summary>
    /// <remarks>
    /// A separate route without <c>{*correlationId}</c> would never win in the host: the catch-all matches the same URL with an empty remainder,
    /// and <see cref="SchemaTrackingExportService.TryGetExportStatusAsync"/> returns null for an empty id, producing 404. List behavior is therefore handled here.
    /// </remarks>
    /// <param name="req">Incoming HTTP request (only checked for null).</param>
    /// <param name="area">Area segment of the route; compared case-insensitively with the configured job area in list mode.</param>
    /// <param name="id">Sync job id segment of the route; looked up in the configured jobs in list mode.</param>
    /// <param name="tableId">Table id segment of the route; must exist in the job's tables in list mode.</param>
    /// <param name="correlationId">Catch-all remainder of the route; null, empty, or whitespace selects list mode.</param>
    /// <param name="cancellationToken">Token flowed to the export service.</param>
    /// <returns>
    /// <c>200</c> with the job list or the single job status; <c>404</c> when the area/job/table combination
    /// is not configured (list mode) or the service returns no status (detail mode).
    /// </returns>
    [Function(nameof(GetSyncJobConfig) + nameof(GetSchemaTrackingExportStatus))]
    public async Task<IActionResult> GetSchemaTrackingExportStatus(
        [HttpTrigger(
            AuthorizationLevel.Function,
            "get",
            Route = "config/{area}/{id}/schema/tracking/{tableId}/export/status/{*correlationId}"
        )]
        HttpRequest req,
        string area,
        string id,
        string tableId,
        string? correlationId,
        CancellationToken cancellationToken
    )
    {
        ArgumentNullException.ThrowIfNull(req);

        if (string.IsNullOrWhiteSpace(correlationId))
        {
            // List mode: only answer for an area/job/table combination that exists in configuration.
            if (string.IsNullOrWhiteSpace(area) ||
                string.IsNullOrWhiteSpace(id) ||
                string.IsNullOrWhiteSpace(tableId) ||
                syncJobsConfig?.Value?.Jobs?.TryGetValue(id, out var jobConfig) != true ||
                jobConfig == null ||
                !StringComparer.OrdinalIgnoreCase.Equals(area, jobConfig.Area) ||
                jobConfig.Tables == null ||
                !jobConfig.Tables.TryGetValue(tableId, out _))
            {
                return new NotFoundResult();
            }

            var items = await schemaTrackingExportService
                .ListExportJobsAsync(area, id, tableId, cancellationToken)
                .ConfigureAwait(false);

            return new OkObjectResult(items);
        }

        // Detail mode: correlationId is known to be non-empty here.
        var status = await schemaTrackingExportService
            .TryGetExportStatusAsync(area, id, tableId, correlationId, cancellationToken)
            .ConfigureAwait(false);

        if (status is null)
        {
            return new NotFoundResult();
        }

        return new OkObjectResult(status);
    }

    /// <summary>
    /// Builds the absolute URL of the status route for a newly created export job.
    /// </summary>
    /// <param name="req">Request supplying the scheme, host, and path base of the URL.</param>
    /// <param name="area">Area route value; escaped as a single path segment.</param>
    /// <param name="jobId">Sync job id route value; escaped as a single path segment.</param>
    /// <param name="tableId">Table id route value; escaped as a single path segment.</param>
    /// <param name="correlationId">
    /// Correlation id path; split on <c>/</c>, empty segments dropped, each segment escaped, then re-joined with <c>/</c>.
    /// </param>
    /// <returns>The absolute status URL, suitable for a <c>Location</c> header.</returns>
    private static string BuildExportStatusLocation(
        HttpRequest req,
        string area,
        string jobId,
        string tableId,
        string correlationId
    )
    {
        // Escape per segment so the '/' separators of the correlation path survive.
        var encodedCorrelation = string.Join(
            "/",
            correlationId.Split('/', StringSplitOptions.RemoveEmptyEntries).Select(Uri.EscapeDataString)
        );

        // ToUriComponent() yields the header-safe form (escaped path base, encoded host)
        // instead of the raw values.
        var pathBase = req.PathBase.ToUriComponent().TrimEnd('/');

        // With no path base on the request, fall back to the "/api" prefix.
        var apiRoot = pathBase.Length == 0 ? "/api" : pathBase;

        return string.Create(
            CultureInfo.InvariantCulture,
            $"{req.Scheme}://{req.Host.ToUriComponent()}{apiRoot}/config/{Uri.EscapeDataString(area)}/{Uri.EscapeDataString(jobId)}/schema/tracking/{Uri.EscapeDataString(tableId)}/export/status/{encodedCorrelation}"
        );
    }
}

src/SqlBulkSyncFunction/Functions/GetSyncJobConfig.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ namespace SqlBulkSyncFunction.Functions;
88
public partial class GetSyncJobConfig(
99
ILogger<GetSyncJobConfig> logger,
1010
IOptions<SyncJobsConfig> syncJobsConfig,
11-
ITokenCacheService tokenCacheService
11+
ITokenCacheService tokenCacheService,
12+
SchemaTrackingExportService schemaTrackingExportService
1213
)
1314
{
1415
}

0 commit comments

Comments
 (0)