[ https://issues.apache.org/jira/browse/BEAM-14315?focusedWorklogId=771389&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-771389 ]
ASF GitHub Bot logged work on BEAM-14315:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/May/22 14:34
            Start Date: 17/May/22 14:34
    Worklog Time Spent: 10m
      Work Description: Abacn commented on code in PR #17604:
URL: https://github.com/apache/beam/pull/17604#discussion_r874898781


##########
sdks/python/apache_beam/io/filebasedsource.py:
##########
@@ -449,3 +458,92 @@ def expand(self, pvalue):
         | 'ReadRange' >> ParDo(
             _ReadRange(
                 self._source_from_file, with_filename=self._with_filename)))
+
+
+class ReadAllFilesContinuously(PTransform):
+  """A file source that reads files continuously.
+
+  Pipeline authors should not use this directly. This is to be used by Read
+  PTransform authors who wish to implement file-based Read transforms that
+  read files continuously.
+
+  Unlike ``ReadAllFiles``, the file pattern is provided as a constructor
+  parameter at pipeline definition time.
+  """
+  ARGS_FOR_MATCH = {
+      'interval',
+      'has_deduplication',
+      'start_timestamp',
+      'stop_timestamp',
+      'match_updated_files',
+      'apply_windowing'
+  }
+
+  def __init__(self,
+               file_pattern,  # type: str
+               splittable,  # type: bool
+               compression_type,
+               desired_bundle_size,  # type: int
+               min_bundle_size,  # type: int
+               source_from_file,  # type: Callable[[str], iobase.BoundedSource]
+               with_filename=False,  # type: bool
+               **kwargs  # parameters for MatchContinuously
+              ):
+    """
+    Args:
+      file_pattern: a file pattern to match.
+      splittable: If False, files won't be split into sub-ranges. If True,
+        files may or may not be split into data ranges.
+      compression_type: A ``CompressionType`` object that specifies the
+        compression type of the files that will be processed. If
+        ``CompressionType.AUTO``, the system will try to automatically
+        determine the compression type based on the extension of files.
+      desired_bundle_size: the desired size of data ranges that should be
+        generated when splitting a file into data ranges.
+      min_bundle_size: minimum size of data ranges that should be generated
+        when splitting a file into data ranges.
+      source_from_file: a function that produces a ``BoundedSource`` given a
+        file name. The system will use this function to generate
+        ``BoundedSource`` objects for file paths. Note that file paths passed
+        to this will be for individual files, not for file patterns, even if
+        the ``PCollection`` of files processed by the transform consists of
+        file patterns.
+      with_filename: If True, returns a key-value pair with the key being the
+        file name and the value being the actual data. If False, it only
+        returns the data.
+
+    Refer to ``MatchContinuously`` for the additional args, including
+    'interval', 'has_deduplication', 'start_timestamp', 'stop_timestamp',
+    'match_updated_files' and 'apply_windowing'.
+    """
+    self._file_pattern = file_pattern
+    self._splittable = splittable
+    self._compression_type = compression_type
+    self._desired_bundle_size = desired_bundle_size
+    self._min_bundle_size = min_bundle_size
+    self._source_from_file = source_from_file
+    self._with_filename = with_filename
+    self._kwargs_for_match = {
+        k: v
+        for (k, v) in kwargs.items() if k in self.ARGS_FOR_MATCH
+    }
+
+  def expand(self, pbegin):
+    # imported locally to avoid circular import
+    from apache_beam.io.fileio import MatchContinuously
+
+    return (
+        pbegin
+        | MatchContinuously(self._file_pattern, **self._kwargs_for_match)
+        | 'ExpandIntoRanges' >> ParDo(
+            _ExpandIntoRanges(
+                self._splittable,
+                self._compression_type,
+                self._desired_bundle_size,
+                self._min_bundle_size,
+                do_match=False))
+        # | 'Reshard' >> Reshuffle()  # not Reshuffle because it needs timely reads.

Review Comment:
   The side effect of Reshuffle makes it pretty hard to ensure _ReadRange is
   executed immediately after MatchAll: the group-by-key will hold elements
   until they get fired. I tried adding windows and triggers, but it still
   did not work well and made the pipeline complicated. The question is, what
   is the consequence if we do not reshuffle here? Does it cause wrong data
   (elements emitted more than once) or data loss if one read fails, or does
   it just make the worker do unnecessary retries? If it is only the latter,
   I think it is fine here.
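For background on the trade-off raised in the comment: on runners where Reshuffle acts as a fusion break and checkpoint (Dataflow, for example), it prevents a retried downstream read from re-running the upstream match/expand steps, at the cost of a group-by-key that can hold elements back until the stage fires. A toy sketch of where the step would sit; the lambdas below are stand-ins for the real transforms, not the PR's code:

import apache_beam as beam

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create(['f1', 'f2'])                     # stand-in for matched file names
      | 'Expand' >> beam.Map(lambda f: (f, 0, 1024))  # stand-in for _ExpandIntoRanges
      | 'Reshard' >> beam.Reshuffle()                 # the step the PR comments out
      | 'Read' >> beam.Map(lambda rng: rng))          # stand-in for _ReadRange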
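Since the new class targets IO authors rather than pipeline authors, here is a minimal sketch of the intended call pattern, assuming the PR lands as shown in the diff above. ReadMyFormatContinuously and make_source are hypothetical names, not part of the change; the keyword arguments mirror the constructor in the diff:

import apache_beam as beam
from apache_beam.io.filebasedsource import ReadAllFilesContinuously
from apache_beam.io.filesystem import CompressionTypes


class ReadMyFormatContinuously(beam.PTransform):
  # Hypothetical wrapper; a concrete IO would supply its own
  # Callable[[str], iobase.BoundedSource] via make_source.
  def __init__(self, file_pattern, make_source, **kwargs):
    super().__init__()
    self._file_pattern = file_pattern
    self._make_source = make_source
    # Only keys in ReadAllFilesContinuously.ARGS_FOR_MATCH (e.g. interval,
    # match_updated_files) are forwarded to MatchContinuously.
    self._kwargs = kwargs

  def expand(self, pbegin):
    return pbegin | ReadAllFilesContinuously(
        self._file_pattern,
        splittable=True,
        compression_type=CompressionTypes.AUTO,
        desired_bundle_size=64 << 20,  # 64 MiB target bundle size
        min_bundle_size=0,
        source_from_file=self._make_source,
        with_filename=False,
        **self._kwargs)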
+ """ + self._file_pattern = file_pattern + self._splittable = splittable + self._compression_type = compression_type + self._desired_bundle_size = desired_bundle_size + self._min_bundle_size = min_bundle_size + self._source_from_file = source_from_file + self._with_filename = with_filename + self._kwargs_for_match = { + k: v + for (k, v) in kwargs.items() if k in self.ARGS_FOR_MATCH + } + + def expand(self, pbegin): + # imported locally to avoid circular import + from apache_beam.io.fileio import MatchContinuously + + return ( + pbegin + | MatchContinuously(self._file_pattern, **self._kwargs_for_match) + | 'ExpandIntoRanges' >> ParDo( + _ExpandIntoRanges( + self._splittable, + self._compression_type, + self._desired_bundle_size, + self._min_bundle_size, + do_match=False)) + # | 'Reshard' >> Reshuffle() # not Reshuffle because needs timely read. Review Comment: The side effect of reshuffle makes it pretty hard to ensure _ReadRange is executed instantly after matchAll. group by key will hold elements until they get fired. I tried adding windows and triggers but it still did not work well, and makes the pipeline complicated. The question is, what is the consequence if we do not reshuffle here? Does it cause wrong data (emit more than once) or data losing if one read fails, or just makes the worker do unnecessary retry? If it is just the later I think it is fine here. Issue Time Tracking ------------------- Worklog Id: (was: 771389) Time Spent: 1h (was: 50m) > Update fileio.MatchContinuously to allow reading already read files with a > new timestamp > ---------------------------------------------------------------------------------------- > > Key: BEAM-14315 > URL: https://issues.apache.org/jira/browse/BEAM-14315 > Project: Beam > Issue Type: New Feature > Components: io-py-common > Reporter: Yi Hu > Assignee: Yi Hu > Priority: P2 > Time Spent: 1h > Remaining Estimate: 0h > > This will be the Python counterpart of BEAM-14267. > For fileio.MatchContinuously, we want to add an option to choose to consider > a file new if it has a different timestamp from an existing file, even if the > file itself has the same name. > See the following design doc for more detail: > https://docs.google.com/document/d/1xnacyLGNh6rbPGgTAh5D1gZVR8rHUBsMMRV3YkvlL08/edit?usp=sharing&resourcekey=0-be0uF-DdmwAz6Vg4Li9FNw -- This message was sent by Atlassian Jira (v8.20.7#820007)