Skip to content

Commit d8a2453

Browse files
authored
NIFI-15869 Add Unmatched Field Behavior property to PutBigQuery (#11172)
1 parent fa0a58d commit d8a2453

3 files changed

Lines changed: 263 additions & 6 deletions

File tree

nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java

Lines changed: 62 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -172,6 +172,14 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
172172
.defaultValue("false")
173173
.build();
174174

175+
/**
 * Processor property that selects how fields of an incoming record are handled when they do not
 * exist in the destination BigQuery table schema. The property is required, is restricted to the
 * {@link UnmatchedFieldBehavior} values and defaults to {@link UnmatchedFieldBehavior#IGNORE}.
 */
public static final PropertyDescriptor UNMATCHED_FIELD_BEHAVIOR = new PropertyDescriptor.Builder()
176+
.name("Unmatched Field Behavior")
177+
.description("Specifies how to handle fields in the incoming record that do not exist in the destination BigQuery table schema.")
178+
.required(true)
179+
.allowableValues(UnmatchedFieldBehavior.class)
180+
.defaultValue(UnmatchedFieldBehavior.IGNORE)
181+
.build();
182+
175183
private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
176184
GCP_CREDENTIALS_PROVIDER_SERVICE,
177185
PROJECT_ID,
@@ -183,6 +191,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
183191
APPEND_RECORD_COUNT,
184192
RETRY_COUNT,
185193
SKIP_INVALID_ROWS,
194+
UNMATCHED_FIELD_BEHAVIOR,
186195
PROXY_CONFIGURATION_SERVICE
187196
);
188197

@@ -235,13 +244,14 @@ public void onTrigger(ProcessContext context, ProcessSession session) {
235244
}
236245

237246
final boolean skipInvalidRows = context.getProperty(SKIP_INVALID_ROWS).evaluateAttributeExpressions(flowFile).asBoolean();
247+
final UnmatchedFieldBehavior unmatchedFieldBehavior = context.getProperty(UNMATCHED_FIELD_BEHAVIOR).asAllowableValue(UnmatchedFieldBehavior.class);
238248
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
239249

