Skip to content

Commit 79d1b6b

Browse files
fix(plugin-search): serialize reindexing to prevent locale data race conditions (#15917)
## Description This PR fixes a race condition in the plugin-search's reindexing functionality that caused missing locale data when reindexing multiple collections simultaneously. ### Changes - Replaced `Promise.all` with sequential processing of collections in `generateReindexHandler.ts` - This change ensures proper handling of locale data during reindexing operations ### Motivation The previous implementation used concurrent execution for collection reindexing, which led to race conditions and incomplete locale data being persisted. The issue was causing non-deterministic failures in localized search indexing. Fixes #15884 --------- Co-authored-by: Paul Popus <paul@payloadcms.com>
1 parent 976369d commit 79d1b6b

2 files changed

Lines changed: 116 additions & 17 deletions

File tree

packages/plugin-search/src/utilities/generateReindexHandler.ts

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,6 @@ export const generateReindexHandler =
8989
equals: 'published',
9090
},
9191
}
92-
let aggregateDocsWithDrafts = 0
93-
let aggregateErrors = 0
94-
let aggregateDocs = 0
95-
9692
async function countDocuments(collection: string, drafts?: boolean): Promise<number> {
9793
const { totalDocs } = await payload.count({
9894
collection,
@@ -113,7 +109,9 @@ export const generateReindexHandler =
113109
})
114110
}
115111

116-
async function reindexCollection(collection: string) {
112+
async function reindexCollection(
113+
collection: string,
114+
): Promise<{ docs: number; docsWithDrafts: number; errors: number }> {
117115
const draftsEnabled = Boolean(payload.collections[collection]?.config.versions?.drafts)
118116

119117
const totalDocsWithDrafts = await countDocuments(collection, true)
@@ -123,8 +121,7 @@ export const generateReindexHandler =
123121
: await countDocuments(collection, !draftsEnabled)
124122
const totalBatches = Math.ceil(totalDocs / batchSize)
125123

126-
aggregateDocsWithDrafts += totalDocsWithDrafts
127-
aggregateDocs += totalDocs
124+
let localErrors = 0
128125

129126
// Loop through batches, then documents, then locales per document
130127
for (let i = 0; i < totalBatches; i++) {
@@ -183,37 +180,49 @@ export const generateReindexHandler =
183180
data: doc,
184181
doc,
185182
locale: localeToSync,
186-
onSyncError: () => operation === 'create' && aggregateErrors++,
183+
onSyncError: () => operation === 'create' && localErrors++,
187184
operation,
188185
pluginConfig,
189186
req,
190187
})
191188
}
192189
}
193190
}
191+
192+
return { docs: totalDocs, docsWithDrafts: totalDocsWithDrafts, errors: localErrors }
194193
}
195194

196195
const shouldCommit = await initTransaction(req)
197196

197+
// Collections are processed sequentially to avoid race conditions within the shared transaction.
198+
// Concurrent writes to the search collection interleave on the same DB connection and can cause
199+
// locale data to be missing non-deterministically.
200+
const results: Array<{ docs: number; docsWithDrafts: number; errors: number }> = []
198201
try {
199-
const promises = collections.map(async (collection) => {
202+
for (const collection of collections) {
200203
try {
201204
await deleteIndexes(collection)
202-
await reindexCollection(collection)
205+
results.push(await reindexCollection(collection))
203206
} catch (err) {
204207
const message = t('error:unableToReindexCollection', { collection })
205208
payload.logger.error({ err, msg: message })
209+
results.push({ docs: 0, docsWithDrafts: 0, errors: 0 })
206210
}
207-
})
208-
209-
await Promise.all(promises)
210-
} catch (err: any) {
211+
}
212+
} catch (err: unknown) {
211213
if (shouldCommit) {
212214
await killTransaction(req)
213215
}
214-
return Response.json({ message: err.message }, { headers, status: 500 })
216+
return Response.json(
217+
{ message: err instanceof Error ? err.message : String(err) },
218+
{ headers, status: 500 },
219+
)
215220
}
216221

