diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 60e9551..b361514 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -19,5 +19,6 @@ + \ No newline at end of file diff --git a/src/SqlBulkSyncFunction/Constants.cs b/src/SqlBulkSyncFunction/Constants.cs index 69aafa4..860d692 100644 --- a/src/SqlBulkSyncFunction/Constants.cs +++ b/src/SqlBulkSyncFunction/Constants.cs @@ -22,6 +22,34 @@ public static class Queues /// public const string SyncJobProgressQueue = "syncjobprogress"; } + /// + /// NCRONTAB expressions and configuration keys for timer triggers. + /// + public static class Schedules + { + /// + /// Configuration / environment key for the Custom schedule (same as the timer binding placeholder without % delimiters). + /// + public const string CustomScheduleConfigurationKey = "ProcessGlobalChangeTrackingSchedule"; + + /// + /// Timer trigger binding for the custom schedule: %ProcessGlobalChangeTrackingSchedule%. + /// + public const string CustomScheduleTimerTrigger = "%" + CustomScheduleConfigurationKey + "%"; + + /// NCRONTAB for Midnight (0 0 0 * * *). + public const string MidnightCron = "0 0 0 * * *"; + + /// NCRONTAB for Noon (0 0 12 * * *). + public const string NoonCron = "0 0 12 * * *"; + + /// NCRONTAB for EveryFiveMinutes (5 */5 * * * *). + public const string EveryFiveMinutesCron = "5 */5 * * * *"; + + /// NCRONTAB for EveryHour (10 0 * * * *). + public const string EveryHourCron = "10 0 * * * *"; + } + public static class Containers { /// @@ -30,5 +58,19 @@ public static class Containers public const string SyncJob = "syncjob"; public const string SyncSchedule = "syncschedule"; + + /// + /// Blob container for per-job monitoring aggregates (written by the aggregation timer). + /// + public const string Monitor = "monitor"; + } + + /// + /// Content types for Azure Blob uploads. + /// + public static class BlobContentTypes + { + /// JSON documents (UTF-8). + public const string Json = "application/json; charset=utf-8"; } } diff --git a/src/SqlBulkSyncFunction/Functions/AggregateSyncMonitoring.cs b/src/SqlBulkSyncFunction/Functions/AggregateSyncMonitoring.cs new file mode 100644 index 0000000..056c02f --- /dev/null +++ b/src/SqlBulkSyncFunction/Functions/AggregateSyncMonitoring.cs @@ -0,0 +1,24 @@ +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Functions.Worker; +using SqlBulkSyncFunction.Services; + +namespace SqlBulkSyncFunction.Functions; + +/// +/// Timer that drains schedule and progress queues in order and refreshes per-job aggregate blobs under the monitor container. +/// +public sealed class AggregateSyncMonitoring(SyncMonitoringAggregationService aggregationService) +{ + /// + /// Runs every minute on the minute (UTC). + /// + [Function(nameof(AggregateSyncMonitoring))] + public Task Run( +#pragma warning disable IDE0060 // Remove unused parameter + [TimerTrigger("0 */1 * * * *")] TimerInfo timerInfo, +#pragma warning restore IDE0060 // Remove unused parameter + CancellationToken cancellationToken + ) + => aggregationService.ProcessAllQueuesAsync(cancellationToken); +} diff --git a/src/SqlBulkSyncFunction/Functions/GetSyncJobMonitor.cs b/src/SqlBulkSyncFunction/Functions/GetSyncJobMonitor.cs new file mode 100644 index 0000000..d7640fa --- /dev/null +++ b/src/SqlBulkSyncFunction/Functions/GetSyncJobMonitor.cs @@ -0,0 +1,368 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using SqlBulkSyncFunction.Helpers; +using SqlBulkSyncFunction.Models; +using SqlBulkSyncFunction.Models.Job; +using SqlBulkSyncFunction.Models.Monitor; +using SqlBulkSyncFunction.Services; + +namespace SqlBulkSyncFunction.Functions; + +/// +/// HTTP endpoints for sync job monitoring (aggregated schedule/progress and lightweight table versions). +/// +public sealed class GetSyncJobMonitor( + ILogger logger, + IOptions syncJobsConfig, + ITokenCacheService tokenCacheService, + SyncMonitoringAggregationService syncMonitoringAggregationService, + ProcessGlobalChangeTrackingScheduleNextRun processGlobalChangeTrackingScheduleNextRun + ) +{ + private static readonly string[] ProgressStateOrder = Enum.GetNames(); + + /// + /// GET monitor/{area} returns all enabled schedules for every job in the area as a JSON array. + /// GET monitor/{area}/{id} returns all enabled schedules for that job as a JSON array. + /// GET monitor/{area}/{id}/{schedule} returns a JSON array with one for that schedule. + /// + [Function(nameof(GetSyncJobMonitor))] + public async Task Run( + [HttpTrigger( + AuthorizationLevel.Function, + "get", + Route = "monitor/{area}/{id?}/{schedule?}" + )] + HttpRequest req, + string area, + string id, + string schedule, + CancellationToken cancellationToken + ) + { + ArgumentNullException.ThrowIfNull(req); + + if (string.IsNullOrWhiteSpace(area)) + { + return new NotFoundResult(); + } + + var jobs = syncJobsConfig?.Value?.Jobs? + .Where(kv => + kv.Value != null + && kv.Value.Area is { Length: > 0 } + && StringComparer.OrdinalIgnoreCase.Equals(area, kv.Value.Area) + && (string.IsNullOrWhiteSpace(id) || StringComparer.OrdinalIgnoreCase.Equals(kv.Key, id))) + .OrderBy(kv => kv.Key, StringComparer.OrdinalIgnoreCase) + .ToList(); + + if (jobs == null || jobs.Count == 0) + { + return new NotFoundResult(); + } + + var utcNow = DateTimeOffset.UtcNow; + + var responses = await EnumerateMonitorResponsesForJobsAsync( + jobs, + area, + utcNow, + schedule, + cancellationToken + ) + .ToArrayAsync(cancellationToken) + .ConfigureAwait(false); + + return new OkObjectResult(responses); + } + + /// + /// For each matching job, loads table versions once and yields one per schedule + /// from (optionally filtered by ). + /// + private async IAsyncEnumerable EnumerateMonitorResponsesForJobsAsync( + IReadOnlyList> jobs, + string area, + DateTimeOffset utcNow, + string schedule, + [EnumeratorCancellation] CancellationToken cancellationToken + ) + { + foreach (var (jobId, jobConfig) in jobs) + { + var syncJob = jobConfig.ToSyncJob( + scheduleCorrelationId: null, + tokenCache: await tokenCacheService.GetTokenCache(jobConfig).ConfigureAwait(false), + timestamp: utcNow, + expires: utcNow.AddMinutes(4), + id: jobId, + schedule: nameof(jobConfig.Manual), + seed: false + ); + + var tableRows = await LoadMonitorTableVersionsAsync(syncJob, logger, cancellationToken) + .ToArrayAsync(cancellationToken) + .ConfigureAwait(false); + + await foreach ( + var response in EnumerateSchedulesForJob( + jobConfig, + async resolvedSchedule => await BuildMonitorResponseAsync( + area, + jobId, + resolvedSchedule, + tableRows, + utcNow, + cancellationToken + ) + .ConfigureAwait(false), + schedule + ) + .WithCancellation(cancellationToken) + .ConfigureAwait(false)) + { + yield return response; + } + } + } + + private async Task BuildMonitorResponseAsync( + string area, + string id, + string resolvedSchedule, + IReadOnlyList tableRows, + DateTimeOffset utcNow, + CancellationToken cancellationToken + ) + { + var aggregate = await syncMonitoringAggregationService + .GetAggregateAsync(area, id, resolvedSchedule, cancellationToken) + .ConfigureAwait(false); + + var scheduleSummaryText = BuildScheduleSummaryText(aggregate); + + var expectedRunAt = aggregate?.ScheduleNext; + if (!expectedRunAt.HasValue + && !string.Equals(resolvedSchedule, "Manual", StringComparison.OrdinalIgnoreCase) + && HasSyncJobNotStartedExecution(aggregate) + && processGlobalChangeTrackingScheduleNextRun.TryGetNextRunUtc( + resolvedSchedule, + utcNow, + out var estimatedNext)) + { + expectedRunAt = estimatedNext; + } + + return new SyncJobMonitorResponse( + Area: area, + Id: id, + Schedule: resolvedSchedule, + ScheduleSummaryText: scheduleSummaryText, + ExpectedRunAt: expectedRunAt, + LastRunAt: aggregate?.ScheduleTimestamp ?? aggregate?.ScheduleLast, + AggregatedAt: aggregate?.AggregatedAt, + LatestRunCorrelationId: aggregate?.LatestSyncJobCorrelationId, + LatestProgressSteps: [.. BuildLatestProgressSteps(aggregate)], + Tables: tableRows + ); + } + + /// + /// Enabled schedule names for this job (same rules as scheduled jobs). + /// When is non-empty, only the schedule that accepts is enumerated (zero or one item). + /// + private static async IAsyncEnumerable EnumerateSchedulesForJob( + SyncJobConfig job, + Func> selector, + string schedule = null + ) + { + if (!string.IsNullOrWhiteSpace(schedule)) + { + var resolved = ResolveScheduleForJob(job, schedule); + if (resolved != null) + { + yield return await selector(resolved); + } + + yield break; + } + + if (job.Manual == true) + { + yield return await selector("Manual"); + yield break; + } + + if (job.Schedules != null && job.Schedules.Count > 0) + { + foreach ( + var key in job.Schedules + .Where(p => p.Value) + .Select(p => p.Key) + .OrderBy(k => k, StringComparer.OrdinalIgnoreCase)) + { + yield return await selector(key); + } + + yield break; + } + } + + /// + /// Returns the canonical schedule name from the job config for the requested segment, or null if invalid. + /// + private static string ResolveScheduleForJob(SyncJobConfig job, string schedule) + { + if (string.IsNullOrWhiteSpace(schedule)) + { + return null; + } + + if (job.Manual == true) + { + return string.Equals(schedule, "Manual", StringComparison.OrdinalIgnoreCase) ? "Manual" : null; + } + + if (job.Schedules != null && job.Schedules.Count > 0) + { + foreach (var key in job.Schedules.Keys) + { + if (string.Equals(key, schedule, StringComparison.OrdinalIgnoreCase)) + { + return job.Schedules[key] ? key : null; + } + } + } + + return null; + } + + private static string BuildScheduleSummaryText(SyncJobMonitorAggregate aggregate) + { + if (aggregate == null) + { + return "No aggregated schedule data yet; wait for the monitoring timer to process queues."; + } + + var name = aggregate.ScheduleName ?? "unknown"; + var pastDue = aggregate.IsSchedulePastDue ? " (timer was past due when last logged)" : string.Empty; + var last = (aggregate.ScheduleTimestamp ?? aggregate.ScheduleLast)?.ToString("o") ?? "n/a"; + var next = aggregate.ScheduleNext.HasValue ? aggregate.ScheduleNext.Value.ToString("o") : "n/a"; + var updated = aggregate.ScheduleLastUpdated.HasValue ? aggregate.ScheduleLastUpdated.Value.ToString("o") : "n/a"; + return FormattableString.Invariant( + $"Schedule {name}{pastDue}. Timer last: {last}, next: {next}, status last updated: {updated}."); + } + + /// + /// True when no run has left the initial -only phase (no started/done/exception/expired steps). + /// Used to infer from timer CRON when schedule status blob data is missing. + /// + private static bool HasSyncJobNotStartedExecution(SyncJobMonitorAggregate aggregate) + { + if (aggregate?.Runs == null || aggregate.Runs.Count == 0) + { + return true; + } + + foreach (var run in aggregate.Runs) + { + if (run.StepsByState == null) + { + continue; + } + + foreach (var stateName in run.StepsByState.Keys) + { + if (string.Equals(stateName, nameof(SyncJobProgressState.Started), StringComparison.Ordinal) || + string.Equals(stateName, nameof(SyncJobProgressState.Done), StringComparison.Ordinal) || + string.Equals(stateName, nameof(SyncJobProgressState.Exception), StringComparison.Ordinal) || + string.Equals(stateName, nameof(SyncJobProgressState.Expired), StringComparison.Ordinal)) + { + return false; + } + } + } + + return true; + } + + private static IEnumerable BuildLatestProgressSteps(SyncJobMonitorAggregate aggregate) + { + if (aggregate == null || string.IsNullOrEmpty(aggregate.LatestSyncJobCorrelationId)) + { + yield break; + } + + var run = aggregate.Runs.FirstOrDefault(r => + string.Equals(r.SyncJobCorrelationId, aggregate.LatestSyncJobCorrelationId, StringComparison.Ordinal)); + if (run == null) + { + yield break; + } + + foreach (var stateName in ProgressStateOrder) + { + if (run.StepsByState.TryGetValue(stateName, out var step)) + { + yield return new SyncJobMonitorProgressStepDto(stateName, step.Occured, step.Message); + } + } + } + + /// + /// Source and target change-tracking versions per table only (no CHANGETABLE counts). + /// + private static async IAsyncEnumerable LoadMonitorTableVersionsAsync( + SyncJob syncJob, + ILogger logger, + [EnumeratorCancellation] CancellationToken cancellationToken + ) + { + ArgumentNullException.ThrowIfNull(syncJob); + ArgumentNullException.ThrowIfNull(logger); + + await using SqlConnection + sourceConn = new(syncJob.SourceDbConnection) { AccessToken = syncJob.SourceDbAccessToken }, + targetConn = new(syncJob.TargetDbConnection) { AccessToken = syncJob.TargetDbAccessToken }; + + using IDisposable + from = logger.BeginScope("{DataSource}.{Database}", sourceConn.DataSource, sourceConn.Database), + to = logger.BeginScope("{DataSource}.{Database}", targetConn.DataSource, targetConn.Database); + + await sourceConn.OpenAsync(cancellationToken).ConfigureAwait(false); + await targetConn.OpenAsync(cancellationToken).ConfigureAwait(false); + + targetConn.EnsureSyncSchemaAndTableExists($"config/{syncJob.Id}/{syncJob.Area}/schema/tracking", logger); + + foreach (var table in syncJob.Tables ?? []) + { + cancellationToken.ThrowIfCancellationRequested(); + + var columns = sourceConn.GetColumns(table.Source); + var sourceVersion = sourceConn.GetSourceVersion(table.Source, globalChangeTracking: true, columns); + var targetVersion = targetConn.GetTargetVersion(table.Target); + var sourceVersionNumber = sourceVersion?.CurrentVersion ?? -1L; + + yield return new SyncJobMonitorTableVersionRow( + table.Id, + table.Source, + sourceVersionNumber, + targetVersion.CurrentVersion, + targetVersion.Queried, + targetVersion.Updated, + table.Target + ); + } + } +} diff --git a/src/SqlBulkSyncFunction/Functions/ProcessGlobalChangeTrackingSchedule.cs b/src/SqlBulkSyncFunction/Functions/ProcessGlobalChangeTrackingSchedule.cs index d715e42..92d2acc 100644 --- a/src/SqlBulkSyncFunction/Functions/ProcessGlobalChangeTrackingSchedule.cs +++ b/src/SqlBulkSyncFunction/Functions/ProcessGlobalChangeTrackingSchedule.cs @@ -6,6 +6,7 @@ using Microsoft.Azure.Functions.Worker; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using SqlBulkSyncFunction; using SqlBulkSyncFunction.Helpers; using SqlBulkSyncFunction.Models; using SqlBulkSyncFunction.Models.Job; @@ -25,32 +26,32 @@ SyncProgressService syncProgressService [Function(FunctionName + nameof(Custom))] public Task Custom( - [TimerTrigger("%ProcessGlobalChangeTrackingSchedule%")] + [TimerTrigger(Constants.Schedules.CustomScheduleTimerTrigger)] TimerInfo timerInfo, CancellationToken cancellationToken ) => ProcessSchedule(timerInfo, cancellationToken); [Function(FunctionName + nameof(Midnight))] public Task Midnight( - [TimerTrigger("0 0 0 * * *")] TimerInfo timerInfo, + [TimerTrigger(Constants.Schedules.MidnightCron)] TimerInfo timerInfo, CancellationToken cancellationToken ) => ProcessSchedule(timerInfo, cancellationToken); [Function(FunctionName + nameof(Noon))] public Task Noon( - [TimerTrigger("0 0 12 * * *")] TimerInfo timerInfo, + [TimerTrigger(Constants.Schedules.NoonCron)] TimerInfo timerInfo, CancellationToken cancellationToken ) => ProcessSchedule(timerInfo, cancellationToken); [Function(FunctionName + nameof(EveryFiveMinutes))] public Task EveryFiveMinutes( - [TimerTrigger("5 */5 * * * *")] TimerInfo timerInfo, + [TimerTrigger(Constants.Schedules.EveryFiveMinutesCron)] TimerInfo timerInfo, CancellationToken cancellationToken ) => ProcessSchedule(timerInfo, cancellationToken); [Function(FunctionName + nameof(EveryHour))] public Task EveryHour( - [TimerTrigger("10 0 * * * *")] TimerInfo timerInfo, + [TimerTrigger(Constants.Schedules.EveryHourCron)] TimerInfo timerInfo, CancellationToken cancellationToken ) => ProcessSchedule(timerInfo, cancellationToken); diff --git a/src/SqlBulkSyncFunction/Functions/QueueGlobalChangeTracking.cs b/src/SqlBulkSyncFunction/Functions/QueueGlobalChangeTracking.cs index e9d2f65..12b623d 100644 --- a/src/SqlBulkSyncFunction/Functions/QueueGlobalChangeTracking.cs +++ b/src/SqlBulkSyncFunction/Functions/QueueGlobalChangeTracking.cs @@ -90,6 +90,17 @@ CancellationToken cancellationToken SyncJobs = [syncJob.ToLogSyncJob()] }; + var createdProgress = new SyncJobProgress( + Area: syncJob.Area, + ConfigurationId: syncJob.Id, + Schedule: syncJob.Schedule, + ScheduleCorrelationId: syncJob.ScheduleCorrelationId, + SyncJobCorrelationId: syncJob.CorrelationId, + State: SyncJobProgressState.Created + ); + + await syncProgressService.Report(createdProgress, cancellationToken); + await syncProgressService.Report( result, cancellationToken diff --git a/src/SqlBulkSyncFunction/Helpers/SchemaExtensions.cs b/src/SqlBulkSyncFunction/Helpers/SchemaExtensions.cs index da85d1d..9531629 100644 --- a/src/SqlBulkSyncFunction/Helpers/SchemaExtensions.cs +++ b/src/SqlBulkSyncFunction/Helpers/SchemaExtensions.cs @@ -69,12 +69,12 @@ TableSchema tableSchema """ MERGE sync.TableVersion as target USING ( - SELECT @TableName AS TableName, - @CurrentVersion AS CurrentVersion, - @MinValidVersion AS MinValidVersion, - @Queried AS Queried, - SYSDATETIMEOFFSET() AS Updated, - SYSDATETIMEOFFSET() AS Created + SELECT @TableName AS TableName, + @CurrentVersion AS CurrentVersion, + @MinValidVersion AS MinValidVersion, + @Queried AS Queried, + SYSDATETIMEOFFSET() AS Updated, + SYSDATETIMEOFFSET() AS Created ) as source ON target.TableName = source.TableName WHEN NOT MATCHED BY target @@ -103,12 +103,13 @@ WHEN MATCHED THEN UPDATE OUTPUT inserted.TableName, inserted.CurrentVersion, inserted.MinValidVersion, + inserted.Updated, inserted.Queried; """ ) .SingleOrDefault(); - if (persistedTableVersion != syncedTableVersion) + if (persistedTableVersion != syncedTableVersion with { Updated = persistedTableVersion.Updated }) { throw new Exception($"Failed to persist {syncedTableVersion} ({persistedTableVersion})"); } @@ -130,7 +131,8 @@ string tableName SELECT TableName, CurrentVersion, MinValidVersion, - Queried + Queried, + Updated FROM sync.TableVersion WHERE TableName = @TableName """ diff --git a/src/SqlBulkSyncFunction/Models/Job/SyncJobMonitorResponse.cs b/src/SqlBulkSyncFunction/Models/Job/SyncJobMonitorResponse.cs new file mode 100644 index 0000000..1f85773 --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Job/SyncJobMonitorResponse.cs @@ -0,0 +1,45 @@ +using System; +using System.Collections.Generic; +using System.Text.Json.Serialization; + +namespace SqlBulkSyncFunction.Models.Job; + +/// +/// One progress step in the latest (or selected) run for monitoring. +/// +/// Progress state name. +/// When this state was recorded. +/// Optional details (e.g. exception). +public record SyncJobMonitorProgressStepDto( + [property: JsonPropertyName("state")] string State, + [property: JsonPropertyName("occured")] DateTimeOffset Occured, + [property: JsonPropertyName("message")] string Message +); + +/// +/// Combined HTTP response for monitor endpoints: job identity, aggregated schedule/progress from blob, and live table versions. +/// Use GET monitor/{area} for an array for every job in the area (one per enabled schedule per job), +/// GET monitor/{area}/{id} for an array (one per enabled schedule), or GET monitor/{area}/{id}/{schedule} for one schedule. +/// +/// Job area segment (same as the route). +/// Configured job identifier (same as the route). +/// Resolved schedule name for this payload (canonical key from config or Manual). +/// Human-readable schedule and timer summary from aggregated logs. +/// Next timer run from the last aggregated schedule status (UTC), or an estimate from the same NCRONTAB as the global change-tracking timer when status is missing and the job has not started execution yet; null if unknown. +/// Last timer execution from the last aggregated schedule status (UTC); null if unknown. +/// When the aggregate blob was last updated (if any). +/// Correlation id of the run with the newest progress activity. +/// Ordered steps for that run. +/// Per-table source and target change-tracking versions only (no row counts). +public record SyncJobMonitorResponse( + [property: JsonPropertyName("area")] string Area, + [property: JsonPropertyName("id")] string Id, + [property: JsonPropertyName("schedule")] string Schedule, + [property: JsonPropertyName("scheduleSummaryText")] string ScheduleSummaryText, + [property: JsonPropertyName("expectedRunAt")] DateTimeOffset? ExpectedRunAt, + [property: JsonPropertyName("lastRunAt")] DateTimeOffset? LastRunAt, + [property: JsonPropertyName("aggregatedAt")] DateTimeOffset? AggregatedAt, + [property: JsonPropertyName("latestRunCorrelationId")] string LatestRunCorrelationId, + [property: JsonPropertyName("latestProgressSteps")] IReadOnlyList LatestProgressSteps, + [property: JsonPropertyName("tables")] IReadOnlyList Tables +); diff --git a/src/SqlBulkSyncFunction/Models/Job/SyncJobMonitorTableVersionRow.cs b/src/SqlBulkSyncFunction/Models/Job/SyncJobMonitorTableVersionRow.cs new file mode 100644 index 0000000..a0d176a --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Job/SyncJobMonitorTableVersionRow.cs @@ -0,0 +1,22 @@ +using System; +using System.Text.Json.Serialization; + +namespace SqlBulkSyncFunction.Models.Job; + +/// +/// Lightweight per-table row for GET monitor/{area}/{id}: current change-tracking versions only (no change counts). +/// +/// Configured table identifier from the sync job definition. +/// Qualified source table name. +/// Current change tracking version on the source (CHANGE_TRACKING_CURRENT_VERSION() scope), or -1 if unknown. +/// Last version stored in sync.TableVersion for the target table (or -1 if never synced). +/// Qualified target table name. +public record SyncJobMonitorTableVersionRow( + [property: JsonPropertyName("id")] string Id, + [property: JsonPropertyName("sourceTableName")] string SourceTableName, + [property: JsonPropertyName("sourceVersion")] long SourceVersion, + [property: JsonPropertyName("targetVersion")] long TargetVersion, + [property: JsonPropertyName("queried")] DateTimeOffset Queried, + [property: JsonPropertyName("updated")] DateTimeOffset? Updated, + [property: JsonPropertyName("targetTableName")] string TargetTableName +); diff --git a/src/SqlBulkSyncFunction/Models/Job/SyncJobsConfig.cs b/src/SqlBulkSyncFunction/Models/Job/SyncJobsConfig.cs index 00246b6..f2eb1ac 100644 --- a/src/SqlBulkSyncFunction/Models/Job/SyncJobsConfig.cs +++ b/src/SqlBulkSyncFunction/Models/Job/SyncJobsConfig.cs @@ -6,8 +6,6 @@ namespace SqlBulkSyncFunction.Models.Job; public record SyncJobsConfig { - private static readonly string[] DefaultJobSchedules = ["Custom"]; - public Dictionary Jobs { get; init; } public Lazy Job)>> ScheduledJobs { get; } @@ -31,7 +29,7 @@ private static IEnumerable GetJobSchedules(KeyValuePair value.Value) .Select(key => key.Key); diff --git a/src/SqlBulkSyncFunction/Models/Monitor/SyncJobMonitorAggregate.cs b/src/SqlBulkSyncFunction/Models/Monitor/SyncJobMonitorAggregate.cs new file mode 100644 index 0000000..f7b1844 --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Monitor/SyncJobMonitorAggregate.cs @@ -0,0 +1,53 @@ +using System; +using System.Collections.Generic; + +namespace SqlBulkSyncFunction.Models.Monitor; + +/// +/// Persisted snapshot for a single sync job and schedule (keyed by area, job id, and schedule name), updated by the monitoring aggregation timer. +/// +/// Job area (matches ). +/// Job configuration id (matches dictionary key in ). +/// Schedule name (e.g. EveryFiveMinutes) that partitions this aggregate from other schedules for the same job. +public sealed record SyncJobMonitorAggregate( + string Area, + string JobId, + string Schedule +) +{ + /// UTC time when this aggregate was last written. + public DateTimeOffset AggregatedAt { get; set; } + + /// Schedule name from the last processed that listed this job. + public string ScheduleName { get; set; } + + /// Correlation id of that schedule log blob. + public string ScheduleCorrelationId { get; set; } + + /// Schedule log timestamp. + public DateTimeOffset? ScheduleTimestamp { get; set; } + + /// Schedule expiry from the log entry. + public DateTimeOffset? ScheduleExpires { get; set; } + + /// Last execution time from timer status when the schedule was logged. + public DateTimeOffset? ScheduleLast { get; set; } + + /// Next scheduled execution from timer status when the schedule was logged. + public DateTimeOffset? ScheduleNext { get; set; } + + /// Last-updated timestamp from timer status when the schedule was logged. + public DateTimeOffset? ScheduleLastUpdated { get; set; } + + /// Whether the timer was past due when the schedule was logged. + public bool IsSchedulePastDue { get; set; } + + /// Most recent progress seen for this job. + public DateTimeOffset? LastProgressOccured { get; set; } + + /// (run id) for the latest progress event. + public string LatestSyncJobCorrelationId { get; set; } + + /// Recent runs (newest activity first); size is capped by the aggregator. + public List Runs { get; set; } = []; +} diff --git a/src/SqlBulkSyncFunction/Models/Monitor/SyncJobProgressStepAggregate.cs b/src/SqlBulkSyncFunction/Models/Monitor/SyncJobProgressStepAggregate.cs new file mode 100644 index 0000000..49ff2df --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Monitor/SyncJobProgressStepAggregate.cs @@ -0,0 +1,13 @@ +using System; + +namespace SqlBulkSyncFunction.Models.Monitor; + +/// +/// One progress state occurrence within a run. +/// +/// When this state was reported. +/// Optional message (e.g. exception text). +public sealed record SyncJobProgressStepAggregate( + DateTimeOffset Occured, + string Message +); diff --git a/src/SqlBulkSyncFunction/Models/Monitor/SyncJobRunAggregate.cs b/src/SqlBulkSyncFunction/Models/Monitor/SyncJobRunAggregate.cs new file mode 100644 index 0000000..fd106ed --- /dev/null +++ b/src/SqlBulkSyncFunction/Models/Monitor/SyncJobRunAggregate.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; + +namespace SqlBulkSyncFunction.Models.Monitor; + +/// +/// Progress steps for one execution (one correlation id without a trailing state segment). +/// +/// Base correlation id for the run (no /Created suffix). +public sealed record SyncJobRunAggregate(string SyncJobCorrelationId) +{ + /// Latest activity time across all steps in this run. + public DateTimeOffset LastActivity { get; set; } + + /// Last occurrence per progress state name (e.g. Created, Started, Done). + public Dictionary StepsByState { get; set; } = new(StringComparer.OrdinalIgnoreCase); +} diff --git a/src/SqlBulkSyncFunction/Models/Schema/TableVersion.cs b/src/SqlBulkSyncFunction/Models/Schema/TableVersion.cs index 43541c1..8262bd9 100644 --- a/src/SqlBulkSyncFunction/Models/Schema/TableVersion.cs +++ b/src/SqlBulkSyncFunction/Models/Schema/TableVersion.cs @@ -8,4 +8,5 @@ public record TableVersion public long CurrentVersion { get; set; } public long MinValidVersion { get; set; } public DateTimeOffset Queried { get; set; } + public DateTimeOffset? Updated { get; set; } }; diff --git a/src/SqlBulkSyncFunction/Program.cs b/src/SqlBulkSyncFunction/Program.cs index f2cd98f..023d338 100644 --- a/src/SqlBulkSyncFunction/Program.cs +++ b/src/SqlBulkSyncFunction/Program.cs @@ -31,6 +31,8 @@ .AddSingleton() .AddSingleton() .AddSingleton() + .AddSingleton() + .AddSingleton() .AddAzureClients( static az => { var connectionString = System.Environment.GetEnvironmentVariable("AzureWebJobsStorage"); diff --git a/src/SqlBulkSyncFunction/Services/ProcessGlobalChangeTrackingScheduleNextRun.cs b/src/SqlBulkSyncFunction/Services/ProcessGlobalChangeTrackingScheduleNextRun.cs new file mode 100644 index 0000000..9c1fe18 --- /dev/null +++ b/src/SqlBulkSyncFunction/Services/ProcessGlobalChangeTrackingScheduleNextRun.cs @@ -0,0 +1,115 @@ +using System; +using System.Collections.Concurrent; +using Cronos; +using Microsoft.Extensions.Configuration; +using SqlBulkSyncFunction.Functions; + +namespace SqlBulkSyncFunction.Services; + +/// +/// Estimates the next Azure Functions timer fire time for +/// using the same NCRONTAB expressions as the TimerTrigger attributes (including Custom from app settings). +/// Parsed cron expressions are cached per schedule name so repeated lookups avoid resolving cron and configuration. +/// +/// Application configuration (used for the Custom schedule expression). +public sealed class ProcessGlobalChangeTrackingScheduleNextRun(IConfiguration configuration) +{ + /// + /// Parsed instances keyed by normalized schedule name (trimmed; ). + /// If the Custom cron in configuration changes at runtime, restart the worker to pick up a new expression. + /// + private readonly ConcurrentDictionary _expressionsByScheduleName = new(StringComparer.Ordinal); + + /// + /// Resolves the next scheduled run strictly after when the expression is valid. + /// + /// Schedule key (e.g. EveryFiveMinutes, Custom). + /// Reference instant (typically UTC now). + /// Next occurrence in UTC, if resolved. + /// when a cron expression was resolved and the next occurrence exists. + public bool TryGetNextRunUtc(string scheduleName, DateTimeOffset fromUtc, out DateTimeOffset nextRunUtc) + { + nextRunUtc = default; + if (string.IsNullOrWhiteSpace(scheduleName)) + { + return false; + } + + if (!TryParseScheduleCron(scheduleName, out var expression)) + { + return false; + } + + var from = fromUtc.UtcDateTime; + var next = expression.GetNextOccurrence(from, TimeZoneInfo.Utc, inclusive: false); + if (!next.HasValue) + { + return false; + } + + nextRunUtc = new DateTimeOffset(DateTime.SpecifyKind(next.Value, DateTimeKind.Utc), TimeSpan.Zero); + return true; + } + + /// + /// Maps a schedule name to the same cron string as , parses it, and caches the result by schedule name. + /// + /// Schedule key from configuration (e.g. Midnight). + /// Parsed expression, or when unresolved or invalid. + /// when a valid expression was resolved. + public bool TryParseScheduleCron(string scheduleName, out CronExpression expression) + { + expression = null; + if (string.IsNullOrWhiteSpace(scheduleName)) + { + return false; + } + + var cacheKey = scheduleName.Trim(); + if (_expressionsByScheduleName.TryGetValue(cacheKey, out var cached)) + { + expression = cached; + return true; + } + + if (!TryResolveCronString(scheduleName, out var cron) || string.IsNullOrWhiteSpace(cron)) + { + return false; + } + + try + { + var parsed = CronExpression.Parse(cron, CronFormat.IncludeSeconds); + expression = _expressionsByScheduleName.GetOrAdd(cacheKey, parsed); + return true; + } + catch (CronFormatException) + { + return false; + } + } + + /// + /// Resolves the NCRONTAB string for (including Custom from configuration). + /// + /// Schedule key (method name on ). + /// Cron string when recognized; otherwise . + /// when a cron string was associated with the schedule name. + private bool TryResolveCronString(string scheduleName, out string cron) + { + cron = null; + var key = scheduleName.Trim(); + cron = key switch + { + _ when string.Equals(key, nameof(ProcessGlobalChangeTrackingSchedule.Midnight), StringComparison.Ordinal) => Constants.Schedules.MidnightCron, + _ when string.Equals(key, nameof(ProcessGlobalChangeTrackingSchedule.Noon), StringComparison.Ordinal) => Constants.Schedules.NoonCron, + _ when string.Equals(key, nameof(ProcessGlobalChangeTrackingSchedule.EveryFiveMinutes), StringComparison.Ordinal) => Constants.Schedules.EveryFiveMinutesCron, + _ when string.Equals(key, nameof(ProcessGlobalChangeTrackingSchedule.EveryHour), StringComparison.Ordinal) => Constants.Schedules.EveryHourCron, + _ when string.Equals(key, nameof(ProcessGlobalChangeTrackingSchedule.Custom), StringComparison.Ordinal) + => configuration[Constants.Schedules.CustomScheduleConfigurationKey]?.Trim(), + _ => null, + }; + + return !string.IsNullOrWhiteSpace(cron); + } +} diff --git a/src/SqlBulkSyncFunction/Services/SyncMonitoringAggregationService.cs b/src/SqlBulkSyncFunction/Services/SyncMonitoringAggregationService.cs new file mode 100644 index 0000000..b04af2b --- /dev/null +++ b/src/SqlBulkSyncFunction/Services/SyncMonitoringAggregationService.cs @@ -0,0 +1,373 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Azure; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; +using Azure.Storage.Queues; +using Microsoft.Extensions.Logging; +using SqlBulkSyncFunction.Models; +using SqlBulkSyncFunction.Models.Monitor; + +namespace SqlBulkSyncFunction.Services; + +/// +/// Drains schedule and sync-job progress queues in a fixed order, merges blobs into per-job aggregate blobs for cheap reads. +/// Aggregates are partitioned by schedule under {area}/{jobId}/{schedule}.json with optimistic concurrency (ETags). +/// +public sealed class SyncMonitoringAggregationService( + ILogger logger, + QueueServiceClient queueService, + BlobServiceClient blobServiceClient + ) +{ + private const int MaxRunsPerAggregate = 10; + private const int MaxMessagesPerReceive = 32; + private const int MaxConcurrencyRetries = 10; + + private static readonly JsonSerializerOptions JsonOptions = new() + { + PropertyNameCaseInsensitive = true, + WriteIndented = false, + }; + + private readonly QueueClient _logScheduleQueue = GetOrCreateQueue(queueService, Constants.Queues.LogScheduleQueue); + private readonly Dictionary _progressQueues = GetProgressQueues(queueService); + private readonly BlobContainerClient _syncJobContainer = GetOrCreateContainer(blobServiceClient, Constants.Containers.SyncJob); + private readonly BlobContainerClient _syncScheduleContainer = GetOrCreateContainer(blobServiceClient, Constants.Containers.SyncSchedule); + private readonly BlobContainerClient _monitorContainer = GetOrCreateContainer(blobServiceClient, Constants.Containers.Monitor); + + /// + /// Returns the persisted aggregate for , , and , or null if none. + /// + public async Task GetAggregateAsync(string area, string jobId, string schedule, CancellationToken cancellationToken) + { + var path = GetMonitorBlobPath(area, jobId, schedule); + var blob = _monitorContainer.GetBlobClient(path); + if (!await blob.ExistsAsync(cancellationToken).ConfigureAwait(false)) + { + return null; + } + + var download = await blob.DownloadContentAsync(cancellationToken).ConfigureAwait(false); + return download.Value.Content.ToObjectFromJson(JsonOptions); + } + + /// + /// Processes all monitoring queues sequentially: log schedule first, then progress queues in enum order. + /// + public async Task ProcessAllQueuesAsync(CancellationToken cancellationToken) + { + await DrainQueueAsync(_logScheduleQueue, ProcessLogScheduleAsync, cancellationToken).ConfigureAwait(false); + + foreach (var state in Enum.GetValues()) + { + if (_progressQueues.TryGetValue(state, out var client)) + { + await DrainQueueAsync(client, ProcessSyncJobProgressAsync, cancellationToken).ConfigureAwait(false); + } + } + } + + private async Task DrainQueueAsync( + QueueClient queue, + Func handleCorrelationId, + CancellationToken cancellationToken + ) + { + while (!cancellationToken.IsCancellationRequested) + { + var response = await queue + .ReceiveMessagesAsync(maxMessages: MaxMessagesPerReceive, cancellationToken: cancellationToken) + .ConfigureAwait(false); + + if (response.Value.Length == 0) + { + return; + } + + foreach (var message in response.Value) + { + var correlationId = message.Body.ToString(); + if (string.IsNullOrWhiteSpace(correlationId)) + { + await queue.DeleteMessageAsync(message.MessageId, message.PopReceipt, cancellationToken).ConfigureAwait(false); + continue; + } + + try + { + await handleCorrelationId(correlationId, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + logger.LogWarning(ex, "Monitoring aggregation failed for queue {Queue} message {CorrelationId}", queue.Name, correlationId); + } + + await queue.DeleteMessageAsync(message.MessageId, message.PopReceipt, cancellationToken).ConfigureAwait(false); + } + } + } + + private async Task ProcessLogScheduleAsync(string correlationId, CancellationToken cancellationToken) + { + var blob = _syncScheduleContainer.GetBlobClient($"{correlationId}.json"); + if (!await blob.ExistsAsync(cancellationToken).ConfigureAwait(false)) + { + logger.LogWarning("Schedule blob missing for {CorrelationId}", correlationId); + return; + } + + var download = await blob.DownloadContentAsync(cancellationToken).ConfigureAwait(false); + var logSchedule = download.Value.Content.ToObjectFromJson(JsonOptions); + if (logSchedule == null || string.IsNullOrWhiteSpace(logSchedule.Name)) + { + return; + } + + foreach (var syncJob in logSchedule.SyncJobs ?? []) + { + if (string.IsNullOrWhiteSpace(syncJob.Area) || string.IsNullOrWhiteSpace(syncJob.Id)) + { + continue; + } + + await SaveAggregateWithConcurrencyRetryAsync( + syncJob.Area, + syncJob.Id, + logSchedule.Name, + aggregate => + { + aggregate ??= new SyncJobMonitorAggregate(syncJob.Area, syncJob.Id, logSchedule.Name); + aggregate.ScheduleName = logSchedule.Name; + aggregate.ScheduleCorrelationId = logSchedule.CorrelationId; + aggregate.ScheduleTimestamp = logSchedule.Timestamp; + aggregate.ScheduleExpires = logSchedule.Expires; + aggregate.ScheduleLast = logSchedule.Last; + aggregate.ScheduleNext = logSchedule.Next; + aggregate.ScheduleLastUpdated = logSchedule.LastUpdated; + aggregate.IsSchedulePastDue = logSchedule.IsPastDue; + aggregate.AggregatedAt = DateTimeOffset.UtcNow; + return aggregate; + }, + cancellationToken + ).ConfigureAwait(false); + } + } + + private async Task ProcessSyncJobProgressAsync(string correlationId, CancellationToken cancellationToken) + { + var blob = _syncJobContainer.GetBlobClient($"{correlationId}.json"); + if (!await blob.ExistsAsync(cancellationToken).ConfigureAwait(false)) + { + logger.LogWarning("Sync job progress blob missing for {CorrelationId}", correlationId); + return; + } + + var download = await blob.DownloadContentAsync(cancellationToken).ConfigureAwait(false); + var progress = download.Value.Content.ToObjectFromJson(JsonOptions); + if (progress == null || + string.IsNullOrWhiteSpace(progress.Area) || + string.IsNullOrWhiteSpace(progress.ConfigurationId) || + string.IsNullOrWhiteSpace(progress.Schedule)) + { + return; + } + + var scheduleName = progress.Schedule; + + await SaveAggregateWithConcurrencyRetryAsync( + progress.Area, + progress.ConfigurationId, + scheduleName, + aggregate => + { + aggregate ??= new SyncJobMonitorAggregate(progress.Area, progress.ConfigurationId, scheduleName); + + var runKey = progress.SyncJobCorrelationId; + var run = aggregate.Runs.FirstOrDefault(r => string.Equals(r.SyncJobCorrelationId, runKey, StringComparison.Ordinal)); + if (run == null) + { + run = new SyncJobRunAggregate(runKey); + aggregate.Runs.Add(run); + } + + var stateName = progress.State.ToString("F"); + run.StepsByState[stateName] = new SyncJobProgressStepAggregate(progress.Occured, progress.Message); + run.LastActivity = run.StepsByState.Values.Max(s => s.Occured); + + PruneRuns(aggregate); + + if (!aggregate.LastProgressOccured.HasValue || progress.Occured >= aggregate.LastProgressOccured.Value) + { + aggregate.LastProgressOccured = progress.Occured; + aggregate.LatestSyncJobCorrelationId = runKey; + } + + aggregate.AggregatedAt = DateTimeOffset.UtcNow; + return aggregate; + }, + cancellationToken + ).ConfigureAwait(false); + } + + private static void PruneRuns(SyncJobMonitorAggregate aggregate) + { + if (aggregate.Runs.Count <= MaxRunsPerAggregate) + { + return; + } + + var ordered = aggregate.Runs + .OrderByDescending(r => r.LastActivity) + .Take(MaxRunsPerAggregate) + .ToList(); + aggregate.Runs.Clear(); + aggregate.Runs.AddRange(ordered); + } + + /// + /// Loads, applies , and saves with ETag preconditions; retries on write conflicts. + /// + private async Task SaveAggregateWithConcurrencyRetryAsync( + string area, + string jobId, + string scheduleName, + Func merge, + CancellationToken cancellationToken + ) + { + var path = GetMonitorBlobPath(area, jobId, scheduleName); + var blob = _monitorContainer.GetBlobClient(path); + + for (var attempt = 0; attempt < MaxConcurrencyRetries; attempt++) + { + ETag? etag = null; + SyncJobMonitorAggregate existing = null; + if (await blob.ExistsAsync(cancellationToken).ConfigureAwait(false)) + { + var dl = await blob.DownloadContentAsync(cancellationToken).ConfigureAwait(false); + etag = dl.Value.Details.ETag; + existing = dl.Value.Content.ToObjectFromJson(JsonOptions); + } + + var merged = merge(existing); + if (merged == null) + { + return; + } + + try + { + await UploadMonitorAggregateAsync(blob, merged, etag, cancellationToken).ConfigureAwait(false); + return; + } + catch (RequestFailedException ex) when (IsConcurrencyConflict(ex) && attempt < MaxConcurrencyRetries - 1) + { + logger.LogDebug( + ex, + "Monitor aggregate concurrency conflict for {Path}, retry {Attempt}", + path, + attempt + 1); + } + } + + throw new InvalidOperationException( + FormattableString.Invariant($"Could not save monitor aggregate after {MaxConcurrencyRetries} attempts: {path}")); + } + + private static bool IsConcurrencyConflict(RequestFailedException ex) + => ex.Status == 412 || ex.Status == 409; + + private static async Task UploadMonitorAggregateAsync( + BlobClient blob, + SyncJobMonitorAggregate aggregate, + ETag? etagFromDownload, + CancellationToken cancellationToken + ) + { + var data = BinaryData.FromObjectAsJson(aggregate, JsonOptions); + var options = new BlobUploadOptions + { + HttpHeaders = new BlobHttpHeaders + { + ContentType = Constants.BlobContentTypes.Json, + }, + }; + + if (etagFromDownload.HasValue) + { + options.Conditions = new BlobRequestConditions { IfMatch = etagFromDownload.Value }; + } + else + { + options.Conditions = new BlobRequestConditions { IfNoneMatch = ETag.All }; + } + + await blob.UploadAsync(data, options, cancellationToken).ConfigureAwait(false); + } + + /// + /// Virtual path under the monitor container: {area}/{jobId}/{schedule}.json (segments sanitized). + /// + internal static string GetMonitorBlobPath(string area, string jobId, string schedule) + => FormattableString.Invariant( + $"{SanitizeBlobPathSegment(area)}/{SanitizeBlobPathSegment(jobId)}/{SanitizeBlobPathSegment(schedule)}.json"); + + /// + /// Replaces characters that are unsafe or ambiguous in blob path segments. + /// + internal static string SanitizeBlobPathSegment(string segment) + { + if (string.IsNullOrWhiteSpace(segment)) + { + return "_"; + } + + var s = segment.Trim(); + var sb = new StringBuilder(s.Length); + foreach (var c in s) + { + if (c == '/' || c == '\\' || c == '#' || c == '?' || char.IsControl(c)) + { + _ = sb.Append('_'); + } + else + { + _ = sb.Append(c); + } + } + + return sb.Length > 0 ? sb.ToString() : "_"; + } + + private static QueueClient GetOrCreateQueue(QueueServiceClient queueService, string queueName) + { + var q = queueService.GetQueueClient(queueName); + _ = q.CreateIfNotExists(); + return q; + } + + private static BlobContainerClient GetOrCreateContainer(BlobServiceClient blobServiceClient, string name) + { + var c = blobServiceClient.GetBlobContainerClient(name); + _ = c.CreateIfNotExists(); + return c; + } + + private static Dictionary GetProgressQueues(QueueServiceClient queueService) + { + var map = new Dictionary(); + foreach (var state in Enum.GetValues()) + { + var queueName = FormattableString.Invariant($"{Constants.Queues.SyncJobProgressQueue}-{state.ToString("F").ToLowerInvariant()}"); + map[state] = GetOrCreateQueue(queueService, queueName); + } + + return map; + } +} diff --git a/src/SqlBulkSyncFunction/Services/SyncProgressService.cs b/src/SqlBulkSyncFunction/Services/SyncProgressService.cs index 61cd0b5..b2c6b3b 100644 --- a/src/SqlBulkSyncFunction/Services/SyncProgressService.cs +++ b/src/SqlBulkSyncFunction/Services/SyncProgressService.cs @@ -3,6 +3,7 @@ using System.Threading; using System.Threading.Tasks; using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; using Azure.Storage.Queues; using SqlBulkSyncFunction.Models; @@ -62,9 +63,12 @@ public async Task Report(SyncJobProgress value, CancellationToken cancellationTo _ = await _syncJobBlobContainerClient .GetBlobClient($"{value.CorrelationId}.json") .UploadAsync( - content: BinaryData.FromObjectAsJson(value), - overwrite: true, - cancellationToken: cancellationToken + BinaryData.FromObjectAsJson(value), + new BlobUploadOptions + { + HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Json }, + }, + cancellationToken ); if (_queueClients.TryGetValue(value.State, out var _queueClient)) @@ -80,9 +84,12 @@ public async Task Report(LogSchedule value, CancellationToken cancellationToken) _ = await _syncScheduleBlobContainerClient .GetBlobClient($"{value.CorrelationId}.json") .UploadAsync( - content: BinaryData.FromObjectAsJson(value), - overwrite: true, - cancellationToken: cancellationToken + BinaryData.FromObjectAsJson(value), + new BlobUploadOptions + { + HttpHeaders = new BlobHttpHeaders { ContentType = Constants.BlobContentTypes.Json }, + }, + cancellationToken ); _ = await _logScheduleQueueClient.SendMessageAsync(value.CorrelationId, cancellationToken); diff --git a/src/SqlBulkSyncFunction/SqlBulkSyncFunction.csproj b/src/SqlBulkSyncFunction/SqlBulkSyncFunction.csproj index a780b5f..20a1160 100644 --- a/src/SqlBulkSyncFunction/SqlBulkSyncFunction.csproj +++ b/src/SqlBulkSyncFunction/SqlBulkSyncFunction.csproj @@ -7,6 +7,7 @@ <_FunctionsSkipCleanOutput>true +