240250
int recordNumWritten;
241251
try {
242252
try (InputStream in = session.read(flowFile);
243253
RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
244-
recordNumWritten = writeRecordsToStream(reader, protoDescriptor, skipInvalidRows, tableSchema);
254+
recordNumWritten = writeRecordsToStream(reader, protoDescriptor, skipInvalidRows, tableSchema, unmatchedFieldBehavior, tableName);
245255
}
246256
flowFile = session.putAttribute(flowFile, JOB_NB_RECORDS_ATTR, Integer.toString(recordNumWritten));
247257
} catch (Exception e) {
@@ -261,13 +271,14 @@ public void migrateProperties(PropertyConfiguration config) {
261271
config.renameProperty("bq.skip.invalid.rows", SKIP_INVALID_ROWS.getName());
262272
}
263273

264-
private int writeRecordsToStream(RecordReader reader, Descriptors.Descriptor descriptor, boolean skipInvalidRows, TableSchema tableSchema) throws Exception {
274+
private int writeRecordsToStream(final RecordReader reader, final Descriptors.Descriptor descriptor, final boolean skipInvalidRows, final TableSchema tableSchema,
275+
final UnmatchedFieldBehavior unmatchedFieldBehavior, final TableName tableName) throws Exception {
265276
Record currentRecord;
266277
int offset = 0;
267278
int recordNum = 0;
268279
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
269280
while ((currentRecord = reader.nextRecord()) != null) {
270-
DynamicMessage message = recordToProtoMessage(currentRecord, descriptor, skipInvalidRows, tableSchema);
281+
final DynamicMessage message = recordToProtoMessage(currentRecord, descriptor, skipInvalidRows, tableSchema, unmatchedFieldBehavior, tableName);
271282

272283
if (message == null) {
273284
continue;
@@ -289,12 +300,30 @@ private int writeRecordsToStream(RecordReader reader, Descriptors.Descriptor des
289300
return recordNum;
290301
}
291302

292-
private DynamicMessage recordToProtoMessage(Record record, Descriptors.Descriptor descriptor, boolean skipInvalidRows, TableSchema tableSchema) {
293-
Map<String, Object> valueMap = convertMapRecord(record.toMap(), tableSchema.getFieldsList());
303+
private DynamicMessage recordToProtoMessage(final Record record, final Descriptors.Descriptor descriptor, final boolean skipInvalidRows, final TableSchema tableSchema,
304+
final UnmatchedFieldBehavior unmatchedFieldBehavior, final TableName tableName) {
305+
final Map<String, Object> rawMap = record.toMap();
306+
307+
if (unmatchedFieldBehavior != UnmatchedFieldBehavior.IGNORE) {
308+
final List<String> unmatchedFields = new ArrayList<>();
309+
collectUnmatchedFields(rawMap, tableSchema.getFieldsList(), null, unmatchedFields);
310+
311+
if (!unmatchedFields.isEmpty()) {
312+
if (unmatchedFieldBehavior == UnmatchedFieldBehavior.WARN) {
313+
getLogger().warn("Record contains {} field(s) not present in BigQuery table schema for table {}: {}",
314+
unmatchedFields.size(), tableName, unmatchedFields);
315+
} else {
316+
throw new ProcessException("Record contains fields not present in BigQuery table schema for table %s: %s"
317+
.formatted(tableName, unmatchedFields));
318+
}
319+
}
320+
}
321+
322+
final Map<String, Object> valueMap = convertMapRecord(rawMap, tableSchema.getFieldsList());
294323
DynamicMessage message = null;
295324
try {
296325
message = ProtoUtils.createMessage(descriptor, valueMap, tableSchema);
297-
} catch (RuntimeException e) {
326+
} catch (final RuntimeException e) {
298327
getLogger().error("Cannot convert record to message", e);
299328
if (!skipInvalidRows) {
300329
throw e;
@@ -304,6 +333,33 @@ private DynamicMessage recordToProtoMessage(Record record, Descriptors.Descripto
304333
return message;
305334
}
306335

336+
private static void collectUnmatchedFields(final Map<String, Object> recordMap, final List<TableFieldSchema> tableFields,
337+
final String parentPath, final List<String> unmatchedFields) {
338+
for (final Map.Entry<String, Object> entry : recordMap.entrySet()) {
339+
final String key = entry.getKey();
340+
final Object value = entry.getValue();
341+
final String fieldPath = parentPath == null ? key : parentPath + "." + key;
342+
final TableFieldSchema fieldSchema = findFieldSchema(tableFields, key);
343+
344+
if (fieldSchema == null) {
345+
unmatchedFields.add(fieldPath);
346+
continue;
347+
}
348+
349+
if (fieldSchema.getType() == TableFieldSchema.Type.STRUCT) {
350+
if (value instanceof MapRecord mapRecord) {
351+
collectUnmatchedFields(mapRecord.toMap(), fieldSchema.getFieldsList(), fieldPath, unmatchedFields);
352+
} else if (value instanceof Object[] arrayValue) {
353+
for (final Object item : arrayValue) {
354+
if (item instanceof MapRecord mapRecordItem) {
355+
collectUnmatchedFields(mapRecordItem.toMap(), fieldSchema.getFieldsList(), fieldPath, unmatchedFields);
356+
}
357+
}
358+
}
359+
}
360+
}
361+
}
362+
307363
private void append(AppendContext appendContext) throws Exception {
308364
if (error.get() != null) {
309365
throw error.get();
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.nifi.processors.gcp.bigquery;
19+
20+
import org.apache.nifi.components.DescribedValue;
21+
22+
/**
23+
* Strategy applied when an incoming record contains a field that does not exist in the
24+
* target BigQuery table schema. The BigQuery Storage Write API does not surface this condition
25+
* because NiFi drops unknown fields client-side before encoding to Protobuf; this enum allows
26+
* operators to opt in to logging or failure semantics.
27+
*/
28+
public enum UnmatchedFieldBehavior implements DescribedValue {
29+
30+
IGNORE("Ignore Unmatched Fields", "Ignore Unmatched Fields",
31+
"Any record field that does not map to a BigQuery table column is silently ignored."),
32+
WARN("Warn on Unmatched Fields", "Warn on Unmatched Fields",
33+
"Any record field that does not map to a BigQuery table column is ignored, but a warning is logged per affected record."),
34+
FAIL("Fail on Unmatched Fields", "Fail on Unmatched Fields", """
35+
If a record contains a field that does not map to a BigQuery table column, the FlowFile is routed to the failure relationship. \
36+
This decision is independent of Skip Invalid Rows, which only governs row-level serialization failures.""");
37+
38+
private final String value;
39+
private final String displayName;
40+
private final String description;
41+
42+
UnmatchedFieldBehavior(final String value, final String displayName, final String description) {
43+
this.value = value;
44+
this.displayName = displayName;
45+
this.description = description;
46+
}
47+
48+
@Override
49+
public String getValue() {
50+
return value;
51+
}
52+
53+
@Override
54+
public String getDisplayName() {
55+
return displayName;
56+
}
57+
58+
@Override
59+
public String getDescription() {
60+
return description;
61+
}
62+
}

nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java

Lines changed: 139 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,8 @@
4848
import org.apache.nifi.proxy.ProxyConfiguration;
4949
import org.apache.nifi.reporting.InitializationException;
5050
import org.apache.nifi.schema.access.SchemaAccessUtils;
51+
import org.apache.nifi.util.LogMessage;
52+
import org.apache.nifi.util.MockComponentLog;
5153
import org.apache.nifi.util.PropertyMigrationResult;
5254
import org.apache.nifi.util.TestRunner;
5355
import org.apache.nifi.util.TestRunners;
@@ -80,6 +82,7 @@
8082
import static org.mockito.ArgumentMatchers.anyLong;
8183
import static org.mockito.ArgumentMatchers.isA;
8284
import static org.mockito.Mockito.mock;
85+
import static org.mockito.Mockito.never;
8386
import static org.mockito.Mockito.reset;
8487
import static org.mockito.Mockito.times;
8588
import static org.mockito.Mockito.verify;
@@ -367,6 +370,112 @@ void testUnknownColumnSkipped() {
367370
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS);
368371
}
369372

373+
@Test
374+
void testUnmatchedFieldWarnLogsAndWrites() {
375+
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
376+
final TableSchema myTableSchema = mockTableSchema(FIELD_1_NAME, TableFieldSchema.Type.STRING, FIELD_2_NAME, TableFieldSchema.Type.STRING);
377+
when(writeStream.getTableSchema()).thenReturn(myTableSchema);
378+
when(streamWriter.append(isA(ProtoRows.class), isA(Long.class)))
379+
.thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build()));
380+
381+
runner.setProperty(PutBigQuery.UNMATCHED_FIELD_BEHAVIOR, UnmatchedFieldBehavior.WARN);
382+
383+
final String unknownProperty = "myUnknownProperty";
384+
runner.enqueue(CSV_HEADER + ",unknownField\nmyId,myValue," + unknownProperty);
385+
runner.run();
386+
387+
verify(streamWriter).append(protoRowsCaptor.capture(), offsetCaptor.capture());
388+
final ProtoRows rows = protoRowsCaptor.getValue();
389+
assertEquals(1, rows.getSerializedRowsCount());
390+
assertFalse(rows.getSerializedRowsList().getFirst().toString().contains(unknownProperty));
391+
392+
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS);
393+
assertUnmatchedFieldWarningLogged(runner);
394+
}
395+
396+
@Test
397+
void testUnmatchedFieldFailRoutesToFailure() {
398+
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
399+
final TableSchema myTableSchema = mockTableSchema(FIELD_1_NAME, TableFieldSchema.Type.STRING, FIELD_2_NAME, TableFieldSchema.Type.STRING);
400+
when(writeStream.getTableSchema()).thenReturn(myTableSchema);
401+
402+
runner.setProperty(PutBigQuery.UNMATCHED_FIELD_BEHAVIOR, UnmatchedFieldBehavior.FAIL);
403+
runner.setProperty(PutBigQuery.SKIP_INVALID_ROWS, "false");
404+
405+
runner.enqueue(CSV_HEADER + ",unknownField\nmyId,myValue,extraField");
406+
runner.run();
407+
408+
verify(streamWriter, never()).append(any(ProtoRows.class), anyLong());
409+
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_FAILURE);
410+
}
411+
412+
@Test
413+
void testUnmatchedFieldFailIgnoresSkipInvalidRows() {
414+
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
415+
final TableSchema myTableSchema = mockTableSchema(FIELD_1_NAME, TableFieldSchema.Type.STRING, FIELD_2_NAME, TableFieldSchema.Type.STRING);
416+
when(writeStream.getTableSchema()).thenReturn(myTableSchema);
417+
418+
runner.setProperty(PutBigQuery.UNMATCHED_FIELD_BEHAVIOR, UnmatchedFieldBehavior.FAIL);
419+
runner.setProperty(PutBigQuery.SKIP_INVALID_ROWS, "true");
420+
421+
runner.enqueue(CSV_HEADER + ",unknownField\nidOne,valueOne,extraField");
422+
runner.run();
423+
424+
verify(streamWriter, never()).append(any(ProtoRows.class), anyLong());
425+
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_FAILURE);
426+
}
427+
428+
@Test
429+
void testUnmatchedFieldInNestedStructLogsDottedPath() throws InitializationException {
430+
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
431+
432+
final TableFieldSchema nestedKnown = mock(TableFieldSchema.class);
433+
when(nestedKnown.getName()).thenReturn("known");
434+
when(nestedKnown.getType()).thenReturn(TableFieldSchema.Type.STRING);
435+
when(nestedKnown.getMode()).thenReturn(TableFieldSchema.Mode.NULLABLE);
436+
437+
final TableFieldSchema parent = mock(TableFieldSchema.class);
438+
when(parent.getName()).thenReturn("parent");
439+
when(parent.getType()).thenReturn(TableFieldSchema.Type.STRUCT);
440+
when(parent.getMode()).thenReturn(TableFieldSchema.Mode.NULLABLE);
441+
when(parent.getFieldsList()).thenReturn(List.of(nestedKnown));
442+
443+
final TableSchema schemaWithStruct = mock(TableSchema.class);
444+
when(schemaWithStruct.getFieldsList()).thenReturn(List.of(parent));
445+
446+
when(writeStream.getTableSchema()).thenReturn(schemaWithStruct);
447+
when(streamWriter.append(isA(ProtoRows.class), isA(Long.class)))
448+
.thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build()));
449+
450+
decorateWithNestedStructJsonReader(runner);
451+
452+
runner.setProperty(PutBigQuery.UNMATCHED_FIELD_BEHAVIOR, UnmatchedFieldBehavior.WARN);
453+
454+
runner.enqueue("""
455+
{"parent":{"known":"ok","unexpected":"oops"}}
456+
""");
457+
runner.run();
458+
459+
runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS);
460+
assertUnmatchedFieldPathLogged(runner, "parent.unexpected");
461+
}
462+
463+
private static void assertUnmatchedFieldWarningLogged(final TestRunner runner) {
464+
final MockComponentLog logger = runner.getLogger();
465+
final boolean found = logger.getWarnMessages().stream()
466+
.map(LogMessage::getMsg)
467+
.anyMatch(msg -> msg != null && msg.contains("not present in BigQuery table schema"));
468+
assertTrue(found, "Expected a warning about unmatched fields, got: " + logger.getWarnMessages());
469+
}
470+
471+
private static void assertUnmatchedFieldPathLogged(final TestRunner runner, final String expectedPath) {
472+
final MockComponentLog logger = runner.getLogger();
473+
final boolean found = logger.getWarnMessages().stream()
474+
.map(LogMessage::getMsg)
475+
.anyMatch(msg -> msg != null && msg.contains(expectedPath));
476+
assertTrue(found, "Expected warning to reference path '" + expectedPath + "', got: " + logger.getWarnMessages());
477+
}
478+
370479
@Test
371480
void testSchema() throws Exception {
372481
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
@@ -561,6 +670,36 @@ private void decorateWithJsonRecordReaderWithSchema(TestRunner runner) throws In
561670
runner.enableControllerService(jsonReader);
562671
}
563672

673+
private void decorateWithNestedStructJsonReader(TestRunner runner) throws InitializationException {
674+
String recordReaderSchema = """
675+
{
676+
"name": "nested",
677+
"namespace": "nifi.examples",
678+
"type": "record",
679+
"fields": [
680+
{
681+
"name": "parent",
682+
"type": {
683+
"type": "record",
684+
"name": "ParentRecord",
685+
"fields": [
686+
{"name": "known", "type": ["null", "string"], "default": null},
687+
{"name": "unexpected", "type": ["null", "string"], "default": null}
688+
]
689+
}
690+
}
691+
]
692+
}""";
693+
694+
JsonTreeReader jsonReader = new JsonTreeReader();
695+
runner.addControllerService("nestedJsonReader", jsonReader);
696+
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
697+
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, recordReaderSchema);
698+
runner.enableControllerService(jsonReader);
699+
700+
runner.setProperty(PutBigQuery.RECORD_READER, "nestedJsonReader");
701+
}
702+
564703
private TableSchema mockTableSchema(String name1, TableFieldSchema.Type type1, String name2, TableFieldSchema.Type type2) {
565704
TableSchema myTableSchema = mock(TableSchema.class);
566705

0 commit comments

Comments
 (0)