222+
const aggregateDocsWithDrafts = results.reduce((sum, r) => sum + r.docsWithDrafts, 0)
223+
const aggregateDocs = results.reduce((sum, r) => sum + r.docs, 0)
224+
const aggregateErrors = results.reduce((sum, r) => sum + r.errors, 0)
225+
217226
const message = t('general:successfullyReindexed', {
218227
collections: collections.join(', '),
219228
count: aggregateDocs - aggregateErrors,

test/plugin-search/int.spec.ts

Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
import { describe, beforeAll, beforeEach, afterAll, it, expect } from 'vitest'
21
import type { Payload } from 'payload'
32

43
import path from 'path'
54
import { wait } from 'payload/shared'
65
import { fileURLToPath } from 'url'
6+
import { afterAll, beforeAll, beforeEach, describe, expect, it } from 'vitest'
77

88
import type { NextRESTClient } from '../__helpers/shared/NextRESTClient.js'
99

10-
import { devUser } from '../credentials.js'
1110
import { initPayloadInt } from '../__helpers/shared/initPayloadInt.js'
11+
import { devUser } from '../credentials.js'
1212
import { pagesSlug, postsSlug } from './shared.js'
1313

1414
let payload: Payload
@@ -538,6 +538,96 @@ describe('@payloadcms/plugin-search', () => {
538538
expect(totalAfterReindex).toBe(totalBeforeReindex)
539539
})
540540

541+
it('should report correct aggregate counts when reindexing multiple collections', async () => {
542+
await Promise.all([
543+
payload.create({
544+
collection: postsSlug,
545+
data: { title: 'Post one', _status: 'published' },
546+
}),
547+
payload.create({
548+
collection: postsSlug,
549+
data: { title: 'Post two', _status: 'published' },
550+
}),
551+
payload.create({
552+
collection: pagesSlug,
553+
data: { title: 'Page one', _status: 'published' },
554+
}),
555+
])
556+
557+
const endpointRes = await restClient.POST(`/search/reindex`, {
558+
body: JSON.stringify({ collections: [postsSlug, pagesSlug] }),
559+
headers: { Authorization: `JWT ${token}` },
560+
})
561+
562+
expect(endpointRes.status).toBe(200)
563+
564+
const data = await endpointRes.json()
565+
566+
// 2 posts + 1 page = 3 total, all published, 0 drafts skipped, 0 errors
567+
expect((data as { message: string }).message).toBe(
568+
`Successfully reindexed 3 of 3 documents from ${postsSlug}, ${pagesSlug} and skipped 0 drafts.`,
569+
)
570+
})
571+
572+
it('should index locale-specific data for all locales when reindexing multiple collections', async () => {
573+
// Create a post with distinct slugs per locale — these are mapped into the search doc via beforeSync
574+
const { id: postId } = await payload.create({
575+
collection: postsSlug,
576+
data: { title: 'Locale test post', _status: 'published', slug: 'post-slug-en' },
577+
locale: 'en',
578+
})
579+
580+
await payload.update({
581+
collection: postsSlug,
582+
id: postId,
583+
data: { slug: 'post-slug-es' },
584+
locale: 'es',
585+
})
586+
await payload.update({
587+
collection: postsSlug,
588+
id: postId,
589+
data: { slug: 'post-slug-de' },
590+
locale: 'de',
591+
})
592+
593+
// Create a page so both collections are reindexed together, exercising the multi-collection path
594+
await payload.create({
595+
collection: pagesSlug,
596+
data: { title: 'Locale test page', _status: 'published' },
597+
})
598+
599+
const endpointRes = await restClient.POST(`/search/reindex`, {
600+
body: JSON.stringify({ collections: [postsSlug, pagesSlug] }),
601+
headers: { Authorization: `JWT ${token}` },
602+
})
603+
604+
expect(endpointRes.status).toBe(200)
605+
606+
const { docs: searchDocs } = await payload.find({
607+
collection: 'search',
608+
depth: 0,
609+
where: {
610+
and: [{ 'doc.relationTo': { equals: postsSlug } }, { 'doc.value': { equals: postId } }],
611+
},
612+
})
613+
614+
expect(searchDocs).toHaveLength(1)
615+
616+
const searchDocId = searchDocs[0]!.id
617+
618+
const [enDoc, esDoc, deDoc] = await Promise.all([
619+
payload.findByID({ collection: 'search', id: searchDocId, locale: 'en' }),
620+
payload.findByID({ collection: 'search', id: searchDocId, locale: 'es' }),
621+
payload.findByID({ collection: 'search', id: searchDocId, locale: 'de' }),
622+
])
623+
624+
// With localization fallback: true, a missing locale update would silently fall back to 'en'
625+
// making these assertions fail — catching any regression to concurrent reindexing
626+
expect(enDoc.slug).toBe('post-slug-en')
627+
expect(esDoc.slug).toBe('post-slug-es')
628+
expect(deDoc.slug).toBe('post-slug-de')
629+
})
630+
541631
it('should exclude drafts from reindexing by default', async () => {
542632
await Promise.all([
543633
payload.create({

0 commit comments

Comments
 (0)