Hi XQ

Yes, it works with the FlinkRunner. Thank you so much!
Cheers,
Jaehyeon

On Mon, 6 May 2024 at 02:49, XQ Hu via user <[email protected]> wrote:

> Have you tried to use other runners? I think this might be caused by some
> gaps in the Python DirectRunner's support for streaming cases or SDFs.
>
> On Sun, May 5, 2024 at 5:10 AM Jaehyeon Kim <[email protected]> wrote:
>
>> Hi XQ
>>
>> Thanks for checking it out. SDF chaining seems to work, as I created my
>> pipeline by converting a pipeline that was built with the Java SDK. The
>> source of the Java pipeline can be found at
>> https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter7/src/main/java/com/packtpub/beam/chapter7/StreamingFileRead.java
>>
>> So far, when I yield outputs, the second SDF gets stuck, while it gets
>> executed if I return them (but then the first SDF completes). If I change
>> the second SDF into a DoFn without the tracker, it is executed fine. I'm
>> not sure what happens in the first scenario.
>>
>> Cheers,
>> Jaehyeon
>>
>> On Sun, 5 May 2024 at 09:21, XQ Hu via user <[email protected]> wrote:
>>
>>> I played with your example. Indeed, create_tracker in
>>> your ProcessFilesFn is never called, which is quite strange.
>>> I could not find any example that shows chained SDFs, which makes me
>>> wonder whether chained SDFs work.
>>>
>>> @Chamikara Jayalath <[email protected]> Any thoughts?
>>>
>>> On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim <[email protected]> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am building a pipeline using two SDFs that are chained. The first
>>>> function (DirectoryWatchFn) checks a folder continuously and grabs any
>>>> new files that are added. The second one (ProcessFilesFn) processes a
>>>> file while splitting it into lines - the processing simply prints the
>>>> file name and line number.
>>>>
>>>> The process function of the first SDF gets stuck if I yield a new file
>>>> object. Specifically, although the second SDF is called, as I can see
>>>> that the initial restriction is created, the tracker is not created at
>>>> all!
>>>>
>>>> On the other hand, if I return the file object list, the second SDF
>>>> works fine, but the issue is that the first SDF stops as soon as it
>>>> returns the first list of files.
>>>>
>>>> The source of the pipeline can be found at
>>>> - First SDF:
>>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
>>>> - Second SDF:
>>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
>>>> - Pipeline:
>>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>>>>
>>>> Can you please advise how to handle this issue?
>>>>
>>>> Cheers,
>>>> Jaehyeon
>>>>
>>>> class DirectoryWatchFn(beam.DoFn):
>>>>     POLL_TIMEOUT = 10
>>>>
>>>>     @beam.DoFn.unbounded_per_element()
>>>>     def process(
>>>>         self,
>>>>         element: str,
>>>>         tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
>>>>             DirectoryWatchRestrictionProvider()
>>>>         ),
>>>>         watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.WatermarkEstimatorParam(
>>>>             DirectoryWatchWatermarkEstimatorProvider()
>>>>         ),
>>>>     ) -> typing.Iterable[MyFile]:
>>>>         new_files = self._get_new_files_if_any(element, tracker)
>>>>         if self._process_new_files(tracker, watermark_estimater, new_files):
>>>>             # return [new_file[0] for new_file in new_files]  # <-- it doesn't get stuck but the SDF finishes
>>>>             for new_file in new_files:  # <-- it gets stuck if yielding file objects
>>>>                 yield new_file[0]
>>>>         else:
>>>>             return
>>>>         tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>>>>
>>>>     def _get_new_files_if_any(
>>>>         self, element: str, tracker: DirectoryWatchRestrictionTracker
>>>>     ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
>>>>         new_files = []
>>>>         for file in os.listdir(element):
>>>>             if (
>>>>                 os.path.isfile(os.path.join(element, file))
>>>>                 and file not in tracker.current_restriction().already_processed
>>>>             ):
>>>>                 num_lines = sum(1 for _ in open(os.path.join(element, file)))
>>>>                 new_file = MyFile(file, 0, num_lines)
>>>>                 print(new_file)
>>>>                 new_files.append(
>>>>                     (
>>>>                         new_file,
>>>>                         Timestamp.of(os.path.getmtime(os.path.join(element, file))),
>>>>                     )
>>>>                 )
>>>>         return new_files
>>>>
>>>>     def _process_new_files(
>>>>         self,
>>>>         tracker: DirectoryWatchRestrictionTracker,
>>>>         watermark_estimater: ManualWatermarkEstimator,
>>>>         new_files: typing.List[typing.Tuple[MyFile, Timestamp]],
>>>>     ):
>>>>         max_instance = watermark_estimater.current_watermark()
>>>>         for new_file in new_files:
>>>>             if tracker.try_claim(new_file[0].name) is False:
>>>>                 watermark_estimater.set_watermark(max_instance)
>>>>                 return False
>>>>             if max_instance < new_file[1]:
>>>>                 max_instance = new_file[1]
>>>>         watermark_estimater.set_watermark(max_instance)
>>>>         return max_instance < MAX_TIMESTAMP
>>>
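
For context on the ProcessFilesFn discussed above: in the Python SDK a splittable DoFn is wired to a RestrictionProvider whose create_tracker builds the tracker that the DoFn claims positions from, and that is the call XQ observed was never reached on the DirectRunner. The sketch below is not the code from the linked file_read.py; it is a minimal, assumed reconstruction of that wiring using Beam's OffsetRange and OffsetRestrictionTracker, with the MyFile type and the line-numbering logic as illustrative placeholders.

import typing

import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider


class MyFile(typing.NamedTuple):
    # Placeholder for the MyFile type used in the thread: file name plus line range.
    name: str
    start: int
    end: int


class ProcessFilesRestrictionProvider(RestrictionProvider):
    def initial_restriction(self, element: MyFile) -> OffsetRange:
        # One restriction per file, covering all of its lines.
        return OffsetRange(element.start, element.end)

    def create_tracker(self, restriction: OffsetRange) -> OffsetRestrictionTracker:
        # The method that, per the thread, was never called on the DirectRunner.
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element: MyFile, restriction: OffsetRange) -> int:
        return restriction.size()


class ProcessFilesFn(beam.DoFn):
    def process(
        self,
        element: MyFile,
        tracker=beam.DoFn.RestrictionParam(ProcessFilesRestrictionProvider()),
    ) -> typing.Iterable[str]:
        # Claim each line offset before emitting it so the runner can split the work.
        restriction = tracker.current_restriction()
        for line_no in range(restriction.start, restriction.stop):
            if not tracker.try_claim(line_no):
                return
            yield f"file: {element.name}, line: {line_no}"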

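The resolution at the top of the thread was switching from the Python DirectRunner to the FlinkRunner. Below is a minimal sketch of that switch, assuming the DirectoryWatchFn and ProcessFilesFn from the linked modules; the module imports, option values, and watched directory path are illustrative assumptions rather than details taken from the thread.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from directory_watch import DirectoryWatchFn  # first SDF from the thread (directory_watch.py)
from file_read import ProcessFilesFn          # second SDF from the thread (file_read.py)

options = PipelineOptions(
    [
        "--runner=FlinkRunner",         # use the portable Flink runner instead of the DirectRunner
        "--environment_type=LOOPBACK",  # run the Python SDK harness in-process for local testing
        "--streaming",                  # the directory-watching SDF is unbounded
        "--parallelism=1",
    ]
)

with beam.Pipeline(options=options) as p:
    (
        p
        | beam.Create(["/path/to/watched/dir"])  # directory polled by the first SDF (assumed path)
        | beam.ParDo(DirectoryWatchFn())          # first SDF: emits newly added files
        | beam.ParDo(ProcessFilesFn())            # second SDF: reads each file line by line
        | beam.Map(print)
    )

With --runner=FlinkRunner and no --flink_master option, Beam starts a local embedded Flink cluster, which should be enough to reproduce the streaming behaviour locally.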