Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .server-changes/debounce-hot-key-lock-contention.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
area: webapp
type: fix
---

Reduce 5xx feedback loops on hot debounce keys by quantizing `delayUntil`,
adding an unlocked fast-path skip, and gracefully handling redlock
contention in `handleDebounce` so the SDK no longer retries into a herd.
1 change: 1 addition & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,7 @@ const EnvironmentSchema = z
.default("info"),
RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM: z.string().default("0"),
RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED: z.string().default("0"),
RUN_ENGINE_DEBOUNCE_USE_REPLICA_FOR_FAST_PATH_READ: z.string().default("0"),

/** How long should the presence ttl last */
DEV_PRESENCE_SSE_TIMEOUT: z.coerce.number().int().default(30_000),
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ function createRunEngine() {
// Debounce configuration
debounce: {
maxDebounceDurationMs: env.RUN_ENGINE_MAXIMUM_DEBOUNCE_DURATION_MS,
useReplicaForFastPathRead: env.RUN_ENGINE_DEBOUNCE_USE_REPLICA_FOR_FAST_PATH_READ === "1",
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
},
});

Expand Down
3 changes: 3 additions & 0 deletions internal-packages/run-engine/src/engine/index.ts
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ export class RunEngine {
executionSnapshotSystem: this.executionSnapshotSystem,
delayedRunSystem: this.delayedRunSystem,
maxDebounceDurationMs: options.debounce?.maxDebounceDurationMs ?? 60 * 60 * 1000, // Default 1 hour
quantizeNewDelayUntilMs: options.debounce?.quantizeNewDelayUntilMs ?? 1000,
fastPathSkipEnabled: options.debounce?.fastPathSkipEnabled ?? true,
useReplicaForFastPathRead: options.debounce?.useReplicaForFastPathRead ?? false,
});

