Hi XQ

Yes, it works with the FlinkRunner. Thank you so much!

Cheers,
Jaehyeon

On Mon, 6 May 2024 at 02:49, XQ Hu via user <[email protected]> wrote:

> Have you tried other runners? I think this might be caused by gaps in the
> Python DirectRunner's support for streaming cases or SDFs.
>
> On Sun, May 5, 2024 at 5:10 AM Jaehyeon Kim <[email protected]> wrote:
>
>> Hi XQ
>>
>> Thanks for checking it out. Chaining SDFs seems to be supported, since I
>> created my pipeline by converting one built with the Java SDK. The source
>> of the Java pipeline can be found at
>> https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter7/src/main/java/com/packtpub/beam/chapter7/StreamingFileRead.java
>>
>> So far, the second SDF gets stuck when I yield outputs from the first SDF,
>> whereas it executes if I return them (but then the first SDF completes). If
>> I change the second SDF into a plain DoFn without the tracker, it executes
>> fine. I am not sure what happens in the first scenario.
>>
>> Cheers,
>> Jaehyeon
>>
>> On Sun, 5 May 2024 at 09:21, XQ Hu via user <[email protected]> wrote:
>>
>>> I played with your example. Indeed, create_tracker in
>>> your ProcessFilesFn is never called, which is quite strange.
>>> I could not find any example that shows the chained SDFs, which makes me
>>> wonder whether the chained SDFs work.
>>>
>>> @Chamikara Jayalath <[email protected]> Any thoughts?
>>>
>>> On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim <[email protected]> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am building a pipeline using two chained SDFs. The first function
>>>> (DirectoryWatchFn) continuously watches a folder and picks up any newly
>>>> added file. The second one (ProcessFilesFn) processes each file by
>>>> splitting it into lines; the processing simply prints the file name and
>>>> line number.
>>>>
>>>> The process function of the first SDF gets stuck if I yield a new file
>>>> object. Specifically, the second SDF is invoked (I can see that its
>>>> initial restriction is created), but its tracker is never created.
>>>>
>>>> On the other hand, if I return the file object list, the second SDF
>>>> works fine but the issue is the first SDF stops as soon as it returns the
>>>> first list of files.
>>>>
>>>> The source of the pipeline can be found in
>>>> - First SDF:
>>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
>>>> - Second SDF:
>>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
>>>> - Pipeline:
>>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>>>>
>>>> Can you please inform me how to handle this issue?
>>>>
>>>> Cheers,
>>>> Jaehyeon
>>>>
>>>> class DirectoryWatchFn(beam.DoFn):
>>>>     POLL_TIMEOUT = 10
>>>>
>>>>     @beam.DoFn.unbounded_per_element()
>>>>     def process(
>>>>         self,
>>>>         element: str,
>>>>         tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
>>>>             DirectoryWatchRestrictionProvider()
>>>>         ),
>>>>         watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.WatermarkEstimatorParam(
>>>>             DirectoryWatchWatermarkEstimatorProvider()
>>>>         ),
>>>>     ) -> typing.Iterable[MyFile]:
>>>>         new_files = self._get_new_files_if_any(element, tracker)
>>>>         if self._process_new_files(tracker, watermark_estimater, new_files):
>>>>             # return [new_file[0] for new_file in new_files] #<-- it doesn't get stuck but the SDF finishes
>>>>             for new_file in new_files: #<--- it gets stuck if yielding file objects
>>>>                 yield new_file[0]
>>>>         else:
>>>>             return
>>>>         tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>>>>
>>>>     def _get_new_files_if_any(
>>>>         self, element: str, tracker: DirectoryWatchRestrictionTracker
>>>>     ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
>>>>         new_files = []
>>>>         for file in os.listdir(element):
>>>>             if (
>>>>                 os.path.isfile(os.path.join(element, file))
>>>>                 and file not in tracker.current_restriction().already_processed
>>>>             ):
>>>>                 num_lines = sum(1 for _ in open(os.path.join(element, file)))
>>>>                 new_file = MyFile(file, 0, num_lines)
>>>>                 print(new_file)
>>>>                 new_files.append(
>>>>                     (
>>>>                         new_file,
>>>>                         Timestamp.of(os.path.getmtime(os.path.join(element, file))),
>>>>                     )
>>>>                 )
>>>>         return new_files
>>>>
>>>>     def _process_new_files(
>>>>         self,
>>>>         tracker: DirectoryWatchRestrictionTracker,
>>>>         watermark_estimater: ManualWatermarkEstimator,
>>>>         new_files: typing.List[typing.Tuple[MyFile, Timestamp]],
>>>>     ):
>>>>         max_instance = watermark_estimater.current_watermark()
>>>>         for new_file in new_files:
>>>>             if tracker.try_claim(new_file[0].name) is False:
>>>>                 watermark_estimater.set_watermark(max_instance)
>>>>                 return False
>>>>             if max_instance < new_file[1]:
>>>>                 max_instance = new_file[1]
>>>>         watermark_estimater.set_watermark(max_instance)
>>>>         return max_instance < MAX_TIMESTAMP
>>>>
>>>
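
A note on the two variants marked in the quoted process method. As a plain-Python matter, the commented-out `return [...]` line exits the method before `tracker.defer_remainder(...)` is reached, whereas the `yield` loop falls through to it. That may be why the first SDF finishes right after returning its first list of files. It does not explain why the yielding variant gets stuck on the DirectRunner. The sketch below is a minimal illustration using only the standard library: `FakeTracker`, `process_with_return` and `process_with_yield` are hypothetical stand-ins, not Beam APIs.

```python
class FakeTracker:
    """Hypothetical stand-in for the restriction tracker; it only records deferrals."""

    def __init__(self):
        self.deferred = False

    def defer_remainder(self, delay=None):
        self.deferred = True


def process_with_return(new_files, tracker):
    # Mirrors the commented-out variant: `return` exits immediately,
    # so the defer_remainder call below is never reached.
    if new_files:
        return list(new_files)
    tracker.defer_remainder()


def process_with_yield(new_files, tracker):
    # Mirrors the yielding variant: once the loop is exhausted,
    # execution continues on to defer_remainder.
    if new_files:
        for new_file in new_files:
            yield new_file
    else:
        return
    tracker.defer_remainder()


t1, t2 = FakeTracker(), FakeTracker()
returned = process_with_return(["a.txt", "b.txt"], t1)
yielded = list(process_with_yield(["a.txt", "b.txt"], t2))

print(returned, t1.deferred)  # ['a.txt', 'b.txt'] False
print(yielded, t2.deferred)   # ['a.txt', 'b.txt'] True
```

If the returning variant is ever needed, calling `defer_remainder` before the `return` would keep the polling scheduled. Whether that interacts well with a given runner is untested here.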
