Skip to content

Commit fb482fe

Browse files
committed
[FLINK-30213] Fix logic for determining downstream subtasks for partitioner replacement
1 parent e57e5ee commit fb482fe

3 files changed

Lines changed: 23 additions & 27 deletions

File tree

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1610,7 +1610,8 @@ List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters
16101610

16111611
int index = 0;
16121612
for (NonChainedOutput streamOutput : outputsInOrder) {
1613-
replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(environment, streamOutput);
1613+
replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
1614+
environment, streamOutput, index);
16141615
recordWriters.add(
16151616
createRecordWriter(
16161617
streamOutput,
@@ -1623,10 +1624,13 @@ List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters
16231624
}
16241625

16251626
private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
1626-
Environment environment, NonChainedOutput streamOutput) {
1627+
Environment environment, NonChainedOutput streamOutput, int outputIndex) {
16271628
if (streamOutput.getPartitioner() instanceof ForwardPartitioner
1628-
&& streamOutput.getConsumerParallelism()
1629+
&& environment.getWriter(outputIndex).getNumberOfSubpartitions()
16291630
!= environment.getTaskInfo().getNumberOfParallelSubtasks()) {
1631+
LOG.debug(
1632+
"Replacing forward partitioner with rebalance for {}",
1633+
environment.getTaskInfo().getTaskNameWithSubtasks());
16301634
streamOutput.setPartitioner(new RebalancePartitioner<>());
16311635
}
16321636
}

flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -95,7 +95,7 @@ public class StreamMockEnvironment implements Environment {
9595

9696
private final List<IndexedInputGate> inputs;
9797

98-
private final List<ResultPartitionWriter> outputs;
98+
private List<ResultPartitionWriter> outputs;
9999

100100
private final JobID jobID;
101101

@@ -236,6 +236,10 @@ public void addOutput(ResultPartitionWriter resultPartitionWriter) {
236236
}
237237
}
238238

239+
public void setOutputs(List<ResultPartitionWriter> outputs) {
240+
this.outputs = outputs;
241+
}
242+
239243
public void setExternalExceptionHandler(Consumer<Throwable> externalExceptionHandler) {
240244
this.externalExceptionHandler = externalExceptionHandler;
241245
}

flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java

Lines changed: 11 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,6 @@
2424
import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
2525
import org.apache.flink.api.common.state.CheckpointListener;
2626
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
27-
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
2827
import org.apache.flink.api.common.typeutils.TypeSerializer;
2928
import org.apache.flink.configuration.Configuration;
3029
import org.apache.flink.configuration.ReadableConfig;
@@ -58,11 +57,10 @@
5857
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
5958
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
6059
import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
61-
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
60+
import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
6261
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
6362
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
6463
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
65-
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
6664
import org.apache.flink.runtime.jobgraph.OperatorID;
6765
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
6866
import org.apache.flink.runtime.metrics.TimerGauge;
@@ -117,7 +115,6 @@
117115
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
118116
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
119117
import org.apache.flink.streaming.api.functions.source.SourceFunction;
120-
import org.apache.flink.streaming.api.graph.NonChainedOutput;
121118
import org.apache.flink.streaming.api.graph.StreamConfig;
122119
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
123120
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
@@ -143,7 +140,6 @@
143140
import org.apache.flink.util.ExceptionUtils;
144141
import org.apache.flink.util.FatalExitExceptionHandler;
145142
import org.apache.flink.util.FlinkRuntimeException;
146-
import org.apache.flink.util.OutputTag;
147143
import org.apache.flink.util.TestLogger;
148144
import org.apache.flink.util.clock.SystemClock;
149145
import org.apache.flink.util.concurrent.FutureUtils;
@@ -1835,24 +1831,16 @@ public void testForwardPartitionerIsConvertedToRebalanceOnParallelismChanges()
18351831
.getChannelSelector()
18361832
instanceof ForwardPartitioner);
18371833

1838-
// Change consumer parallelism
1839-
harness.streamTask.configuration.setVertexNonChainedOutputs(
1840-
Arrays.asList(
1841-
new NonChainedOutput(
1842-
false,
1843-
0,
1844-
// Set a different consumer parallelism to force trigger
1845-
// replacing the ForwardPartitioner
1846-
42,
1847-
100,
1848-
1000,
1849-
false,
1850-
new IntermediateDataSetID(),
1851-
new OutputTag<>("output", IntegerTypeInfo.INT_TYPE_INFO),
1852-
// Use forward partitioner
1853-
new ForwardPartitioner<>(),
1854-
ResultPartitionType.PIPELINED)));
1855-
harness.streamTask.configuration.serializeAllConfigs();
1834+
// Simulate changed downstream task parallelism (1->2)
1835+
List<ResultPartitionWriter> newOutputs = new ArrayList<>();
1836+
newOutputs.add(
1837+
new MockResultPartitionWriter() {
1838+
@Override
1839+
public int getNumberOfSubpartitions() {
1840+
return 2;
1841+
}
1842+
});
1843+
harness.streamMockEnvironment.setOutputs(newOutputs);
18561844

18571845
// Re-create outputs
18581846
recordWriterDelegate =

0 commit comments

Comments (0)