this.pendingVersionSystem = new PendingVersionSystem({
Expand Down
300 changes: 294 additions & 6 deletions internal-packages/run-engine/src/engine/systems/debounceSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@ import {
parseNaturalLanguageDuration,
parseNaturalLanguageDurationInMs,
} from "@trigger.dev/core/v3/isomorphic";
import { PrismaClientOrTransaction, TaskRun, Waitpoint } from "@trigger.dev/database";
import {
PrismaClientOrTransaction,
PrismaReplicaClient,
TaskRun,
Waitpoint,
} from "@trigger.dev/database";
import { nanoid } from "nanoid";
import { SystemResources } from "./systems.js";
import { ExecutionSnapshotSystem, getLatestExecutionSnapshot } from "./executionSnapshotSystem.js";
import { DelayedRunSystem } from "./delayedRunSystem.js";
import { LockAcquisitionTimeoutError } from "../locking.js";

export type DebounceOptions = {
key: string;
Expand Down Expand Up @@ -45,6 +51,22 @@ export type DebounceSystemOptions = {
executionSnapshotSystem: ExecutionSnapshotSystem;
delayedRunSystem: DelayedRunSystem;
maxDebounceDurationMs: number;
/**
* Bucket size in milliseconds used to quantize the newly computed `delayUntil`.
* Set to 0 to disable quantization.
*/
quantizeNewDelayUntilMs?: number;
/**
* When true, read the existing run's `delayUntil` outside the redlock and
* short-circuit if the new (quantized) `delayUntil` is not later than the
* current one.
*/
fastPathSkipEnabled?: boolean;
/**
* When true, route the unlocked fast-path reads (probe + full-run fetch)
* through `readOnlyPrisma` (e.g. an Aurora reader) instead of the writer.
*/
useReplicaForFastPathRead?: boolean;
};

export type DebounceResult =
Expand Down Expand Up @@ -89,6 +111,9 @@ export class DebounceSystem {
private readonly executionSnapshotSystem: ExecutionSnapshotSystem;
private readonly delayedRunSystem: DelayedRunSystem;
private readonly maxDebounceDurationMs: number;
private readonly quantizeNewDelayUntilMs: number;
private readonly fastPathSkipEnabled: boolean;
private readonly useReplicaForFastPathRead: boolean;

constructor(options: DebounceSystemOptions) {
this.$ = options.resources;
Expand All @@ -106,6 +131,9 @@ export class DebounceSystem {
this.executionSnapshotSystem = options.executionSnapshotSystem;
this.delayedRunSystem = options.delayedRunSystem;
this.maxDebounceDurationMs = options.maxDebounceDurationMs;
this.quantizeNewDelayUntilMs = Math.max(0, options.quantizeNewDelayUntilMs ?? 1000);
this.fastPathSkipEnabled = options.fastPathSkipEnabled ?? true;
this.useReplicaForFastPathRead = options.useReplicaForFastPathRead ?? false;

this.#registerCommands();
}
Expand Down Expand Up @@ -450,9 +478,271 @@ return 0
debounce: DebounceOptions;
tx?: PrismaClientOrTransaction;
}): Promise<DebounceResult> {
return await this.$.runLock.lock("handleDebounce", [existingRunId], async () => {
const prisma = tx ?? this.$.prisma;
const prisma = tx ?? this.$.prisma;
// Reads in the unlocked fast-path can run on `readOnlyPrisma` when
// configured (e.g. an Aurora reader). Replica lag is fine: debounce is
// best-effort and a stale read either falls through to the locked path
// (when delayUntil hasn't replicated yet) or returns the existing run
// (when the run's status is stale). The latter is the same outcome the
// caller would see if their trigger had simply landed a few hundred ms
// earlier, which is within the natural debounce race. Only divert reads
// when the caller isn't inside a tx (where the read needs to see the
// tx's writes).
const fastPathReadPrisma =
tx ?? (this.useReplicaForFastPathRead ? this.$.readOnlyPrisma : this.$.prisma);
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Compute the (quantized) target delayUntil up-front, before taking any lock.
// Quantizing to e.g. 1s buckets collapses many concurrent triggers on the same
// hot debounce key onto the same target time, so the unlocked fast-path skip
// below becomes effective and the redlock is not contended.
const newDelayUntil = this.#computeQuantizedDelayUntil(debounce.delay);

// Fast-path: read the current delayUntil outside the redlock and short-circuit
// if our (quantized) newDelayUntil isn't later than what's already scheduled.
// Safe because debounce is monotonic-forward only: a stale read either matches
// reality or undershoots, both of which decay correctly (re-checked properly
// inside the lock by whoever is actually pushing forward).
if (this.fastPathSkipEnabled && newDelayUntil) {
const fastPathResult = await this.#tryFastPathSkip({
existingRunId,
newDelayUntil,
debounce,
prisma: fastPathReadPrisma,
});
if (fastPathResult) {
return fastPathResult;
}
}

try {
return await this.$.runLock.lock("handleDebounce", [existingRunId], async () => {
return await this.#handleExistingRunLocked({
existingRunId,
redisKey,
environmentId,
taskIdentifier,
debounce,
newDelayUntil,
prisma,
tx,
});
});
} catch (error) {
// Lock contention safety net: if we couldn't take the lock (redlock quorum
// failure or our retry budget exhausted), fall in line with whoever is
// actually updating the run instead of bubbling a 5xx to the SDK and
// amplifying the herd via SDK retries. Debounce is best-effort - dropping
// our contribution to delayUntil here is fine, the herd is updating it for
// us.
if (this.#isLockContentionError(error)) {
return await this.#handleLockContentionFallback({
existingRunId,
debounce,
error,
prisma,
});
}
throw error;
}
}
Comment thread
ericallam marked this conversation as resolved.

/**
 * Parses the natural-language debounce delay and, when a positive bucket
 * size is configured, floors the resulting absolute timestamp to the bucket
 * boundary. Concurrent triggers on the same hot key therefore land on one
 * shared target time, which is what makes the unlocked fast-path skip
 * effective.
 *
 * NOTE(review): flooring can pull the target up to `bucket - 1` ms earlier
 * than the exact `now + delay`.
 *
 * @param delay natural-language duration string (e.g. "5s")
 * @returns the (possibly quantized) absolute target time, or null when the
 *   delay string cannot be parsed
 */
