Hello,
I have a simple pipeline that prints a number of SQL query strings. A total
of 9 query strings are generated by a DoFn called *GenerateQueryFn* and
those are pushed into a splittable DoFn named *ProcessQueryFn*. The
OffsetRange is used as a restriction, and the start and stop values are set
to 0 and 1 respectively because it just needs to print a query string as a
whole.
import argparse
import logging
import typing
import time
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam import RestrictionProvider
from apache_beam.io.restriction_trackers import OffsetRange,
OffsetRestrictionTracker
class GenerateQueryFn(beam.DoFn):
def process(self, element: typing.Any):
for query in self._generate_queries(None):
yield query
def _generate_queries(self, query_config):
if query_config is None:
logging.warning("To provide query config...")
return [
"SELECT * FROM (SELECT id, first_name, last_name, email FROM
staging.users) AS subquery WHERE id >= 1 AND id < 1112",
"SELECT * FROM (SELECT id, first_name, last_name, email FROM
staging.users) AS subquery WHERE id >= 1112 AND id < 2223",
"SELECT * FROM (SELECT id, first_name, last_name, email FROM
staging.users) AS subquery WHERE id >= 2223 AND id < 3334",
"SELECT * FROM (SELECT id, first_name, last_name, email FROM
staging.users) AS subquery WHERE id >= 3334 AND id < 4445",
"SELECT * FROM (SELECT id, first_name, last_name, email FROM
staging.users) AS subquery WHERE id >= 4445 AND id < 5556",
"SELECT * FROM (SELECT id, first_name, last_name, email FROM
staging.users) AS subquery WHERE id >= 5556 AND id < 6667",
"SELECT * FROM (SELECT id, first_name, last_name, email FROM
staging.users) AS subquery WHERE id >= 6667 AND id < 7778",
"SELECT * FROM (SELECT id, first_name, last_name, email FROM
staging.users) AS subquery WHERE id >= 7778 AND id < 8889",
"SELECT * FROM (SELECT id, first_name, last_name, email FROM
staging.users) AS subquery WHERE id >= 8889 AND id <= 10000",
]
class ProcessQueryFn(beam.DoFn, RestrictionProvider):
def process(
self,
query: str,
tracker: OffsetRestrictionTracker = beam.DoFn.RestrictionParam(),
):
restriction = tracker.current_restriction()
for current_position in range(restriction.start, restriction.stop +
1):
if tracker.try_claim(current_position):
print(query)
time.sleep(2)
yield query
else:
return
def create_tracker(self, restriction: OffsetRange) ->
OffsetRestrictionTracker:
return OffsetRestrictionTracker(restriction)
def initial_restriction(self, element: str) -> OffsetRange:
return OffsetRange(start=0, stop=1)
def restriction_size(self, element: str, restriction: OffsetRange) ->
int:
return restriction.size()
def run(argv=None, save_main_session=True):
parser = argparse.ArgumentParser(description="Beam pipeline arguments")
known_args, pipeline_args = parser.parse_known_args(argv)
# # We use the save_main_session option because one or more DoFn's in
this
# # workflow rely on global context (e.g., a module imported at module
level).
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session =
save_main_session
print(f"known args - {known_args}")
print(f"pipeline options - {pipeline_options.display_data()}")
with beam.Pipeline(options=pipeline_options) as p:
(
p
| beam.Create([0])
| beam.ParDo(GenerateQueryFn())
| beam.ParDo(ProcessQueryFn())
)
logging.getLogger().setLevel(logging.WARN)
logging.info("Building pipeline ...")
if __name__ == "__main__":
run()
When I execute the pipeline with 3 workers on the Python direct runner, it
looks like the elements are processed in parallel, which is expected - each
of 3 query strings are processed at once.
$ python pipeline.py --direct_num_workers=3
--direct_running_mode=multi_threading
known args - Namespace()
pipeline options - {'direct_num_workers': 3, 'direct_running_mode':
'multi_threading', 'save_main_session': True}
WARNING:root:To provide query config...
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 1 AND id < 1112
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 2223 AND id < 3334
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 1112 AND id < 2223
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 3334 AND id < 4445
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 5556 AND id < 6667
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 4445 AND id < 5556
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 6667 AND id < 7778
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 7778 AND id < 8889
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 8889 AND id <= 10000
When I execute it on the Flink runner where the parallelism is set to 3,
the elements seem to be processed by a single worker. I use Apache Beam
2.57.0 and Flink 1.18.1.
$ python pipeline.py --job_name=sql-query --runner FlinkRunner \
--flink_master=localhost:8081 --environment_type=LOOPBACK --parallelism=3
known args - Namespace()
pipeline options - {'runner': 'FlinkRunner', 'job_name': 'sql-query',
'save_main_session': True, 'environment_type': 'LOOPBACK', 'flink_master':
'localhost:8081', 'parallelism': 3}
WARNING:root:Waiting for grpc channel to be ready at localhost:47651.
WARNING:root:Waiting for grpc channel to be ready at localhost:47651.
WARNING:root:Waiting for grpc channel to be ready at localhost:47651.
WARNING:root:Waiting for grpc channel to be ready at localhost:47651.
WARNING:root:To provide query config...
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 1 AND id < 1112
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 1112 AND id < 2223
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 2223 AND id < 3334
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 3334 AND id < 4445
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 4445 AND id < 5556
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 5556 AND id < 6667
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 6667 AND id < 7778
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 7778 AND id < 8889
SELECT * FROM (SELECT id, first_name, last_name, email FROM staging.users)
AS subquery WHERE id >= 8889 AND id <= 10000
I can check that on the Flink UI as well. It shows only a single worker
receives data while the others are idle.
[image: image.png]
Can you please inform me why the elements are not processed in parallel on
the Flink runner?
Cheers,
Jaehyeon