The Dynamic Work Rebalancing feature of the Dataflow service allows the service to dynamically repartition work based on runtime conditions. These conditions might include the following:
- Imbalances in work assignments
- Workers taking longer than expected to finish
- Workers finishing faster than expected
The Dataflow service automatically detects these conditions and can dynamically assign work to unused or underused workers to decrease the overall processing time of your job.
Limitations
Dynamic work rebalancing only happens when the Dataflow service is processing some input data in parallel: when reading data from an external input source, when working with a materialized intermediate PCollection, or when working with the result of an aggregation like GroupByKey. If a large number of steps in your job are fused, your job has fewer intermediate PCollections, and dynamic work rebalancing is limited to the number of elements in the source materialized PCollection. If you want to ensure that dynamic work rebalancing can be applied to a particular PCollection in your pipeline, you can prevent fusion in a few different ways to ensure dynamic parallelism, as in the sketch below.
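One common fusion break in the Java SDK is Beam's Reshuffle transform, which forces the intermediate PCollection to be materialized. The following is a minimal sketch; the input path and class name are illustrative, not part of any real pipeline:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

public class FusionBreakExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Illustrative input path.
    PCollection<String> lines =
        pipeline.apply(TextIO.read().from("gs://my-bucket/input-*"));

    // Reshuffle.viaRandomKey() materializes the intermediate PCollection,
    // preventing the steps before and after it from being fused together.
    PCollection<String> rebalanced = lines.apply(Reshuffle.viaRandomKey());

    // Steps applied to `rebalanced` can now be dynamically rebalanced
    // independently of the read step.

    pipeline.run();
  }
}
```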
Dynamic work rebalancing cannot reparallelize data finer than a single record. If your data contains individual records that cause large delays in processing time, they might still delay your job. Dataflow can't subdivide and redistribute an individual "hot" record to multiple workers.
Java
If you set a fixed number of shards for the final output of your pipeline (for example, by writing data using TextIO.Write.withNumShards), Dataflow limits parallelization based on the number of shards that you choose.
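As a brief illustration of the tradeoff, the following sketch caps the write step at ten parallel shards; the output prefix and the assumption that `results` is a PCollection&lt;String&gt; produced earlier in the pipeline are illustrative:

```java
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

// Writing with a fixed shard count caps the parallelism of this write step
// at 10, regardless of how many workers are available.
static PDone writeResults(PCollection<String> results) {
  return results.apply(
      TextIO.write()
          .to("gs://my-bucket/output") // illustrative output prefix
          .withNumShards(10));         // parallelism of this step is capped at 10
}
```

Passing 0 to withNumShards (or omitting the call) requests runner-determined sharding, which preserves dynamic parallelism for the write.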
Python
If you set a fixed number of shards for the final output of your pipeline (for example, by writing data using beam.io.WriteToText(..., num_shards=...)), Dataflow limits parallelization based on the number of shards that you choose.
Go
If you set a fixed number of shards for the final output of your pipeline, Dataflow limits parallelization based on the number of shards that you choose.
Working with Custom Data Sources
Java
If your pipeline uses a custom data source that you provide, you must implement the method splitAtFraction to allow your source to work with the dynamic work rebalancing feature.

If you implement splitAtFraction incorrectly, records from your source might appear to get duplicated or dropped. See the API reference information on RangeTracker for help and tips on implementing splitAtFraction.
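As a minimal sketch of the pattern, the following assumes a hypothetical BoundedReader whose progress is tracked by Beam's OffsetRangeTracker; the `rangeTracker` and `endOffset` fields and the MyOffsetSource class are illustrative assumptions, not part of the Beam API:

```java
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.range.OffsetRangeTracker;

// Inside a hypothetical BoundedReader<String> with fields:
//   OffsetRangeTracker rangeTracker;  // tracks [startOffset, endOffset)
//   long endOffset;
@Override
public BoundedSource<String> splitAtFraction(double fraction) {
  // Map the requested fraction onto a concrete offset in the current range.
  long splitOffset = rangeTracker.getPositionForFractionConsumed(fraction);
  // Atomically try to shrink this reader's range to [start, splitOffset).
  // trySplitAtPosition returns false if the reader has already passed
  // splitOffset, in which case the split must be rejected.
  if (!rangeTracker.trySplitAtPosition(splitOffset)) {
    return null; // null tells the service that no split happened
  }
  // The residual source covers [splitOffset, end) and can be handed to
  // another worker by dynamic work rebalancing.
  return new MyOffsetSource(splitOffset, endOffset);
}
```

Routing both the split decision and every record read through the same tracker is what keeps splits atomic, which is how a correct implementation avoids the duplicated or dropped records described above.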
Python
If your pipeline uses a custom data source that you provide, your RangeTracker must implement try_claim, try_split, position_at_fraction, and fraction_consumed to allow your source to work with the dynamic work rebalancing feature.

See the API reference information on RangeTracker for more information.
Go
If your pipeline uses a custom data source that you provide, you must implement a valid RTracker to allow your source to work with the dynamic work rebalancing feature.

For more information, see the RTracker API reference information.
Dynamic work rebalancing uses the return value of the getProgress() method of your custom source to activate. The default implementation for getProgress() returns nil. To ensure autoscaling activates, make sure your custom source overrides getProgress() to return an appropriate value.