diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props
index 60e9551..b361514 100644
--- a/src/Directory.Packages.props
+++ b/src/Directory.Packages.props
@@ -19,5 +19,6 @@
+
\ No newline at end of file
diff --git a/src/SqlBulkSyncFunction/Constants.cs b/src/SqlBulkSyncFunction/Constants.cs
index 69aafa4..860d692 100644
--- a/src/SqlBulkSyncFunction/Constants.cs
+++ b/src/SqlBulkSyncFunction/Constants.cs
@@ -22,6 +22,34 @@ public static class Queues
///
public const string SyncJobProgressQueue = "syncjobprogress";
}
+ ///
+ /// NCRONTAB expressions and configuration keys for timer triggers.
+ ///
+ public static class Schedules
+ {
+ ///
+ /// Configuration / environment key for the Custom schedule (same as the timer binding placeholder without % delimiters).
+ ///
+ public const string CustomScheduleConfigurationKey = "ProcessGlobalChangeTrackingSchedule";
+
+ ///
+ /// Timer trigger binding for the custom schedule: %ProcessGlobalChangeTrackingSchedule%.
+ ///
+ public const string CustomScheduleTimerTrigger = "%" + CustomScheduleConfigurationKey + "%";
+
+ /// NCRONTAB for Midnight (0 0 0 * * *).
+ public const string MidnightCron = "0 0 0 * * *";
+
+ /// NCRONTAB for Noon (0 0 12 * * *).
+ public const string NoonCron = "0 0 12 * * *";
+
+ /// NCRONTAB for EveryFiveMinutes (5 */5 * * * *).
+ public const string EveryFiveMinutesCron = "5 */5 * * * *";
+
+ /// NCRONTAB for EveryHour (10 0 * * * *).
+ public const string EveryHourCron = "10 0 * * * *";
+ }
+
public static class Containers
{
///
@@ -30,5 +58,19 @@ public static class Containers
public const string SyncJob = "syncjob";
public const string SyncSchedule = "syncschedule";
+
+ ///
+ /// Blob container for per-job monitoring aggregates (written by the aggregation timer).
+ ///
+ public const string Monitor = "monitor";
+ }
+
+ ///
+ /// Content types for Azure Blob uploads.
+ ///
+ public static class BlobContentTypes
+ {
+ /// JSON documents (UTF-8).
+ public const string Json = "application/json; charset=utf-8";
}
}
diff --git a/src/SqlBulkSyncFunction/Functions/AggregateSyncMonitoring.cs b/src/SqlBulkSyncFunction/Functions/AggregateSyncMonitoring.cs
new file mode 100644
index 0000000..056c02f
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Functions/AggregateSyncMonitoring.cs
@@ -0,0 +1,24 @@
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Azure.Functions.Worker;
+using SqlBulkSyncFunction.Services;
+
+namespace SqlBulkSyncFunction.Functions;
+
+///
+/// Timer that drains schedule and progress queues in order and refreshes per-job aggregate blobs under the monitor container.
+///
+public sealed class AggregateSyncMonitoring(SyncMonitoringAggregationService aggregationService)
+{
+ ///
+ /// Runs every minute on the minute (UTC).
+ ///
+ [Function(nameof(AggregateSyncMonitoring))]
+ public Task Run(
+#pragma warning disable IDE0060 // Remove unused parameter
+ [TimerTrigger("0 */1 * * * *")] TimerInfo timerInfo,
+#pragma warning restore IDE0060 // Remove unused parameter
+ CancellationToken cancellationToken
+ )
+ => aggregationService.ProcessAllQueuesAsync(cancellationToken);
+}
diff --git a/src/SqlBulkSyncFunction/Functions/GetSyncJobMonitor.cs b/src/SqlBulkSyncFunction/Functions/GetSyncJobMonitor.cs
new file mode 100644
index 0000000..d7640fa
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Functions/GetSyncJobMonitor.cs
@@ -0,0 +1,368 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.AspNetCore.Http;
+using Microsoft.AspNetCore.Mvc;
+using Microsoft.Azure.Functions.Worker;
+using Microsoft.Data.SqlClient;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using SqlBulkSyncFunction.Helpers;
+using SqlBulkSyncFunction.Models;
+using SqlBulkSyncFunction.Models.Job;
+using SqlBulkSyncFunction.Models.Monitor;
+using SqlBulkSyncFunction.Services;
+
+namespace SqlBulkSyncFunction.Functions;
+
+///
+/// HTTP endpoints for sync job monitoring (aggregated schedule/progress and lightweight table versions).
+///
+public sealed class GetSyncJobMonitor(
+ ILogger<GetSyncJobMonitor> logger,
+ IOptions<SyncJobsConfig> syncJobsConfig,
+ ITokenCacheService tokenCacheService,
+ SyncMonitoringAggregationService syncMonitoringAggregationService,
+ ProcessGlobalChangeTrackingScheduleNextRun processGlobalChangeTrackingScheduleNextRun
+ )
+{
+ private static readonly string[] ProgressStateOrder = Enum.GetNames<SyncJobProgressState>();
+
+ ///
+ /// GET monitor/{area} returns all enabled schedules for every job in the area as a JSON array.
+ /// GET monitor/{area}/{id} returns all enabled schedules for that job as a JSON array.
+ /// GET monitor/{area}/{id}/{schedule} returns a JSON array with one for that schedule.
+ ///
+ [Function(nameof(GetSyncJobMonitor))]
+ public async Task<IActionResult> Run(
+ [HttpTrigger(
+ AuthorizationLevel.Function,
+ "get",
+ Route = "monitor/{area}/{id?}/{schedule?}"
+ )]
+ HttpRequest req,
+ string area,
+ string id,
+ string schedule,
+ CancellationToken cancellationToken
+ )
+ {
+ ArgumentNullException.ThrowIfNull(req);
+
+ if (string.IsNullOrWhiteSpace(area))
+ {
+ return new NotFoundResult();
+ }
+
+ var jobs = syncJobsConfig?.Value?.Jobs?
+ .Where(kv =>
+ kv.Value != null
+ && kv.Value.Area is { Length: > 0 }
+ && StringComparer.OrdinalIgnoreCase.Equals(area, kv.Value.Area)
+ && (string.IsNullOrWhiteSpace(id) || StringComparer.OrdinalIgnoreCase.Equals(kv.Key, id)))
+ .OrderBy(kv => kv.Key, StringComparer.OrdinalIgnoreCase)
+ .ToList();
+
+ if (jobs == null || jobs.Count == 0)
+ {
+ return new NotFoundResult();
+ }
+
+ var utcNow = DateTimeOffset.UtcNow;
+
+ var responses = await EnumerateMonitorResponsesForJobsAsync(
+ jobs,
+ area,
+ utcNow,
+ schedule,
+ cancellationToken
+ )
+ .ToArrayAsync(cancellationToken)
+ .ConfigureAwait(false);
+
+ return new OkObjectResult(responses);
+ }
+
+ ///
+ /// For each matching job, loads table versions once and yields one per schedule
+ /// from (optionally filtered by ).
+ ///
+ private async IAsyncEnumerable<SyncJobMonitorResponse> EnumerateMonitorResponsesForJobsAsync(
+ IReadOnlyList<KeyValuePair<string, SyncJobConfig>> jobs,
+ string area,
+ DateTimeOffset utcNow,
+ string schedule,
+ [EnumeratorCancellation] CancellationToken cancellationToken
+ )
+ {
+ foreach (var (jobId, jobConfig) in jobs)
+ {
+ var syncJob = jobConfig.ToSyncJob(
+ scheduleCorrelationId: null,
+ tokenCache: await tokenCacheService.GetTokenCache(jobConfig).ConfigureAwait(false),
+ timestamp: utcNow,
+ expires: utcNow.AddMinutes(4),
+ id: jobId,
+ schedule: nameof(jobConfig.Manual),
+ seed: false
+ );
+
+ var tableRows = await LoadMonitorTableVersionsAsync(syncJob, logger, cancellationToken)
+ .ToArrayAsync(cancellationToken)
+ .ConfigureAwait(false);
+
+ await foreach (
+ var response in EnumerateSchedulesForJob(
+ jobConfig,
+ async resolvedSchedule => await BuildMonitorResponseAsync(
+ area,
+ jobId,
+ resolvedSchedule,
+ tableRows,
+ utcNow,
+ cancellationToken
+ )
+ .ConfigureAwait(false),
+ schedule
+ )
+ .WithCancellation(cancellationToken)
+ .ConfigureAwait(false))
+ {
+ yield return response;
+ }
+ }
+ }
+
+ private async Task<SyncJobMonitorResponse> BuildMonitorResponseAsync(
+ string area,
+ string id,
+ string resolvedSchedule,
+ IReadOnlyList<SyncJobMonitorTableVersionRow> tableRows,
+ DateTimeOffset utcNow,
+ CancellationToken cancellationToken
+ )
+ {
+ var aggregate = await syncMonitoringAggregationService
+ .GetAggregateAsync(area, id, resolvedSchedule, cancellationToken)
+ .ConfigureAwait(false);
+
+ var scheduleSummaryText = BuildScheduleSummaryText(aggregate);
+
+ var expectedRunAt = aggregate?.ScheduleNext;
+ if (!expectedRunAt.HasValue
+ && !string.Equals(resolvedSchedule, "Manual", StringComparison.OrdinalIgnoreCase)
+ && HasSyncJobNotStartedExecution(aggregate)
+ && processGlobalChangeTrackingScheduleNextRun.TryGetNextRunUtc(
+ resolvedSchedule,
+ utcNow,
+ out var estimatedNext))
+ {
+ expectedRunAt = estimatedNext;
+ }
+
+ return new SyncJobMonitorResponse(
+ Area: area,
+ Id: id,
+ Schedule: resolvedSchedule,
+ ScheduleSummaryText: scheduleSummaryText,
+ ExpectedRunAt: expectedRunAt,
+ LastRunAt: aggregate?.ScheduleTimestamp ?? aggregate?.ScheduleLast,
+ AggregatedAt: aggregate?.AggregatedAt,
+ LatestRunCorrelationId: aggregate?.LatestSyncJobCorrelationId,
+ LatestProgressSteps: [.. BuildLatestProgressSteps(aggregate)],
+ Tables: tableRows
+ );
+ }
+
+ ///
+ /// Enabled schedule names for this job (same rules as scheduled jobs).
+ /// When is non-empty, only the schedule that accepts is enumerated (zero or one item).
+ ///
+ private static async IAsyncEnumerable<SyncJobMonitorResponse> EnumerateSchedulesForJob(
+ SyncJobConfig job,
+ Func<string, Task<SyncJobMonitorResponse>> selector,
+ string schedule = null
+ )
+ {
+ if (!string.IsNullOrWhiteSpace(schedule))
+ {
+ var resolved = ResolveScheduleForJob(job, schedule);
+ if (resolved != null)
+ {
+ yield return await selector(resolved);
+ }
+
+ yield break;
+ }
+
+ if (job.Manual == true)
+ {
+ yield return await selector("Manual");
+ yield break;
+ }
+
+ if (job.Schedules != null && job.Schedules.Count > 0)
+ {
+ foreach (
+ var key in job.Schedules
+ .Where(p => p.Value)
+ .Select(p => p.Key)
+ .OrderBy(k => k, StringComparer.OrdinalIgnoreCase))
+ {
+ yield return await selector(key);
+ }
+
+ yield break;
+ }
+ }
+
+ ///
+ /// Returns the canonical schedule name from the job config for the requested segment, or null if invalid.
+ ///
+ private static string ResolveScheduleForJob(SyncJobConfig job, string schedule)
+ {
+ if (string.IsNullOrWhiteSpace(schedule))
+ {
+ return null;
+ }
+
+ if (job.Manual == true)
+ {
+ return string.Equals(schedule, "Manual", StringComparison.OrdinalIgnoreCase) ? "Manual" : null;
+ }
+
+ if (job.Schedules != null && job.Schedules.Count > 0)
+ {
+ foreach (var key in job.Schedules.Keys)
+ {
+ if (string.Equals(key, schedule, StringComparison.OrdinalIgnoreCase))
+ {
+ return job.Schedules[key] ? key : null;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ private static string BuildScheduleSummaryText(SyncJobMonitorAggregate aggregate)
+ {
+ if (aggregate == null)
+ {
+ return "No aggregated schedule data yet; wait for the monitoring timer to process queues.";
+ }
+
+ var name = aggregate.ScheduleName ?? "unknown";
+ var pastDue = aggregate.IsSchedulePastDue ? " (timer was past due when last logged)" : string.Empty;
+ var last = (aggregate.ScheduleTimestamp ?? aggregate.ScheduleLast)?.ToString("o") ?? "n/a";
+ var next = aggregate.ScheduleNext.HasValue ? aggregate.ScheduleNext.Value.ToString("o") : "n/a";
+ var updated = aggregate.ScheduleLastUpdated.HasValue ? aggregate.ScheduleLastUpdated.Value.ToString("o") : "n/a";
+ return FormattableString.Invariant(
+ $"Schedule {name}{pastDue}. Timer last: {last}, next: {next}, status last updated: {updated}.");
+ }
+
+ ///
+ /// True when no run has left the initial -only phase (no started/done/exception/expired steps).
+ /// Used to infer from timer CRON when schedule status blob data is missing.
+ ///
+ private static bool HasSyncJobNotStartedExecution(SyncJobMonitorAggregate aggregate)
+ {
+ if (aggregate?.Runs == null || aggregate.Runs.Count == 0)
+ {
+ return true;
+ }
+
+ foreach (var run in aggregate.Runs)
+ {
+ if (run.StepsByState == null)
+ {
+ continue;
+ }
+
+ foreach (var stateName in run.StepsByState.Keys)
+ {
+ if (string.Equals(stateName, nameof(SyncJobProgressState.Started), StringComparison.Ordinal) ||
+ string.Equals(stateName, nameof(SyncJobProgressState.Done), StringComparison.Ordinal) ||
+ string.Equals(stateName, nameof(SyncJobProgressState.Exception), StringComparison.Ordinal) ||
+ string.Equals(stateName, nameof(SyncJobProgressState.Expired), StringComparison.Ordinal))
+ {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ private static IEnumerable<SyncJobMonitorProgressStepDto> BuildLatestProgressSteps(SyncJobMonitorAggregate aggregate)
+ {
+ if (aggregate == null || string.IsNullOrEmpty(aggregate.LatestSyncJobCorrelationId))
+ {
+ yield break;
+ }
+
+ var run = aggregate.Runs.FirstOrDefault(r =>
+ string.Equals(r.SyncJobCorrelationId, aggregate.LatestSyncJobCorrelationId, StringComparison.Ordinal));
+ if (run == null)
+ {
+ yield break;
+ }
+
+ foreach (var stateName in ProgressStateOrder)
+ {
+ if (run.StepsByState.TryGetValue(stateName, out var step))
+ {
+ yield return new SyncJobMonitorProgressStepDto(stateName, step.Occured, step.Message);
+ }
+ }
+ }
+
+ ///
+ /// Source and target change-tracking versions per table only (no CHANGETABLE counts).
+ ///
+ private static async IAsyncEnumerable<SyncJobMonitorTableVersionRow> LoadMonitorTableVersionsAsync(
+ SyncJob syncJob,
+ ILogger logger,
+ [EnumeratorCancellation] CancellationToken cancellationToken
+ )
+ {
+ ArgumentNullException.ThrowIfNull(syncJob);
+ ArgumentNullException.ThrowIfNull(logger);
+
+ await using SqlConnection
+ sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken },
+ targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken };
+
+ using IDisposable
+ from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database),
+ to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database);
+
+ await sourceConn.OpenAsync(cancellationToken).ConfigureAwait(false);
+ await targetConn.OpenAsync(cancellationToken).ConfigureAwait(false);
+
+ targetConn.EnsureSyncSchemaAndTableExists($"config/{syncJob.Id}/{syncJob.Area}/schema/tracking", logger);
+
+ foreach (var table in syncJob.Tables ?? [])
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var columns = sourceConn.GetColumns(table.Source);
+ var sourceVersion = sourceConn.GetSourceVersion(table.Source, globalChangeTracking: true, columns);
+ var targetVersion = targetConn.GetTargetVersion(table.Target);
+ var sourceVersionNumber = sourceVersion?.CurrentVersion ?? -1L;
+
+ yield return new SyncJobMonitorTableVersionRow(
+ table.Id,
+ table.Source,
+ sourceVersionNumber,
+ targetVersion.CurrentVersion,
+ targetVersion.Queried,
+ targetVersion.Updated,
+ table.Target
+ );
+ }
+ }
+}
diff --git a/src/SqlBulkSyncFunction/Functions/ProcessGlobalChangeTrackingSchedule.cs b/src/SqlBulkSyncFunction/Functions/ProcessGlobalChangeTrackingSchedule.cs
index d715e42..92d2acc 100644
--- a/src/SqlBulkSyncFunction/Functions/ProcessGlobalChangeTrackingSchedule.cs
+++ b/src/SqlBulkSyncFunction/Functions/ProcessGlobalChangeTrackingSchedule.cs
@@ -6,6 +6,7 @@
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
+using SqlBulkSyncFunction;
using SqlBulkSyncFunction.Helpers;
using SqlBulkSyncFunction.Models;
using SqlBulkSyncFunction.Models.Job;
@@ -25,32 +26,32 @@ SyncProgressService syncProgressService
[Function(FunctionName + nameof(Custom))]
public Task Custom(
- [TimerTrigger("%ProcessGlobalChangeTrackingSchedule%")]
+ [TimerTrigger(Constants.Schedules.CustomScheduleTimerTrigger)]
TimerInfo timerInfo,
CancellationToken cancellationToken
) => ProcessSchedule(timerInfo, cancellationToken);
[Function(FunctionName + nameof(Midnight))]
public Task Midnight(
- [TimerTrigger("0 0 0 * * *")] TimerInfo timerInfo,
+ [TimerTrigger(Constants.Schedules.MidnightCron)] TimerInfo timerInfo,
CancellationToken cancellationToken
) => ProcessSchedule(timerInfo, cancellationToken);
[Function(FunctionName + nameof(Noon))]
public Task Noon(
- [TimerTrigger("0 0 12 * * *")] TimerInfo timerInfo,
+ [TimerTrigger(Constants.Schedules.NoonCron)] TimerInfo timerInfo,
CancellationToken cancellationToken
) => ProcessSchedule(timerInfo, cancellationToken);
[Function(FunctionName + nameof(EveryFiveMinutes))]
public Task EveryFiveMinutes(
- [TimerTrigger("5 */5 * * * *")] TimerInfo timerInfo,
+ [TimerTrigger(Constants.Schedules.EveryFiveMinutesCron)] TimerInfo timerInfo,
CancellationToken cancellationToken
) => ProcessSchedule(timerInfo, cancellationToken);
[Function(FunctionName + nameof(EveryHour))]
public Task EveryHour(
- [TimerTrigger("10 0 * * * *")] TimerInfo timerInfo,
+ [TimerTrigger(Constants.Schedules.EveryHourCron)] TimerInfo timerInfo,
CancellationToken cancellationToken
) => ProcessSchedule(timerInfo, cancellationToken);
diff --git a/src/SqlBulkSyncFunction/Functions/QueueGlobalChangeTracking.cs b/src/SqlBulkSyncFunction/Functions/QueueGlobalChangeTracking.cs
index e9d2f65..12b623d 100644
--- a/src/SqlBulkSyncFunction/Functions/QueueGlobalChangeTracking.cs
+++ b/src/SqlBulkSyncFunction/Functions/QueueGlobalChangeTracking.cs
@@ -90,6 +90,17 @@ CancellationToken cancellationToken
SyncJobs = [syncJob.ToLogSyncJob()]
};
+ var createdProgress = new SyncJobProgress(
+ Area: syncJob.Area,
+ ConfigurationId: syncJob.Id,
+ Schedule: syncJob.Schedule,
+ ScheduleCorrelationId: syncJob.ScheduleCorrelationId,
+ SyncJobCorrelationId: syncJob.CorrelationId,
+ State: SyncJobProgressState.Created
+ );
+
+ await syncProgressService.Report(createdProgress, cancellationToken);
+
await syncProgressService.Report(
result,
cancellationToken
diff --git a/src/SqlBulkSyncFunction/Helpers/SchemaExtensions.cs b/src/SqlBulkSyncFunction/Helpers/SchemaExtensions.cs
index da85d1d..9531629 100644
--- a/src/SqlBulkSyncFunction/Helpers/SchemaExtensions.cs
+++ b/src/SqlBulkSyncFunction/Helpers/SchemaExtensions.cs
@@ -69,12 +69,12 @@ TableSchema tableSchema
"""
MERGE sync.TableVersion as target
USING (
- SELECT @TableName AS TableName,
- @CurrentVersion AS CurrentVersion,
- @MinValidVersion AS MinValidVersion,
- @Queried AS Queried,
- SYSDATETIMEOFFSET() AS Updated,
- SYSDATETIMEOFFSET() AS Created
+ SELECT @TableName AS TableName,
+ @CurrentVersion AS CurrentVersion,
+ @MinValidVersion AS MinValidVersion,
+ @Queried AS Queried,
+ SYSDATETIMEOFFSET() AS Updated,
+ SYSDATETIMEOFFSET() AS Created
) as source
ON target.TableName = source.TableName
WHEN NOT MATCHED BY target
@@ -103,12 +103,13 @@ WHEN MATCHED THEN UPDATE
OUTPUT inserted.TableName,
inserted.CurrentVersion,
inserted.MinValidVersion,
+ inserted.Updated,
inserted.Queried;
"""
)
.SingleOrDefault();
- if (persistedTableVersion != syncedTableVersion)
+ if (persistedTableVersion != syncedTableVersion with { Updated = persistedTableVersion.Updated })
{
throw new Exception($"Failed to persist {syncedTableVersion} ({persistedTableVersion})");
}
@@ -130,7 +131,8 @@ string tableName
SELECT TableName,
CurrentVersion,
MinValidVersion,
- Queried
+ Queried,
+ Updated
FROM sync.TableVersion
WHERE TableName = @TableName
"""
diff --git a/src/SqlBulkSyncFunction/Models/Job/SyncJobMonitorResponse.cs b/src/SqlBulkSyncFunction/Models/Job/SyncJobMonitorResponse.cs
new file mode 100644
index 0000000..1f85773
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Models/Job/SyncJobMonitorResponse.cs
@@ -0,0 +1,45 @@
+using System;
+using System.Collections.Generic;
+using System.Text.Json.Serialization;
+
+namespace SqlBulkSyncFunction.Models.Job;
+
+///
+/// One progress step in the latest (or selected) run for monitoring.
+///
+/// Progress state name.
+/// When this state was recorded.
+/// Optional details (e.g. exception).
+public record SyncJobMonitorProgressStepDto(
+ [property: JsonPropertyName("state")] string State,
+ [property: JsonPropertyName("occured")] DateTimeOffset Occured,
+ [property: JsonPropertyName("message")] string Message
+);
+
+///
+/// Combined HTTP response for monitor endpoints: job identity, aggregated schedule/progress from blob, and live table versions.
+/// Use GET monitor/{area} for an array for every job in the area (one per enabled schedule per job),
+/// GET monitor/{area}/{id} for an array (one per enabled schedule), or GET monitor/{area}/{id}/{schedule} for one schedule.
+///
+/// Job area segment (same as the route).
+/// Configured job identifier (same as the route).
+/// Resolved schedule name for this payload (canonical key from config or Manual).
+/// Human-readable schedule and timer summary from aggregated logs.
+/// Next timer run from the last aggregated schedule status (UTC), or an estimate from the same NCRONTAB as the global change-tracking timer when status is missing and the job has not started execution yet; null if unknown.
+/// Last timer execution from the last aggregated schedule status (UTC); null if unknown.
+/// When the aggregate blob was last updated (if any).
+/// Correlation id of the run with the newest progress activity.
+/// Ordered steps for that run.
+/// Per-table source and target change-tracking versions only (no row counts).
+public record SyncJobMonitorResponse(
+ [property: JsonPropertyName("area")] string Area,
+ [property: JsonPropertyName("id")] string Id,
+ [property: JsonPropertyName("schedule")] string Schedule,
+ [property: JsonPropertyName("scheduleSummaryText")] string ScheduleSummaryText,
+ [property: JsonPropertyName("expectedRunAt")] DateTimeOffset? ExpectedRunAt,
+ [property: JsonPropertyName("lastRunAt")] DateTimeOffset? LastRunAt,
+ [property: JsonPropertyName("aggregatedAt")] DateTimeOffset? AggregatedAt,
+ [property: JsonPropertyName("latestRunCorrelationId")] string LatestRunCorrelationId,
+ [property: JsonPropertyName("latestProgressSteps")] IReadOnlyList LatestProgressSteps,
+ [property: JsonPropertyName("tables")] IReadOnlyList Tables
+);
diff --git a/src/SqlBulkSyncFunction/Models/Job/SyncJobMonitorTableVersionRow.cs b/src/SqlBulkSyncFunction/Models/Job/SyncJobMonitorTableVersionRow.cs
new file mode 100644
index 0000000..a0d176a
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Models/Job/SyncJobMonitorTableVersionRow.cs
@@ -0,0 +1,22 @@
+using System;
+using System.Text.Json.Serialization;
+
+namespace SqlBulkSyncFunction.Models.Job;
+
+///
+/// Lightweight per-table row for GET monitor/{area}/{id}: current change-tracking versions only (no change counts).
+///
+/// Configured table identifier from the sync job definition.
+/// Qualified source table name.
+/// Current change tracking version on the source (CHANGE_TRACKING_CURRENT_VERSION() scope), or -1 if unknown.
+/// Last version stored in sync.TableVersion for the target table (or -1 if never synced).
+/// Qualified target table name.
+public record SyncJobMonitorTableVersionRow(
+ [property: JsonPropertyName("id")] string Id,
+ [property: JsonPropertyName("sourceTableName")] string SourceTableName,
+ [property: JsonPropertyName("sourceVersion")] long SourceVersion,
+ [property: JsonPropertyName("targetVersion")] long TargetVersion,
+ [property: JsonPropertyName("queried")] DateTimeOffset Queried,
+ [property: JsonPropertyName("updated")] DateTimeOffset? Updated,
+ [property: JsonPropertyName("targetTableName")] string TargetTableName
+);
diff --git a/src/SqlBulkSyncFunction/Models/Job/SyncJobsConfig.cs b/src/SqlBulkSyncFunction/Models/Job/SyncJobsConfig.cs
index 00246b6..f2eb1ac 100644
--- a/src/SqlBulkSyncFunction/Models/Job/SyncJobsConfig.cs
+++ b/src/SqlBulkSyncFunction/Models/Job/SyncJobsConfig.cs
@@ -6,8 +6,6 @@ namespace SqlBulkSyncFunction.Models.Job;
public record SyncJobsConfig
{
- private static readonly string[] DefaultJobSchedules = ["Custom"];
-
public Dictionary Jobs { get; init; }
public Lazy Job)>> ScheduledJobs { get; }
@@ -31,7 +29,7 @@ private static IEnumerable GetJobSchedules(KeyValuePair value.Value)
.Select(key => key.Key);
diff --git a/src/SqlBulkSyncFunction/Models/Monitor/SyncJobMonitorAggregate.cs b/src/SqlBulkSyncFunction/Models/Monitor/SyncJobMonitorAggregate.cs
new file mode 100644
index 0000000..f7b1844
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Models/Monitor/SyncJobMonitorAggregate.cs
@@ -0,0 +1,53 @@
+using System;
+using System.Collections.Generic;
+
+namespace SqlBulkSyncFunction.Models.Monitor;
+
+///
+/// Persisted snapshot for a single sync job and schedule (keyed by area, job id, and schedule name), updated by the monitoring aggregation timer.
+///
+/// Job area (matches ).
+/// Job configuration id (matches dictionary key in ).
+/// Schedule name (e.g. EveryFiveMinutes) that partitions this aggregate from other schedules for the same job.
+public sealed record SyncJobMonitorAggregate(
+ string Area,
+ string JobId,
+ string Schedule
+)
+{
+ /// UTC time when this aggregate was last written.
+ public DateTimeOffset AggregatedAt { get; set; }
+
+ /// Schedule name from the last processed that listed this job.
+ public string ScheduleName { get; set; }
+
+ /// Correlation id of that schedule log blob.
+ public string ScheduleCorrelationId { get; set; }
+
+ /// Schedule log timestamp.
+ public DateTimeOffset? ScheduleTimestamp { get; set; }
+
+ /// Schedule expiry from the log entry.
+ public DateTimeOffset? ScheduleExpires { get; set; }
+
+ /// Last execution time from timer status when the schedule was logged.
+ public DateTimeOffset? ScheduleLast { get; set; }
+
+ /// Next scheduled execution from timer status when the schedule was logged.
+ public DateTimeOffset? ScheduleNext { get; set; }
+
+ /// Last-updated timestamp from timer status when the schedule was logged.
+ public DateTimeOffset? ScheduleLastUpdated { get; set; }
+
+ /// Whether the timer was past due when the schedule was logged.
+ public bool IsSchedulePastDue { get; set; }
+
+ /// Most recent progress seen for this job.
+ public DateTimeOffset? LastProgressOccured { get; set; }
+
+ /// (run id) for the latest progress event.
+ public string LatestSyncJobCorrelationId { get; set; }
+
+ /// Recent runs (newest activity first); size is capped by the aggregator.
+ public List Runs { get; set; } = [];
+}
diff --git a/src/SqlBulkSyncFunction/Models/Monitor/SyncJobProgressStepAggregate.cs b/src/SqlBulkSyncFunction/Models/Monitor/SyncJobProgressStepAggregate.cs
new file mode 100644
index 0000000..49ff2df
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Models/Monitor/SyncJobProgressStepAggregate.cs
@@ -0,0 +1,13 @@
+using System;
+
+namespace SqlBulkSyncFunction.Models.Monitor;
+
+///
+/// One progress state occurrence within a run.
+///
+/// When this state was reported.
+/// Optional message (e.g. exception text).
+public sealed record SyncJobProgressStepAggregate(
+ DateTimeOffset Occured,
+ string Message
+);
diff --git a/src/SqlBulkSyncFunction/Models/Monitor/SyncJobRunAggregate.cs b/src/SqlBulkSyncFunction/Models/Monitor/SyncJobRunAggregate.cs
new file mode 100644
index 0000000..fd106ed
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Models/Monitor/SyncJobRunAggregate.cs
@@ -0,0 +1,17 @@
+using System;
+using System.Collections.Generic;
+
+namespace SqlBulkSyncFunction.Models.Monitor;
+
+///
+/// Progress steps for one execution (one correlation id without a trailing state segment).
+///
+/// Base correlation id for the run (no /Created suffix).
+public sealed record SyncJobRunAggregate(string SyncJobCorrelationId)
+{
+ /// Latest activity time across all steps in this run.
+ public DateTimeOffset LastActivity { get; set; }
+
+ /// Last occurrence per progress state name (e.g. Created, Started, Done).
+ public Dictionary StepsByState { get; set; } = new(StringComparer.OrdinalIgnoreCase);
+}
diff --git a/src/SqlBulkSyncFunction/Models/Schema/TableVersion.cs b/src/SqlBulkSyncFunction/Models/Schema/TableVersion.cs
index 43541c1..8262bd9 100644
--- a/src/SqlBulkSyncFunction/Models/Schema/TableVersion.cs
+++ b/src/SqlBulkSyncFunction/Models/Schema/TableVersion.cs
@@ -8,4 +8,5 @@ public record TableVersion
public long CurrentVersion { get; set; }
public long MinValidVersion { get; set; }
public DateTimeOffset Queried { get; set; }
+ public DateTimeOffset? Updated { get; set; }
};
diff --git a/src/SqlBulkSyncFunction/Program.cs b/src/SqlBulkSyncFunction/Program.cs
index f2cd98f..023d338 100644
--- a/src/SqlBulkSyncFunction/Program.cs
+++ b/src/SqlBulkSyncFunction/Program.cs
@@ -31,6 +31,8 @@
.AddSingleton()
.AddSingleton()
.AddSingleton()
+ .AddSingleton<SyncMonitoringAggregationService>()
+ .AddSingleton<ProcessGlobalChangeTrackingScheduleNextRun>()
.AddAzureClients(
static az => {
var connectionString = System.Environment.GetEnvironmentVariable("AzureWebJobsStorage");
diff --git a/src/SqlBulkSyncFunction/Services/ProcessGlobalChangeTrackingScheduleNextRun.cs b/src/SqlBulkSyncFunction/Services/ProcessGlobalChangeTrackingScheduleNextRun.cs
new file mode 100644
index 0000000..9c1fe18
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Services/ProcessGlobalChangeTrackingScheduleNextRun.cs
@@ -0,0 +1,115 @@
+using System;
+using System.Collections.Concurrent;
+using Cronos;
+using Microsoft.Extensions.Configuration;
+using SqlBulkSyncFunction.Functions;
+
+namespace SqlBulkSyncFunction.Services;
+
+///
+/// Estimates the next Azure Functions timer fire time for
+/// using the same NCRONTAB expressions as the TimerTrigger attributes (including Custom from app settings).
+/// Parsed cron expressions are cached per schedule name so repeated lookups avoid resolving cron and configuration.
+///
+/// Application configuration (used for the Custom schedule expression).
+public sealed class ProcessGlobalChangeTrackingScheduleNextRun(IConfiguration configuration)
+{
+ ///
+ /// Parsed instances keyed by normalized schedule name (trimmed; ).
+ /// If the Custom cron in configuration changes at runtime, restart the worker to pick up a new expression.
+ ///
+ private readonly ConcurrentDictionary<string, CronExpression> _expressionsByScheduleName = new(StringComparer.Ordinal);
+
+ ///
+ /// Resolves the next scheduled run strictly after when the expression is valid.
+ ///
+ /// Schedule key (e.g. EveryFiveMinutes, Custom).
+ /// Reference instant (typically UTC now).
+ /// Next occurrence in UTC, if resolved.
+ /// when a cron expression was resolved and the next occurrence exists.
+ public bool TryGetNextRunUtc(string scheduleName, DateTimeOffset fromUtc, out DateTimeOffset nextRunUtc)
+ {
+ nextRunUtc = default;
+ if (string.IsNullOrWhiteSpace(scheduleName))
+ {
+ return false;
+ }
+
+ if (!TryParseScheduleCron(scheduleName, out var expression))
+ {
+ return false;
+ }
+
+ var from = fromUtc.UtcDateTime;
+ var next = expression.GetNextOccurrence(from, TimeZoneInfo.Utc, inclusive: false);
+ if (!next.HasValue)
+ {
+ return false;
+ }
+
+ nextRunUtc = new DateTimeOffset(DateTime.SpecifyKind(next.Value, DateTimeKind.Utc), TimeSpan.Zero);
+ return true;
+ }
+
+ ///
+ /// Maps a schedule name to the same cron string as , parses it, and caches the result by schedule name.
+ ///
+ /// Schedule key from configuration (e.g. Midnight).
+ /// Parsed expression, or when unresolved or invalid.
+ /// when a valid expression was resolved.
+ public bool TryParseScheduleCron(string scheduleName, out CronExpression expression)
+ {
+ expression = null;
+ if (string.IsNullOrWhiteSpace(scheduleName))
+ {
+ return false;
+ }
+
+ var cacheKey = scheduleName.Trim();
+ if (_expressionsByScheduleName.TryGetValue(cacheKey, out var cached))
+ {
+ expression = cached;
+ return true;
+ }
+
+ if (!TryResolveCronString(scheduleName, out var cron) || string.IsNullOrWhiteSpace(cron))
+ {
+ return false;
+ }
+
+ try
+ {
+ var parsed = CronExpression.Parse(cron, CronFormat.IncludeSeconds);
+ expression = _expressionsByScheduleName.GetOrAdd(cacheKey, parsed);
+ return true;
+ }
+ catch (CronFormatException)
+ {
+ return false;
+ }
+ }
+
+ ///
+ /// Resolves the NCRONTAB string for (including Custom from configuration).
+ ///
+ /// Schedule key (method name on ).
+ /// Cron string when recognized; otherwise .
+ /// when a cron string was associated with the schedule name.
+ private bool TryResolveCronString(string scheduleName, out string cron)
+ {
+ cron = null;
+ var key = scheduleName.Trim();
+ cron = key switch
+ {
+ _ when string.Equals(key, nameof(ProcessGlobalChangeTrackingSchedule.Midnight), StringComparison.Ordinal) => Constants.Schedules.MidnightCron,
+ _ when string.Equals(key, nameof(ProcessGlobalChangeTrackingSchedule.Noon), StringComparison.Ordinal) => Constants.Schedules.NoonCron,
+ _ when string.Equals(key, nameof(ProcessGlobalChangeTrackingSchedule.EveryFiveMinutes), StringComparison.Ordinal) => Constants.Schedules.EveryFiveMinutesCron,
+ _ when string.Equals(key, nameof(ProcessGlobalChangeTrackingSchedule.EveryHour), StringComparison.Ordinal) => Constants.Schedules.EveryHourCron,
+ _ when string.Equals(key, nameof(ProcessGlobalChangeTrackingSchedule.Custom), StringComparison.Ordinal)
+ => configuration[Constants.Schedules.CustomScheduleConfigurationKey]?.Trim(),
+ _ => null,
+ };
+
+ return !string.IsNullOrWhiteSpace(cron);
+ }
+}
diff --git a/src/SqlBulkSyncFunction/Services/SyncMonitoringAggregationService.cs b/src/SqlBulkSyncFunction/Services/SyncMonitoringAggregationService.cs
new file mode 100644
index 0000000..b04af2b
--- /dev/null
+++ b/src/SqlBulkSyncFunction/Services/SyncMonitoringAggregationService.cs
@@ -0,0 +1,373 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using Azure;
+using Azure.Storage.Blobs;
+using Azure.Storage.Blobs.Models;
+using Azure.Storage.Queues;
+using Microsoft.Extensions.Logging;
+using SqlBulkSyncFunction.Models;
+using SqlBulkSyncFunction.Models.Monitor;
+
+namespace SqlBulkSyncFunction.Services;
+
+/// <summary>
+/// Drains schedule and sync-job progress queues in a fixed order, merges blobs into per-job aggregate blobs for cheap reads.
+/// Aggregates are partitioned by schedule under {area}/{jobId}/{schedule}.json with optimistic concurrency (ETags).
+/// </summary>
+public sealed class SyncMonitoringAggregationService(
+ ILogger logger,
+ QueueServiceClient queueService,
+ BlobServiceClient blobServiceClient
+ )
+{
+ // Upper bound on retained runs per aggregate blob; older runs are pruned by LastActivity (see PruneRuns).
+ private const int MaxRunsPerAggregate = 10;
+ // Batch size per queue receive call — presumably the storage-queue per-receive maximum; confirm.
+ private const int MaxMessagesPerReceive = 32;
+ // Attempts per aggregate save before giving up on ETag conflicts.
+ private const int MaxConcurrencyRetries = 10;
+
+ // Shared serializer settings for all monitor blobs (compact JSON, tolerant property-name casing on read).
+ private static readonly JsonSerializerOptions JsonOptions = new()
+ {
+ PropertyNameCaseInsensitive = true,
+ WriteIndented = false,
+ };
+
+ // Queue and container clients are created eagerly (CreateIfNotExists) at construction time.
+ private readonly QueueClient _logScheduleQueue = GetOrCreateQueue(queueService, Constants.Queues.LogScheduleQueue);
+ private readonly Dictionary _progressQueues = GetProgressQueues(queueService);
+ private readonly BlobContainerClient _syncJobContainer = GetOrCreateContainer(blobServiceClient, Constants.Containers.SyncJob);
+ private readonly BlobContainerClient _syncScheduleContainer = GetOrCreateContainer(blobServiceClient, Constants.Containers.SyncSchedule);
+ private readonly BlobContainerClient _monitorContainer = GetOrCreateContainer(blobServiceClient, Constants.Containers.Monitor);
+
+ /// <summary>
+ /// Returns the persisted aggregate for the given area, job id, and schedule, or null when no aggregate blob exists.
+ /// </summary>
+ public async Task GetAggregateAsync(string area, string jobId, string schedule, CancellationToken cancellationToken)
+ {
+ var path = GetMonitorBlobPath(area, jobId, schedule);
+ var blob = _monitorContainer.GetBlobClient(path);
+ if (!await blob.ExistsAsync(cancellationToken).ConfigureAwait(false))
+ {
+ return null;
+ }
+
+ var download = await blob.DownloadContentAsync(cancellationToken).ConfigureAwait(false);
+ return download.Value.Content.ToObjectFromJson(JsonOptions);
+ }
+
+ /// <summary>
+ /// Processes all monitoring queues sequentially: log schedule first, then progress queues in enum order.
+ /// </summary>
+ public async Task ProcessAllQueuesAsync(CancellationToken cancellationToken)
+ {
+ await DrainQueueAsync(_logScheduleQueue, ProcessLogScheduleAsync, cancellationToken).ConfigureAwait(false);
+
+ foreach (var state in Enum.GetValues())
+ {
+ if (_progressQueues.TryGetValue(state, out var client))
+ {
+ await DrainQueueAsync(client, ProcessSyncJobProgressAsync, cancellationToken).ConfigureAwait(false);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Receives and handles messages from <paramref name="queue"/> in batches until the queue is empty
+ /// or cancellation is requested. Each message body is treated as a correlation id.
+ /// </summary>
+ private async Task DrainQueueAsync(
+ QueueClient queue,
+ Func handleCorrelationId,
+ CancellationToken cancellationToken
+ )
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ var response = await queue
+ .ReceiveMessagesAsync(maxMessages: MaxMessagesPerReceive, cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+
+ // An empty batch means the queue is drained; stop.
+ if (response.Value.Length == 0)
+ {
+ return;
+ }
+
+ foreach (var message in response.Value)
+ {
+ var correlationId = message.Body.ToString();
+ if (string.IsNullOrWhiteSpace(correlationId))
+ {
+ // Malformed message with nothing to correlate: discard it.
+ await queue.DeleteMessageAsync(message.MessageId, message.PopReceipt, cancellationToken).ConfigureAwait(false);
+ continue;
+ }
+
+ try
+ {
+ await handleCorrelationId(correlationId, cancellationToken).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ logger.LogWarning(ex, "Monitoring aggregation failed for queue {Queue} message {CorrelationId}", queue.Name, correlationId);
+ }
+
+ // NOTE(review): the message is deleted even when the handler threw, making delivery
+ // at-most-once (failures are logged and dropped, never retried) — confirm this is intended
+ // rather than letting visibility timeout redeliver.
+ await queue.DeleteMessageAsync(message.MessageId, message.PopReceipt, cancellationToken).ConfigureAwait(false);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Loads the schedule blob named by <paramref name="correlationId"/> and folds its schedule metadata
+ /// into the aggregate of every sync job it references.
+ /// </summary>
+ private async Task ProcessLogScheduleAsync(string correlationId, CancellationToken cancellationToken)
+ {
+ var blob = _syncScheduleContainer.GetBlobClient($"{correlationId}.json");
+ if (!await blob.ExistsAsync(cancellationToken).ConfigureAwait(false))
+ {
+ // Blob may not have been written yet (or was cleaned up); the message is still consumed.
+ logger.LogWarning("Schedule blob missing for {CorrelationId}", correlationId);
+ return;
+ }
+
+ var download = await blob.DownloadContentAsync(cancellationToken).ConfigureAwait(false);
+ var logSchedule = download.Value.Content.ToObjectFromJson(JsonOptions);
+ if (logSchedule == null || string.IsNullOrWhiteSpace(logSchedule.Name))
+ {
+ return;
+ }
+
+ foreach (var syncJob in logSchedule.SyncJobs ?? [])
+ {
+ // Area and Id form the aggregate blob path; skip entries that cannot be addressed.
+ if (string.IsNullOrWhiteSpace(syncJob.Area) || string.IsNullOrWhiteSpace(syncJob.Id))
+ {
+ continue;
+ }
+
+ await SaveAggregateWithConcurrencyRetryAsync(
+ syncJob.Area,
+ syncJob.Id,
+ logSchedule.Name,
+ aggregate =>
+ {
+ // Merge callback: overwrite the schedule-level fields with the latest observation.
+ aggregate ??= new SyncJobMonitorAggregate(syncJob.Area, syncJob.Id, logSchedule.Name);
+ aggregate.ScheduleName = logSchedule.Name;
+ aggregate.ScheduleCorrelationId = logSchedule.CorrelationId;
+ aggregate.ScheduleTimestamp = logSchedule.Timestamp;
+ aggregate.ScheduleExpires = logSchedule.Expires;
+ aggregate.ScheduleLast = logSchedule.Last;
+ aggregate.ScheduleNext = logSchedule.Next;
+ aggregate.ScheduleLastUpdated = logSchedule.LastUpdated;
+ aggregate.IsSchedulePastDue = logSchedule.IsPastDue;
+ aggregate.AggregatedAt = DateTimeOffset.UtcNow;
+ return aggregate;
+ },
+ cancellationToken
+ ).ConfigureAwait(false);
+ }
+ }
+
+ /// <summary>
+ /// Loads the sync-job progress blob named by <paramref name="correlationId"/> and records it as a
+ /// per-state step inside the matching run of the job's aggregate.
+ /// </summary>
+ private async Task ProcessSyncJobProgressAsync(string correlationId, CancellationToken cancellationToken)
+ {
+ var blob = _syncJobContainer.GetBlobClient($"{correlationId}.json");
+ if (!await blob.ExistsAsync(cancellationToken).ConfigureAwait(false))
+ {
+ logger.LogWarning("Sync job progress blob missing for {CorrelationId}", correlationId);
+ return;
+ }
+
+ var download = await blob.DownloadContentAsync(cancellationToken).ConfigureAwait(false);
+ var progress = download.Value.Content.ToObjectFromJson(JsonOptions);
+ // Area, ConfigurationId, and Schedule address the aggregate blob; all three are required.
+ if (progress == null ||
+ string.IsNullOrWhiteSpace(progress.Area) ||
+ string.IsNullOrWhiteSpace(progress.ConfigurationId) ||
+ string.IsNullOrWhiteSpace(progress.Schedule))
+ {
+ return;
+ }
+
+ var scheduleName = progress.Schedule;
+
+ await SaveAggregateWithConcurrencyRetryAsync(
+ progress.Area,
+ progress.ConfigurationId,
+ scheduleName,
+ aggregate =>
+ {
+ aggregate ??= new SyncJobMonitorAggregate(progress.Area, progress.ConfigurationId, scheduleName);
+
+ // A "run" groups all progress steps sharing one sync-job correlation id.
+ var runKey = progress.SyncJobCorrelationId;
+ var run = aggregate.Runs.FirstOrDefault(r => string.Equals(r.SyncJobCorrelationId, runKey, StringComparison.Ordinal));
+ if (run == null)
+ {
+ run = new SyncJobRunAggregate(runKey);
+ aggregate.Runs.Add(run);
+ }
+
+ // "F" formats the enum by name; one step slot per state (later messages for the same state overwrite).
+ var stateName = progress.State.ToString("F");
+ run.StepsByState[stateName] = new SyncJobProgressStepAggregate(progress.Occured, progress.Message);
+ run.LastActivity = run.StepsByState.Values.Max(s => s.Occured);
+
+ PruneRuns(aggregate);
+
+ // >= keeps the latest pointer on ties so the most recently processed message wins.
+ if (!aggregate.LastProgressOccured.HasValue || progress.Occured >= aggregate.LastProgressOccured.Value)
+ {
+ aggregate.LastProgressOccured = progress.Occured;
+ aggregate.LatestSyncJobCorrelationId = runKey;
+ }
+
+ aggregate.AggregatedAt = DateTimeOffset.UtcNow;
+ return aggregate;
+ },
+ cancellationToken
+ ).ConfigureAwait(false);
+ }
+
+ /// <summary>
+ /// Trims <see cref="SyncJobMonitorAggregate"/>.Runs to the <see cref="MaxRunsPerAggregate"/> most
+ /// recently active runs (ordered by LastActivity, newest kept).
+ /// </summary>
+ private static void PruneRuns(SyncJobMonitorAggregate aggregate)
+ {
+ if (aggregate.Runs.Count <= MaxRunsPerAggregate)
+ {
+ return;
+ }
+
+ var ordered = aggregate.Runs
+ .OrderByDescending(r => r.LastActivity)
+ .Take(MaxRunsPerAggregate)
+ .ToList();
+ aggregate.Runs.Clear();
+ aggregate.Runs.AddRange(ordered);
+ }
+
+ /// <summary>
+ /// Loads the aggregate blob, applies <paramref name="merge"/> (null input when the blob does not
+ /// exist; a null result skips the save), and uploads with ETag preconditions; retries the whole
+ /// load-merge-save cycle on write conflicts up to <see cref="MaxConcurrencyRetries"/> times.
+ /// </summary>
+ private async Task SaveAggregateWithConcurrencyRetryAsync(
+ string area,
+ string jobId,
+ string scheduleName,
+ Func merge,
+ CancellationToken cancellationToken
+ )
+ {
+ var path = GetMonitorBlobPath(area, jobId, scheduleName);
+ var blob = _monitorContainer.GetBlobClient(path);
+
+ for (var attempt = 0; attempt < MaxConcurrencyRetries; attempt++)
+ {
+ ETag? etag = null;
+ SyncJobMonitorAggregate existing = null;
+ if (await blob.ExistsAsync(cancellationToken).ConfigureAwait(false))
+ {
+ // Capture the ETag alongside the content so the later upload can require If-Match.
+ var dl = await blob.DownloadContentAsync(cancellationToken).ConfigureAwait(false);
+ etag = dl.Value.Details.ETag;
+ existing = dl.Value.Content.ToObjectFromJson(JsonOptions);
+ }
+
+ var merged = merge(existing);
+ if (merged == null)
+ {
+ return;
+ }
+
+ try
+ {
+ await UploadMonitorAggregateAsync(blob, merged, etag, cancellationToken).ConfigureAwait(false);
+ return;
+ }
+ catch (RequestFailedException ex) when (IsConcurrencyConflict(ex) && attempt < MaxConcurrencyRetries - 1)
+ {
+ // Another writer changed (or created) the blob between our read and write; reload and re-merge.
+ logger.LogDebug(
+ ex,
+ "Monitor aggregate concurrency conflict for {Path}, retry {Attempt}",
+ path,
+ attempt + 1);
+ }
+ }
+
+ throw new InvalidOperationException(
+ FormattableString.Invariant($"Could not save monitor aggregate after {MaxConcurrencyRetries} attempts: {path}"));
+ }
+
+ // 412 Precondition Failed (ETag mismatch) or 409 Conflict both indicate a concurrent writer.
+ private static bool IsConcurrencyConflict(RequestFailedException ex)
+ => ex.Status == 412 || ex.Status == 409;
+
+ /// <summary>
+ /// Serializes the aggregate and uploads it with an optimistic-concurrency condition:
+ /// If-Match when we read an existing blob, otherwise If-None-Match:* (create only).
+ /// </summary>
+ private static async Task UploadMonitorAggregateAsync(
+ BlobClient blob,
+ SyncJobMonitorAggregate aggregate,
+ ETag? etagFromDownload,
+ CancellationToken cancellationToken
+ )
+ {
+ var data = BinaryData.FromObjectAsJson(aggregate, JsonOptions);
+ var options = new BlobUploadOptions
+ {
+ HttpHeaders = new BlobHttpHeaders
+ {
+ ContentType = Constants.BlobContentTypes.Json,
+ },
+ };
+
+ if (etagFromDownload.HasValue)
+ {
+ // Only overwrite the exact version we merged against.
+ options.Conditions = new BlobRequestConditions { IfMatch = etagFromDownload.Value };
+ }
+ else
+ {
+ // Blob did not exist at read time: fail if someone created it since.
+ options.Conditions = new BlobRequestConditions { IfNoneMatch = ETag.All };
+ }
+
+ await blob.UploadAsync(data, options, cancellationToken).ConfigureAwait(false);
+ }
+
+ /// <summary>
+ /// Virtual path under the monitor container: {area}/{jobId}/{schedule}.json (segments sanitized).
+ /// </summary>
+ internal static string GetMonitorBlobPath(string area, string jobId, string schedule)
+ => FormattableString.Invariant(
+ $"{SanitizeBlobPathSegment(area)}/{SanitizeBlobPathSegment(jobId)}/{SanitizeBlobPathSegment(schedule)}.json");
+
+ /// <summary>
+ /// Replaces characters that are unsafe or ambiguous in blob path segments
+ /// (path separators, '#', '?', control characters) with '_'; null/blank input becomes "_".
+ /// </summary>
+ internal static string SanitizeBlobPathSegment(string segment)
+ {
+ if (string.IsNullOrWhiteSpace(segment))
+ {
+ return "_";
+ }
+
+ var s = segment.Trim();
+ var sb = new StringBuilder(s.Length);
+ foreach (var c in s)
+ {
+ if (c == '/' || c == '\\' || c == '#' || c == '?' || char.IsControl(c))
+ {
+ _ = sb.Append('_');
+ }
+ else
+ {
+ _ = sb.Append(c);
+ }
+ }
+
+ return sb.Length > 0 ? sb.ToString() : "_";
+ }
+
+ // Returns the named queue, creating it if absent (synchronous: runs during field initialization).
+ private static QueueClient GetOrCreateQueue(QueueServiceClient queueService, string queueName)
+ {
+ var q = queueService.GetQueueClient(queueName);
+ _ = q.CreateIfNotExists();
+ return q;
+ }
+
+ // Returns the named container, creating it if absent (synchronous: runs during field initialization).
+ private static BlobContainerClient GetOrCreateContainer(BlobServiceClient blobServiceClient, string name)
+ {
+ var c = blobServiceClient.GetBlobContainerClient(name);
+ _ = c.CreateIfNotExists();
+ return c;
+ }
+
+ // Builds one queue per progress state, named "{SyncJobProgressQueue}-{state}" (state lower-cased).
+ private static Dictionary GetProgressQueues(QueueServiceClient queueService)
+ {
+ var map = new Dictionary();
+ foreach (var state in Enum.GetValues())
+ {
+ var queueName = FormattableString.Invariant($"{Constants.Queues.SyncJobProgressQueue}-{state.ToString("F").ToLowerInvariant()}");
+ map[state] = GetOrCreateQueue(queueService, queueName);
+ }
+
+ return map;
+ }
+}
diff --git a/src/SqlBulkSyncFunction/Services/SyncProgressService.cs b/src/SqlBulkSyncFunction/Services/SyncProgressService.cs
index 61cd0b5..b2c6b3b 100644
--- a/src/SqlBulkSyncFunction/Services/SyncProgressService.cs
+++ b/src/SqlBulkSyncFunction/Services/SyncProgressService.cs
@@ -3,6 +3,7 @@
using System.Threading;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
+using Azure.Storage.Blobs.Models;
using Azure.Storage.Queues;
using SqlBulkSyncFunction.Models;
@@ -62,9 +63,12 @@ public async Task Report(SyncJobProgress value, CancellationToken cancellationTo
_ = await _syncJobBlobContainerClient
.GetBlobClient($"{value.CorrelationId}.json")
.UploadAsync(
- content: BinaryData.FromObjectAsJson(value),
- overwrite: true,
- cancellationToken: cancellationToken
+ BinaryData.FromObjectAsJson(value),
+ new BlobUploadOptions
+ {
+ HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Json },
+ },
+ cancellationToken
);
if (_queueClients.TryGetValue(value.State, out var _queueClient))
@@ -80,9 +84,12 @@ public async Task Report(LogSchedule value, CancellationToken cancellationToken)
_ = await _syncScheduleBlobContainerClient
.GetBlobClient($"{value.CorrelationId}.json")
.UploadAsync(
- content: BinaryData.FromObjectAsJson(value),
- overwrite: true,
- cancellationToken: cancellationToken
+ BinaryData.FromObjectAsJson(value),
+ new BlobUploadOptions
+ {
+ HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Json },
+ },
+ cancellationToken
);
_ = await _logScheduleQueueClient.SendMessageAsync(value.CorrelationId, cancellationToken);
diff --git a/src/SqlBulkSyncFunction/SqlBulkSyncFunction.csproj b/src/SqlBulkSyncFunction/SqlBulkSyncFunction.csproj
index a780b5f..20a1160 100644
--- a/src/SqlBulkSyncFunction/SqlBulkSyncFunction.csproj
+++ b/src/SqlBulkSyncFunction/SqlBulkSyncFunction.csproj
@@ -7,6 +7,7 @@
<_FunctionsSkipCleanOutput>true
+