Hi XQ

Thanks for checking it out. SDF chaining seems like it should work, as I created my pipeline by converting a pipeline built in the Java SDK. The source of the Java pipeline can be found at https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter7/src/main/java/com/packtpub/beam/chapter7/StreamingFileRead.java
So far, when I yield the outputs, the second SDF gets stuck, whereas it executes if I return them (but then the first SDF completes and stops polling). If I change the second SDF into a plain DoFn without the restriction tracker, it executes fine. I am not sure what happens in the first scenario - possibly a generator issue; see the plain-Python sketch at the bottom of this thread.

Cheers,
Jaehyeon

On Sun, 5 May 2024 at 09:21, XQ Hu via user <[email protected]> wrote:

> I played with your example. Indeed, create_tracker in your ProcessFilesFn
> is never called, which is quite strange.
> I could not find any example that shows chained SDFs, which makes me
> wonder whether chained SDFs work at all.
>
> @Chamikara Jayalath <[email protected]> Any thoughts?
>
> On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim <[email protected]> wrote:
>
>> Hello,
>>
>> I am building a pipeline using two chained SDFs. The first function
>> (DirectoryWatchFn) continuously checks a folder and picks up any newly
>> added file. The second one (ProcessFilesFn) processes a file by splitting
>> it into lines - the processing simply prints the file name and line number.
>>
>> The process function of the first SDF gets stuck if I yield a new file
>> object. Specifically, although the second SDF is called - I can see that
>> its initial restriction is created - its tracker is never created at all!
>>
>> On the other hand, if I return the list of file objects, the second SDF
>> works fine, but the first SDF stops as soon as it returns the first list
>> of files.
>>
>> The source of the pipeline can be found at:
>> - First SDF:
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
>> - Second SDF:
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
>> - Pipeline:
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>>
>> Could you please advise how to handle this issue?
>>
>> Cheers,
>> Jaehyeon
>>
>> class DirectoryWatchFn(beam.DoFn):
>>     POLL_TIMEOUT = 10
>>
>>     @beam.DoFn.unbounded_per_element()
>>     def process(
>>         self,
>>         element: str,
>>         tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
>>             DirectoryWatchRestrictionProvider()
>>         ),
>>         watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.WatermarkEstimatorParam(
>>             DirectoryWatchWatermarkEstimatorProvider()
>>         ),
>>     ) -> typing.Iterable[MyFile]:
>>         new_files = self._get_new_files_if_any(element, tracker)
>>         if self._process_new_files(tracker, watermark_estimater, new_files):
>>             # return [new_file[0] for new_file in new_files]  # <-- doesn't get stuck, but the SDF finishes
>>             for new_file in new_files:  # <-- gets stuck if yielding file objects
>>                 yield new_file[0]
>>         else:
>>             return
>>         tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>>
>>     def _get_new_files_if_any(
>>         self, element: str, tracker: DirectoryWatchRestrictionTracker
>>     ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
>>         new_files = []
>>         for file in os.listdir(element):
>>             if (
>>                 os.path.isfile(os.path.join(element, file))
>>                 and file not in tracker.current_restriction().already_processed
>>             ):
>>                 num_lines = sum(1 for _ in open(os.path.join(element, file)))
>>                 new_file = MyFile(file, 0, num_lines)
>>                 print(new_file)
>>                 new_files.append(
>>                     (
>>                         new_file,
>>                         Timestamp.of(os.path.getmtime(os.path.join(element, file))),
>>                     )
>>                 )
>>         return new_files
>>
>>     def _process_new_files(
>>         self,
>>         tracker: DirectoryWatchRestrictionTracker,
>>         watermark_estimater: ManualWatermarkEstimator,
>>         new_files: typing.List[typing.Tuple[MyFile, Timestamp]],
>>     ):
>>         max_instance = watermark_estimater.current_watermark()
>>         for new_file in new_files:
>>             if tracker.try_claim(new_file[0].name) is False:
>>                 watermark_estimater.set_watermark(max_instance)
>>                 return False
>>             if max_instance < new_file[1]:
>>                 max_instance = new_file[1]
>>         watermark_estimater.set_watermark(max_instance)
>>         return max_instance < MAX_TIMESTAMP
>
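The plain-Python sketch referenced at the top of the thread: a process method that contains a yield is a generator, so nothing in its body - including the trailing tracker.defer_remainder(...) - runs until the runner iterates it, and code after the final yield only runs once the generator is fully drained. This is ordinary Python semantics, not a claim about what the runner actually does with the deferred work here:

def with_return():
    print("scan")  # runs eagerly when the function is called
    return [1, 2]  # the caller receives the whole list immediately

def with_yield():
    print("scan")  # deferred until iteration starts
    yield 1
    yield 2
    print("defer_remainder")  # deferred until the generator is fully drained

with_return()       # prints "scan"
gen = with_yield()  # prints nothing yet
list(gen)           # prints "scan", then "defer_remainder"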

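And a minimal sketch of two chained SDFs that isolates the chaining behaviour from the file-watching and watermark logic above. All names here (CountProvider, EmitFilesFn, ReadLinesFn) are made up for the sketch, and it assumes a bounded input on the DirectRunner rather than the streaming setup in the thread:

import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider

class CountProvider(RestrictionProvider):
    # Restriction over the offsets [0, n), where n is carried in the element.
    def initial_restriction(self, element):
        return OffsetRange(0, element[1])

    def create_tracker(self, restriction):
        print(f"create_tracker for {restriction}")  # shows whether the tracker is ever created
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element, restriction):
        return restriction.size()

class EmitFilesFn(beam.DoFn):
    # First SDF: emits (file_name, num_lines) pairs.
    def process(self, element, tracker=beam.DoFn.RestrictionParam(CountProvider())):
        pos = tracker.current_restriction().start
        while tracker.try_claim(pos):
            yield (f"{element[0]}-{pos}.txt", 3)
            pos += 1

class ReadLinesFn(beam.DoFn):
    # Second SDF: consumes the first SDF's output, one element per "line".
    def process(self, element, tracker=beam.DoFn.RestrictionParam(CountProvider())):
        pos = tracker.current_restriction().start
        while tracker.try_claim(pos):
            yield f"{element[0]} line {pos}"
            pos += 1

with beam.Pipeline() as p:
    (
        p
        | beam.Create([("batch", 2)])
        | "Emit" >> beam.ParDo(EmitFilesFn())
        | "Read" >> beam.ParDo(ReadLinesFn())
        | beam.Map(print)
    )

If the print in create_tracker fires for ReadLinesFn, then SDF chaining itself works in this reduced setting, which would point the investigation back at the unbounded first SDF rather than at chaining in general.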