Skip to content

Commit e57e5ee

Browse files
committed
[FLINK-30604][runtime] Remove the limitation that parallelism decided by adaptive batch scheduler is always a power of two
This closes #21626
1 parent 881627d commit e57e5ee

9 files changed

Lines changed: 35 additions & 105 deletions

File tree

docs/content.zh/docs/deployment/elastic_scaling.md

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,9 @@ Adaptive Batch Scheduler 是一种可以自动推导每个算子并行度的批
167167
- 由于 ["只支持所有数据交换都为 BLOCKING 模式的作业"](#局限性-2), 需要将 [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 配置为 `ALL-EXCHANGES-BLOCKING`(默认值) 。
168168

169169
除此之外,使用 Adaptive Batch Scheduler 时,以下相关配置也可以调整:
170-
- [`jobmanager.adaptive-batch-scheduler.min-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-min-parallelism): 允许自动设置的并行度最小值。需要配置为 2 的幂,否则也会被自动调整为最接近且大于其的 2 的幂。
171-
- [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism): 允许自动设置的并行度最大值。需要配置为 2 的幂,否则也会被自动调整为最接近且小于其的 2 的幂。
172-
- [`jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): 期望每个任务平均处理的数据量大小。由于顶点的并行度会被调整为 2^N,因此实际每个任务平均处理的数据量大小将是该值的 0.75~1.5 倍。 另外需要注意的是,当出现数据倾斜,或者确定的并行度达到最大并行度(由于数据过多)时,一些任务实际处理的数据可能会远远超过这个值。
170+
- [`jobmanager.adaptive-batch-scheduler.min-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-min-parallelism): 允许自动设置的并行度最小值。
171+
- [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism): 允许自动设置的并行度最大值。
172+
- [`jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): 期望每个任务平均处理的数据量大小。请注意,当出现数据倾斜,或者确定的并行度达到最大并行度(由于数据过多)时,一些任务实际处理的数据可能会远远超过这个值。
173173
- [`jobmanager.adaptive-batch-scheduler.default-source-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-default-source-parallelism): source 算子的默认并行度
174174

175175
#### 配置算子的并行度为 `-1`
@@ -187,7 +187,6 @@ Adaptive Batch Scheduler 只会为用户未指定并行度的算子(并行度
187187
### 局限性
188188
- **只支持批作业**: Adaptive Batch Scheduler 只支持批作业。当提交的是一个流作业时,会抛出异常。
189189
- **只支持所有数据交换都为 BLOCKING 模式的作业**: 目前 Adaptive Batch Scheduler 只支持 [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 为 ALL-EXCHANGES-BLOCKING 的作业。
190-
- **推导出的并行度是 2 的幂**: 为了使子分区可以均匀分配给下游任务,[`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism) 应该被配置为 2^N, 推导出的并行度会是 2^M, 且满足 M <= N。
191190
- **不支持 FileInputFormat 类型的 source**: 不支持 FileInputFormat 类型的 source, 包括 `StreamExecutionEnvironment#readFile(...)` `StreamExecutionEnvironment#readTextFile(...)``StreamExecutionEnvironment#createInput(FileInputFormat, ...)`。 当使用 Adaptive Batch Scheduler 时,用户应该使用新版的 Source API ([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) 或 [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) 来读取文件.
192191
- **Web UI 上展示的上游输出的数据量和下游收到的数据量可能不一致**: 在使用 Adaptive Batch Scheduler 时,对于 broadcast 边,上游算子发送的数据量和下游算子接收的数据量可能会不相等,这在 Web UI 的显示上可能会困扰用户。细节详见 [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler)
193192

docs/content/docs/deployment/elastic_scaling.md

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,9 @@ To use Adaptive Batch Scheduler, you need to:
169169
- Leave the [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value) due to ["ALL-EXCHANGES-BLOCKING jobs only"](#limitations-2).
170170

171171
In addition, there are several related configuration options that may need adjustment when using Adaptive Batch Scheduler:
172-
- [`jobmanager.adaptive-batch-scheduler.min-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-min-parallelism): The lower bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded up to a power of 2 automatically.
173-
- [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism): The upper bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will be rounded down to a power of 2 automatically.
174-
- [`jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): The average size of data volume to expect each task instance to process. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs, or the decided parallelism reaches the max parallelism (due to too much data), the data actually processed by some tasks may far exceed this value.
172+
- [`jobmanager.adaptive-batch-scheduler.min-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-min-parallelism): The lower bound of allowed parallelism to set adaptively.
173+
- [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism): The upper bound of allowed parallelism to set adaptively.
174+
- [`jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): The average size of data volume to expect each task instance to process. Note that when data skew occurs, or the decided parallelism reaches the max parallelism (due to too much data), the data actually processed by some tasks may far exceed this value.
175175
- [`jobmanager.adaptive-batch-scheduler.default-source-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-default-source-parallelism): The default parallelism of data source.
176176

177177
#### Set the parallelism of operators to `-1`
@@ -190,7 +190,6 @@ Adaptive Batch Scheduler will only decide parallelism for operators whose parall
190190

191191
- **Batch jobs only**: Adaptive Batch Scheduler only supports batch jobs. Exception will be thrown if a streaming job is submitted.
192192
- **ALL-EXCHANGES-BLOCKING jobs only**: At the moment, Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is `ALL-EXCHANGES-BLOCKING`.
193-
- **The decided parallelism will be a power of 2**: In order to ensure downstream tasks to consume the same count of subpartitions, the configuration option [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism) should be set to be a power of 2 (2^N), and the decided parallelism will also be a power of 2 (2^M and M <= N).
194193
- **FileInputFormat sources are not supported**: FileInputFormat sources are not supported, including `StreamExecutionEnvironment#readFile(...)` `StreamExecutionEnvironment#readTextFile(...)` and `StreamExecutionEnvironment#createInput(FileInputFormat, ...)`. Users should use the new sources([FileSystem DataStream Connector]({{< ref "docs/connectors/datastream/filesystem.md" >}}) or [FileSystem SQL Connector]({{< ref "docs/connectors/table/filesystem.md" >}})) to read files when using the Adaptive Batch Scheduler.
195194
- **Inconsistent broadcast results metrics on WebUI**: In Adaptive Batch Scheduler, for broadcast results, the number of bytes/records sent by the upstream task counted by metric is not equal to the number of bytes/records received by the downstream task, which may confuse users when displayed on the Web UI. See [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler) for details.
196195

docs/layouts/shortcodes/generated/all_jobmanager_section.html

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<td><h5>jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task</h5></td>
1313
<td style="word-wrap: break-word;">1 gb</td>
1414
<td>MemorySize</td>
15-
<td>The average size of data volume to expect each task instance to process if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs or the decided parallelism reaches the <code class="highlighter-rouge">jobmanager.adaptive-batch-scheduler.max-parallelism</code> (due to too much data), the data actually processed by some tasks may far exceed this value.</td>
15+
<td>The average size of data volume to expect each task instance to process if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Note that when data skew occurs or the decided parallelism reaches the <code class="highlighter-rouge">jobmanager.adaptive-batch-scheduler.max-parallelism</code> (due to too much data), the data actually processed by some tasks may far exceed this value.</td>
1616
</tr>
1717
<tr>
1818
<td><h5>jobmanager.adaptive-batch-scheduler.default-source-parallelism</h5></td>
@@ -24,13 +24,13 @@
2424
<td><h5>jobmanager.adaptive-batch-scheduler.max-parallelism</h5></td>
2525
<td style="word-wrap: break-word;">128</td>
2626
<td>Integer</td>
27-
<td>The upper bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Currently, this option should be configured as a power of 2, otherwise it will also be rounded down to a power of 2 automatically.</td>
27+
<td>The upper bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code></td>
2828
</tr>
2929
<tr>
3030
<td><h5>jobmanager.adaptive-batch-scheduler.min-parallelism</h5></td>
3131
<td style="word-wrap: break-word;">1</td>
3232
<td>Integer</td>
33-
<td>The lower bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Currently, this option should be configured as a power of 2, otherwise it will also be rounded up to a power of 2 automatically.</td>
33+
<td>The lower bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code></td>
3434
</tr>
3535
<tr>
3636
<td><h5>jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration</h5></td>

docs/layouts/shortcodes/generated/expert_scheduling_section.html

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
<td><h5>jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task</h5></td>
3131
<td style="word-wrap: break-word;">1 gb</td>
3232
<td>MemorySize</td>
33-
<td>The average size of data volume to expect each task instance to process if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs or the decided parallelism reaches the <code class="highlighter-rouge">jobmanager.adaptive-batch-scheduler.max-parallelism</code> (due to too much data), the data actually processed by some tasks may far exceed this value.</td>
33+
<td>The average size of data volume to expect each task instance to process if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Note that when data skew occurs or the decided parallelism reaches the <code class="highlighter-rouge">jobmanager.adaptive-batch-scheduler.max-parallelism</code> (due to too much data), the data actually processed by some tasks may far exceed this value.</td>
3434
</tr>
3535
<tr>
3636
<td><h5>jobmanager.adaptive-batch-scheduler.default-source-parallelism</h5></td>
@@ -42,13 +42,13 @@
4242
<td><h5>jobmanager.adaptive-batch-scheduler.max-parallelism</h5></td>
4343
<td style="word-wrap: break-word;">128</td>
4444
<td>Integer</td>
45-
<td>The upper bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Currently, this option should be configured as a power of 2, otherwise it will also be rounded down to a power of 2 automatically.</td>
45+
<td>The upper bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code></td>
4646
</tr>
4747
<tr>
4848
<td><h5>jobmanager.adaptive-batch-scheduler.min-parallelism</h5></td>
4949
<td style="word-wrap: break-word;">1</td>
5050
<td>Integer</td>
51-
<td>The lower bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Currently, this option should be configured as a power of 2, otherwise it will also be rounded up to a power of 2 automatically.</td>
51+
<td>The lower bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code></td>
5252
</tr>
5353
<tr>
5454
<td><h5>jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration</h5></td>

docs/layouts/shortcodes/generated/job_manager_configuration.html

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<td><h5>jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task</h5></td>
1313
<td style="word-wrap: break-word;">1 gb</td>
1414
<td>MemorySize</td>
15-
<td>The average size of data volume to expect each task instance to process if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs or the decided parallelism reaches the <code class="highlighter-rouge">jobmanager.adaptive-batch-scheduler.max-parallelism</code> (due to too much data), the data actually processed by some tasks may far exceed this value.</td>
15+
<td>The average size of data volume to expect each task instance to process if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Note that when data skew occurs or the decided parallelism reaches the <code class="highlighter-rouge">jobmanager.adaptive-batch-scheduler.max-parallelism</code> (due to too much data), the data actually processed by some tasks may far exceed this value.</td>
1616
</tr>
1717
<tr>
1818
<td><h5>jobmanager.adaptive-batch-scheduler.default-source-parallelism</h5></td>
@@ -24,13 +24,13 @@
2424
<td><h5>jobmanager.adaptive-batch-scheduler.max-parallelism</h5></td>
2525
<td style="word-wrap: break-word;">128</td>
2626
<td>Integer</td>
27-
<td>The upper bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Currently, this option should be configured as a power of 2, otherwise it will also be rounded down to a power of 2 automatically.</td>
27+
<td>The upper bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code></td>
2828
</tr>
2929
<tr>
3030
<td><h5>jobmanager.adaptive-batch-scheduler.min-parallelism</h5></td>
3131
<td style="word-wrap: break-word;">1</td>
3232
<td>Integer</td>
33-
<td>The lower bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code>. Currently, this option should be configured as a power of 2, otherwise it will also be rounded up to a power of 2 automatically.</td>
33+
<td>The lower bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code></td>
3434
</tr>
3535
<tr>
3636
<td><h5>jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration</h5></td>

0 commit comments

Comments
 (0)