From 2a3963211eea81d66a0c88c2bf2ceab660d1b5e7 Mon Sep 17 00:00:00 2001 From: Dennis Snell Date: Fri, 1 May 2026 21:38:32 -0700 Subject: [PATCH] RTC: Add base version tracking to prevent stale CRDT doc overwrites Clients now embed a baseVersion in serialised _crdt_document meta values. The server rejects writes whose base version does not match the currently stored version, using an atomic compare-and-swap on the database row. On rejection the client refetches, merges the server's CRDT doc into the local Y.Doc, and retries. Autosave payloads no longer include _crdt_document to avoid sending stale copies. --- lib/compat/wordpress-7.0/collaboration.php | 118 +++++++++++++++++++++ packages/core-data/src/actions.js | 16 +++ packages/core-data/src/entities.js | 15 ++- packages/core-data/src/resolvers.js | 85 ++++++++++++--- packages/sync/src/manager.ts | 7 +- packages/sync/src/types.ts | 3 +- packages/sync/src/utils.ts | 6 +- 7 files changed, 228 insertions(+), 22 deletions(-) diff --git a/lib/compat/wordpress-7.0/collaboration.php b/lib/compat/wordpress-7.0/collaboration.php index b4804397bb75c5..783f9bd9a48b65 100644 --- a/lib/compat/wordpress-7.0/collaboration.php +++ b/lib/compat/wordpress-7.0/collaboration.php @@ -98,6 +98,124 @@ function gutenberg_rest_api_crdt_post_meta() { add_action( 'init', 'gutenberg_rest_api_crdt_post_meta' ); } +if ( ! function_exists( 'gutenberg_crdt_intercept_post_meta_update' ) ) { + /** + * Intercepts post meta updates for the persisted CRDT document to + * implement optimistic concurrency control. Clients embed a `baseVersion` + * field in the serialized document. Before writing, this filter checks + * that the client's base version matches the version currently stored on + * the server. When versions match the write proceeds and the version is + * incremented atomically using a compare-and-swap on the database row. + * + * When the stored version does not match the client's base version the + * write is rejected — another client has already updated the document. + * The rejected client will retry after receiving the latest version + * through the next server response. + * + * @param mixed $check Whether to allow the update. Returning a + * non-null value short-circuits update_metadata(). + * @param int $object_id Post ID. + * @param string $meta_key Meta key being updated. + * @param mixed $meta_value New meta value (JSON string). + * @return mixed Null to allow WordPress to proceed, false to reject, true + * when the write was handled directly. + */ + function gutenberg_crdt_intercept_post_meta_update( $check, $object_id, $meta_key, $meta_value ) { + if ( '_crdt_document' !== $meta_key ) { + return $check; + } + + $incoming = json_decode( $meta_value, true ); + if ( ! is_array( $incoming ) ) { + return $check; + } + + $base_version = (int) ( $incoming['baseVersion'] ?? 0 ); + + global $wpdb; + + $row = $wpdb->get_row( + $wpdb->prepare( + "SELECT meta_id, meta_value FROM $wpdb->postmeta + WHERE post_id = %d AND meta_key = %s", + $object_id, + '_crdt_document' + ) + ); + + if ( $row ) { + // Existing meta — check version before writing. + $current = json_decode( $row->meta_value, true ); + $current_version = (int) ( is_array( $current ) ? ( $current['baseVersion'] ?? 0 ) : 0 ); + + if ( $current_version !== $base_version ) { + return false; // Stale — client's base version does not match the server. + } + + $incoming['baseVersion'] = $current_version + 1; + $new_value = wp_json_encode( $incoming ); + + // Atomic compare-and-swap: only update if the row hasn't changed + // since we last read it. + $affected = $wpdb->update( + $wpdb->postmeta, + array( 'meta_value' => $new_value ), + array( + 'meta_id' => $row->meta_id, + 'meta_value' => $row->meta_value, + ) + ); + + if ( 0 === $affected ) { + return false; // Lost the race — another request updated first. + } + + wp_cache_delete( $object_id, 'post_meta' ); + + /** + * Fires immediately after a CRDT-document post meta row is updated. + * Mirrors WordPress Core's `updated_post_meta` action to keep + * caches and hooks consistent. + * + * @param int $meta_id Meta ID. + * @param int $object_id Post ID. + * @param string $meta_key Meta key. + * @param mixed $meta_value Meta value. + */ + do_action( 'updated_post_meta', $row->meta_id, $object_id, $meta_key, $meta_value ); + + return true; // Handled — short-circuit WordPress. + } + + // First-time write: no stored row yet. Let WordPress do the INSERT. + // The `pre_update_post__crdt_document` filter will inject + // baseVersion=1 before the row is created. + return $check; + } + add_filter( 'update_post_metadata', 'gutenberg_crdt_intercept_post_meta_update', 10, 4 ); +} + +if ( ! function_exists( 'gutenberg_crdt_set_initial_base_version' ) ) { + /** + * Sets the initial `baseVersion` on the first write of a persisted CRDT + * document. WordPress's `update_post_metadata` filter cannot modify the + * value for INSERTs (only short-circuit them), so this companion filter + * injects `baseVersion=1` when the stored row doesn't exist yet. + * + * @param mixed $meta_value New meta value (JSON string). + * @return mixed Modified meta value with baseVersion set, or the original. + */ + function gutenberg_crdt_set_initial_base_version( $meta_value ) { + $decoded = json_decode( $meta_value, true ); + if ( is_array( $decoded ) && empty( $decoded['baseVersion'] ) ) { + $decoded['baseVersion'] = 1; + return wp_json_encode( $decoded ); + } + return $meta_value; + } + add_filter( 'pre_update_post__crdt_document', 'gutenberg_crdt_set_initial_base_version', 10, 1 ); +} + if ( ! function_exists( 'wp_collaboration_inject_setting' ) ) { /** * Registers the real-time collaboration setting. diff --git a/packages/core-data/src/actions.js b/packages/core-data/src/actions.js index 47e12f46e7bd92..dcd24379068306 100644 --- a/packages/core-data/src/actions.js +++ b/packages/core-data/src/actions.js @@ -688,6 +688,22 @@ export const saveEntityRecord = // is intentionally excluded to avoid stale values // overriding reverted fields. const merged = { ...persistedRecord, ...record }; + + // The persisted CRDT document is managed through explicit + // saves (which call __unstablePrePersist to serialize a + // fresh copy). Autosaves carry a stale copy from the last + // server response and will either be rejected by the + // server's version check or, worse, overwrite a newer + // document. Strip it so autosaves don't touch sync data. + if ( merged.meta ) { + const { + /* eslint-disable-next-line camelcase */ + _crdt_document, + ...metaWithoutCRDT + } = merged.meta; + merged.meta = metaWithoutCRDT; + } + const data = [ 'title', 'excerpt', diff --git a/packages/core-data/src/entities.js b/packages/core-data/src/entities.js index 83e1ccd1c79e5f..b1ed045bb8e18b 100644 --- a/packages/core-data/src/entities.js +++ b/packages/core-data/src/entities.js @@ -310,9 +310,22 @@ export const prePersistPostType = async ( if ( persistedRecord ) { const objectType = `postType/${ name }`; const objectId = persistedRecord.id; + + let baseVersion = 0; + try { + const persistedCrdtDoc = + persistedRecord.meta?.[ + POST_META_KEY_FOR_CRDT_DOC_PERSISTENCE + ]; + if ( persistedCrdtDoc ) { + const parsed = JSON.parse( persistedCrdtDoc ); + baseVersion = parsed.baseVersion ?? 0; + } + } catch {} const serializedDoc = await getSyncManager()?.createPersistedCRDTDoc( objectType, - objectId + objectId, + baseVersion ); if ( serializedDoc ) { diff --git a/packages/core-data/src/resolvers.js b/packages/core-data/src/resolvers.js index dfef02c0968e12..850f6f25a30b29 100644 --- a/packages/core-data/src/resolvers.js +++ b/packages/core-data/src/resolvers.js @@ -236,26 +236,77 @@ export const getEntityRecord = // This effectively means that only post entities support CRDT // persistence. As we add support for syncing additional entity, // we'll need to revisit where persisted CRDT documents are stored. - persistCRDTDoc: () => { - resolveSelect - .getEditedEntityRecord( kind, name, key ) - .then( ( editedRecord ) => { - // Don't persist the CRDT document if the record is still an - // auto-draft or if the entity does not support meta. - const { meta, status } = editedRecord; - if ( 'auto-draft' === status || ! meta ) { - return; - } - - // Trigger a save to persist the CRDT document. The entity's - // pre-persist hooks will create the persisted CRDT document - // and apply it to the record's meta. - dispatch.saveEntityRecord( + persistCRDTDoc: async () => { + const editedRecord = + await resolveSelect.getEditedEntityRecord( + kind, + name, + key + ); + + const { meta, status } = editedRecord; + if ( 'auto-draft' === status || ! meta ) { + return; + } + + let baseVersionSent = 0; + try { + const persistedDoc = meta._crdt_document; + if ( persistedDoc ) { + const parsed = JSON.parse( persistedDoc ); + baseVersionSent = parsed.baseVersion ?? 0; + } + } catch {} + + const savedRecord = await dispatch.saveEntityRecord( + kind, + name, + editedRecord + ); + if ( ! savedRecord?.meta?._crdt_document ) { + return; + } + + let serverVersion = 0; + try { + const savedDoc = + savedRecord.meta._crdt_document; + const parsed = JSON.parse( savedDoc ); + serverVersion = parsed.baseVersion ?? 0; + } catch {} + + if ( serverVersion === baseVersionSent ) { + return; + } + + // Another client's CRDT doc was persisted between + // our read and write. Merge the server's CRDT doc + // into our local Y.Doc so we don't overwrite their + // changes on retry. + try { + const syncManager = getSyncManager(); + if ( syncManager ) { + await resolveSelect.getEntityRecord( + kind, + name, + key + ); + // getEntityRecord re-applies the persisted CRDT + // doc to the local Y.Doc via load(). The + // edited record now reflects merged state. + const mergedRecord = + await resolveSelect.getEditedEntityRecord( + kind, + name, + key + ); + await dispatch.saveEntityRecord( kind, name, - editedRecord + mergedRecord ); - } ); + } + } catch {} }, addUndoMeta: ( ydoc, meta ) => { const selectionHistory = diff --git a/packages/sync/src/manager.ts b/packages/sync/src/manager.ts index dcca0c5d9970e6..e36984689a06d2 100644 --- a/packages/sync/src/manager.ts +++ b/packages/sync/src/manager.ts @@ -637,15 +637,18 @@ export function createSyncManager( debug = false ): SyncManager { handlers.editRecord( changes ); } + /* eslint-disable-next-line jsdoc/check-line-alignment */ /** * Create object meta to persist the CRDT document in the entity record. * * @param {ObjectType} objectType Object type. * @param {ObjectID} objectId Object ID. + * @param {number} [baseVersion=0] Base version from the server. */ async function createPersistedCRDTDoc( objectType: ObjectType, - objectId: ObjectID + objectId: ObjectID, + baseVersion: number = 0 ): Promise< string | null > { const entityId = getEntityId( objectType, objectId ); const entityState = entityStates.get( entityId ); @@ -659,7 +662,7 @@ export function createSyncManager( debug = false ): SyncManager { // before we serialize the document. await new Promise( ( resolve ) => setTimeout( resolve, 0 ) ); - return serializeCrdtDoc( entityState.ydoc ); + return serializeCrdtDoc( entityState.ydoc, baseVersion ); } // Collect internal functions so that they can be wrapped before calling. diff --git a/packages/sync/src/types.ts b/packages/sync/src/types.ts index e9a3e57bfeae09..69a63a37abfce4 100644 --- a/packages/sync/src/types.ts +++ b/packages/sync/src/types.ts @@ -159,7 +159,8 @@ export interface SyncConfig { export interface SyncManager { createPersistedCRDTDoc: ( objectType: ObjectType, - objectId: ObjectID + objectId: ObjectID, + baseVersion?: number ) => Promise< string | null >; getAwareness: < State extends Awareness >( objectType: ObjectType, diff --git a/packages/sync/src/utils.ts b/packages/sync/src/utils.ts index a420f811205cf7..b4fb40714344cc 100644 --- a/packages/sync/src/utils.ts +++ b/packages/sync/src/utils.ts @@ -67,9 +67,13 @@ function pseudoRandomID(): number { return Math.floor( Math.random() * 1000000000 ); } -export function serializeCrdtDoc( crdtDoc: CRDTDoc ): string { +export function serializeCrdtDoc( + crdtDoc: CRDTDoc, + baseVersion: number = 0 +): string { return JSON.stringify( { document: buffer.toBase64( Y.encodeStateAsUpdateV2( crdtDoc ) ), + baseVersion, updateId: pseudoRandomID(), // helps with debugging } ); }