#computeQuantizedDelayUntil(delay: string): Date | null {
  const target = parseNaturalLanguageDuration(delay);
  if (!target) {
    return null;
  }

  const bucketMs = this.quantizeNewDelayUntilMs;
  if (bucketMs <= 0) {
    // Quantization disabled: keep the exact parsed timestamp.
    return target;
  }

  const flooredMs = Math.floor(target.getTime() / bucketMs) * bucketMs;
  return new Date(flooredMs);
}

/**
 * Classifies a failure from `runLock.lock` as lock contention (redlock
 * quorum failure or retry budget exhausted) rather than a genuine error.
 * Matches by class where possible and falls back to the error name for
 * errors raised inside the locking library.
 */
#isLockContentionError(error: unknown): boolean {
  if (!(error instanceof Error)) {
    return false;
  }
  if (error instanceof LockAcquisitionTimeoutError) {
    return true;
  }
  switch (error.name) {
    case "LockAcquisitionTimeoutError":
    case "ExecutionError":
    case "ResourceLockedError":
      return true;
    default:
      return false;
  }
}

/**
 * Unlocked fast path: probes `status`/`delayUntil`/`createdAt` outside the
 * redlock and returns the existing run when its scheduled time already
 * covers our (quantized) target, avoiding the lock entirely.
 *
 * Falls through (returns null) when:
 * - the trigger is trailing-mode carrying `updateData` (the data update
 *   still needs the lock to be applied),
 * - the run is missing, no longer DELAYED, or has no `delayUntil`,
 * - our target is later than the currently scheduled `delayUntil`,
 * - the target would exceed the run's max debounce window, so the locked
 *   path can return `max_duration_exceeded` and the caller creates a new run.
 *
 * `prisma` may be a read replica - lag is acceptable because debounce is
 * best-effort. A stale `delayUntil` either matches reality or undershoots
 * (we fall through to the locked path); a stale `status` at worst returns
 * the existing run, the same outcome the caller would see had their trigger
 * landed a few hundred ms earlier.
 */
async #tryFastPathSkip({
  existingRunId,
  newDelayUntil,
  debounce,
  prisma,
}: {
  existingRunId: string;
  newDelayUntil: Date;
  debounce: DebounceOptions;
  prisma: PrismaClientOrTransaction | PrismaReplicaClient;
}): Promise<DebounceResult | null> {
  // Trailing-mode triggers with updateData must take the lock so the data
  // update is applied - there is nothing to short-circuit here.
  if (debounce.mode === "trailing" && debounce.updateData) {
    return null;
  }

  // Lightweight probe before fetching the full run with its waitpoint.
  const probe = await prisma.taskRun.findFirst({
    where: { id: existingRunId },
    select: { status: true, delayUntil: true, createdAt: true },
  });
  if (!probe || probe.status !== "DELAYED" || !probe.delayUntil) {
    return null;
  }

  const targetMs = newDelayUntil.getTime();
  if (targetMs > probe.delayUntil.getTime()) {
    // Our target pushes delayUntil forward - only the locked path may do that.
    return null;
  }

  // Resolve the effective max debounce window for this run: the per-trigger
  // maxDelay override wins over the engine-wide default when it parses.
  let maxWindowMs = this.maxDebounceDurationMs;
  if (debounce.maxDelay) {
    const overrideMs = parseNaturalLanguageDurationInMs(debounce.maxDelay);
    if (overrideMs !== undefined) {
      maxWindowMs = overrideMs;
    }
  }
  if (targetMs > probe.createdAt.getTime() + maxWindowMs) {
    // Exceeds the window: fall through so the locked path can return
    // max_duration_exceeded and let the caller create a fresh run.
    return null;
  }

  const fullRun = await prisma.taskRun.findFirst({
    where: { id: existingRunId },
    include: { associatedWaitpoint: true },
  });
  if (!fullRun || fullRun.status !== "DELAYED") {
    return null;
  }

  this.$.logger.debug("handleExistingRun: fast-path skip, existing delayUntil already covers", {
    existingRunId,
    debounceKey: debounce.key,
    newDelayUntil,
    currentDelayUntil: fullRun.delayUntil,
  });

  return {
    status: "existing",
    run: fullRun,
    waitpoint: fullRun.associatedWaitpoint,
  };
}

