Hi XQ

Thanks for checking it out. SDFs chaining seems to work as I created my
pipeline while converting a pipeline that is built in the Java SDK. The
source of the Java pipeline can be found in
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter7/src/main/java/com/packtpub/beam/chapter7/StreamingFileRead.java

So far, when I yield outputs, the second SDF gets stuck while it gets
executed if I return them (but the first SDF completes). If I change the
second SDF into a do function without adding the tracker, it is executed
fine. Not sure what happens in the first scenario.

Cheers,
Jaehyeon

On Sun, 5 May 2024 at 09:21, XQ Hu via user <[email protected]> wrote:

> I played with your example. Indeed, create_tracker in your ProcessFilesFn
> is never called, which is quite strange.
> I could not find any example that shows the chained SDFs, which makes me
> wonder whether the chained SDFs work.
>
> @Chamikara Jayalath <[email protected]> Any thoughts?
>
> On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim <[email protected]> wrote:
>
>> Hello,
>>
>> I am building a pipeline using two SDFs that are chained. The first
>> function (DirectoryWatchFn) checks a folder continuously and grabs if a new
>> file is added. The second one (ProcessFilesFn) processes a file
>> while splitting each line - the processing simply prints the file name and
>> line number.
>>
>> The process function of the first SDF gets stuck if I yield a new file
>> object. Specifically, although the second SDF is called as I can check the
>> initial restriction is created, the tracker is not created at all!
>>
>> On the other hand, if I return the file object list, the second SDF works
>> fine but the issue is the first SDF stops as soon as it returns the first
>> list of files.
>>
>> The source of the pipeline can be found in
>> - First SDF:
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
>> - Second SDF:
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
>> - Pipeline:
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>>
>> Can you please inform me how to handle this issue?
>>
>> Cheers,
>> Jaehyeon
>>
>> class DirectoryWatchFn(beam.DoFn):
>>     POLL_TIMEOUT = 10
>>
>>     @beam.DoFn.unbounded_per_element()
>>     def process(
>>         self,
>>         element: str,
>>         tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
>>             DirectoryWatchRestrictionProvider()
>>         ),
>>         watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.
>> WatermarkEstimatorParam(
>>             DirectoryWatchWatermarkEstimatorProvider()
>>         ),
>>     ) -> typing.Iterable[MyFile]:
>>         new_files = self._get_new_files_if_any(element, tracker)
>>         if self._process_new_files(tracker, watermark_estimater,
>> new_files):
>>             # return [new_file[0] for new_file in new_files] #<-- it
>> doesn't get stuck but the SDF finishes
>>             for new_file in new_files: #<--- it gets stuck if yielding
>> file objects
>>                 yield new_file[0]
>>         else:
>>             return
>>         tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>>
>>     def _get_new_files_if_any(
>>         self, element: str, tracker: DirectoryWatchRestrictionTracker
>>     ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
>>         new_files = []
>>         for file in os.listdir(element):
>>             if (
>>                 os.path.isfile(os.path.join(element, file))
>>                 and file not in tracker.current_restriction().
>> already_processed
>>             ):
>>                 num_lines = sum(1 for _ in open(os.path.join(element,
>> file)))
>>                 new_file = MyFile(file, 0, num_lines)
>>                 print(new_file)
>>                 new_files.append(
>>                     (
>>                         new_file,
>>                         Timestamp.of(os.path.getmtime(os.path.join(
>> element, file))),
>>                     )
>>                 )
>>         return new_files
>>
>>     def _process_new_files(
>>         self,
>>         tracker: DirectoryWatchRestrictionTracker,
>>         watermark_estimater: ManualWatermarkEstimator,
>>         new_files: typing.List[typing.Tuple[MyFile, Timestamp]],
>>     ):
>>         max_instance = watermark_estimater.current_watermark()
>>         for new_file in new_files:
>>             if tracker.try_claim(new_file[0].name) is False:
>>                 watermark_estimater.set_watermark(max_instance)
>>                 return False
>>             if max_instance < new_file[1]:
>>                 max_instance = new_file[1]
>>         watermark_estimater.set_watermark(max_instance)
>>         return max_instance < MAX_TIMESTAMP
>>
>

Reply via email to