/**
 * Fallback used when the redlock on the run could not be acquired. Rather
 * than surfacing a 5xx (which the SDK would retry, amplifying the herd),
 * return the existing DELAYED run as-is - debounce is best-effort and the
 * lock holder is pushing `delayUntil` forward on our behalf.
 *
 * Re-throws the original lock error when:
 * - the run is missing or no longer DELAYED (returning "existing" would
 *   silently succeed on a stale/terminated run), or
 * - the trigger is trailing-mode with `updateData` (dropping the update
 *   would mean the eventual run executes against stale payload/metadata).
 */
async #handleLockContentionFallback({
  existingRunId,
  debounce,
  error,
  prisma,
}: {
  existingRunId: string;
  debounce: DebounceOptions;
  error: unknown;
  prisma: PrismaClientOrTransaction;
}): Promise<DebounceResult> {
  const fullRun = await prisma.taskRun.findFirst({
    where: { id: existingRunId },
    include: { associatedWaitpoint: true },
  });

  if (fullRun === null || fullRun.status !== "DELAYED") {
    // Not a state we can safely report as "existing" - surface the failure.
    this.$.logger.warn(
      "handleExistingRun: lock contention, but existing run no longer DELAYED - rethrowing",
      {
        existingRunId,
        debounceKey: debounce.key,
        status: fullRun?.status,
      }
    );
    throw error;
  }

  if (debounce.mode === "trailing" && debounce.updateData) {
    // User-visible data updates must not be dropped silently: rethrow so the
    // SDK retries; with quantization + the fast-path skip in place the herd
    // collapses on its own without hiding data loss.
    this.$.logger.warn(
      "handleExistingRun: lock contention with trailing updateData - rethrowing to avoid silently dropping update",
      {
        existingRunId,
        debounceKey: debounce.key,
      }
    );
    throw error;
  }

  const errorMessage = error instanceof Error ? error.message : String(error);
  const errorName = error instanceof Error ? error.name : undefined;
  this.$.logger.warn(
    "handleExistingRun: lock contention, returning existing run without rescheduling",
    {
      existingRunId,
      debounceKey: debounce.key,
      currentDelayUntil: fullRun.delayUntil,
      error: errorMessage,
      errorName,
    }
  );

  return {
    status: "existing",
    run: fullRun,
    waitpoint: fullRun.associatedWaitpoint,
  };
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/**
* Body of `handleExistingRun` that runs while holding the redlock on the run.
* Receives the (possibly quantized) `newDelayUntil` precomputed by the caller.
*/
async #handleExistingRunLocked({
existingRunId,
redisKey,
environmentId,
taskIdentifier,
debounce,
newDelayUntil,
prisma,
tx,
}: {
existingRunId: string;
redisKey: string;
environmentId: string;
taskIdentifier: string;
debounce: DebounceOptions;
newDelayUntil: Date | null;
prisma: PrismaClientOrTransaction;
tx?: PrismaClientOrTransaction;
}): Promise<DebounceResult> {
{
// Get the latest execution snapshot
let snapshot;
try {
Expand Down Expand Up @@ -514,8 +804,6 @@ return 0
});
}

// Calculate new delay - parseNaturalLanguageDuration returns a Date (now + duration)
const newDelayUntil = parseNaturalLanguageDuration(debounce.delay);
if (!newDelayUntil) {
this.$.logger.error("handleExistingRun: invalid delay duration", {
delay: debounce.delay,
Expand Down Expand Up @@ -619,7 +907,7 @@ return 0
run: updatedRun,
waitpoint: existingRun.associatedWaitpoint,
};
});
}
}

/**
Expand Down
Loading
Loading