Hi Jungtaek, I have a question: aren't both approaches compatible?
The way I see it, it would be interesting to have a retention period to delete old files and/or the possibility of indicating an offset (timestamp). It would be very "similar" to how we do it with Kafka. WDYT?

On Thu, 30 Jul 2020 at 23:51, Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

> (I'd like to keep this discussion thread focused on the specific topic - let's initiate separate discussion threads for different topics.)
>
> Thanks for the input. I'd like to emphasize that the point in discussion is the "latestFirst" option - the rationale stems from the growing metadata log issue. I read your input as picking option 2, but could you please make clear whether it means you're OK with "replacing" the "latestFirst" option with "starting from timestamp"?
>
> On Thu, Jul 30, 2020 at 4:48 PM vikram agrawal <vikram.agra...@gmail.com> wrote:
>
>> If we compare the file stream source with other streaming sources such as Kafka, the current behavior is indeed incomplete. Starting the stream from a custom offset/particular point in time is something that is missing. Typically, file stream sources don't have auto-deletion of older data/files. In Kafka we can define a retention period, so even if we use "earliest" we won't end up reading from the time the Kafka topic was created. File stream sources, on the other hand, can hold very old files. It's a very valid use case to read the bulk of the old files with a batch job up to a particular timestamp, and then use a streaming job for real-time updates.
>>
>> So having support for specifying a timestamp, such that only files created after that timestamp are considered, can be useful.
>>
>> Another concern we need to consider is the listing cost. Is there any way we can avoid listing the entire base directory and then filtering out the new files? If the data is organized into partitions by date, would it help to list only those partitions where new files were added?
>>
>> On Thu, Jul 30, 2020 at 11:22 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:
>>
>>> Bump - is there any interest in this topic?
>>>
>>> On Mon, Jul 20, 2020 at 6:21 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:
>>>
>>>> (Just to add rationale, you can refer to the original mail thread on the dev@ list to see the efforts on addressing problems in the file stream source / sink - https://lists.apache.org/thread.html/r1cd548be1cbae91c67e5254adc0404a99a23930f8a6fde810b987285%40%3Cdev.spark.apache.org%3E)
>>>>
>>>> On Mon, Jul 20, 2020 at 6:18 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:
>>>>
>>>>> Hi devs,
>>>>>
>>>>> As I have been going through the various issues on metadata log growth, it's not only an issue for the sink, but also for the source. Unlike the sink metadata log, whose entries should be available to readers, the source metadata log exists only for the streaming query restarting from the checkpoint; hence in theory it should only memorize the minimal set of entries needed to prevent processing the same file multiple times.
>>>>>
>>>>> This doesn't hold for the file stream source, and I think it's because of the "latestFirst" option, which I haven't seen in any other source. The option works by reading files in "backward" order, which means Spark can read the oldest file and the latest file together in a micro-batch, and the source ends up having to memorize all files previously read.
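(For reference, a minimal sketch of a file stream query using the "latestFirst" behavior described above - "latestFirst" and "maxFilesPerTrigger" are existing file stream source options, while the format, path, schema, and checkpoint location below are made-up placeholders:)

    import org.apache.spark.sql.SparkSession

    object LatestFirstSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("latest-first-sketch")
          .getOrCreate()

        // Read the newest files first instead of the oldest.
        val stream = spark.readStream
          .format("json")
          .option("latestFirst", "true")       // process newest files first
          .option("maxFilesPerTrigger", "100") // bound each micro-batch
          .schema("id LONG, ts TIMESTAMP, payload STRING")
          .load("/data/incoming")

        // Because files are consumed out of chronological order, the source
        // cannot keep a simple "processed up to time T" cursor - it has to
        // remember every file it has ever read, which is why the source
        // metadata log keeps growing.
        val query = stream.writeStream
          .format("console")
          .option("checkpointLocation", "/tmp/checkpoint/latest-first")
          .start()
        query.awaitTermination()
      }
    }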
>>>>> The option can be changed during query restart, so even if the query is started with "latestFirst" set to false, it's not safe to apply the logic of minimizing the entries to memorize, as the option can later be changed to true and we'd end up reading old files again.
>>>>>
>>>>> I'm seeing two approaches here:
>>>>>
>>>>> 1) Apply "retention" - unlike "maxFileAge", the option would apply when "latestFirst" is enabled as well. That said, if the retention is set to 7 days, files older than 7 days would never be read in any way. With this approach we can at least get rid of metadata entries older than the retention. The issue is how to play nicely with the existing "maxFileAge", as it behaves similarly to retention, though it is ignored when "latestFirst" is turned on. (Change the semantics of "maxFileAge", vs. leave it as a "soft retention" and introduce another option.)
>>>>>
>>>>> (This approach is being proposed under SPARK-17604, and a PR is available - https://github.com/apache/spark/pull/28422)
>>>>>
>>>>> 2) Replace the "latestFirst" option with alternatives that no longer read in "backward" order - this doesn't mean we have to read all files to move forward. As we do with Kafka, a start offset could be provided, ideally as a timestamp, from which Spark would read in forward order. This doesn't cover all use cases of "latestFirst", but "latestFirst" doesn't seem natural with the concepts of SS (think about watermarks), so I'd prefer to support alternatives instead of struggling with "latestFirst".
>>>>>
>>>>> Would like to hear your opinions.
>>>>>
>>>>> Thanks,
>>>>> Jungtaek Lim (HeartSaVioR)
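For concreteness, a sketch of what option 2 could look like, modeled on the Kafka source. The "startingOffsetsByTimestamp" option on the Kafka source is real (available since Spark 3.0); the file-source option "startTimestamp" is purely hypothetical and only illustrates the proposed semantics, as are the broker, topic, path, and schema:

    import org.apache.spark.sql.SparkSession

    object StartFromTimestampSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("start-from-timestamp-sketch")
          .getOrCreate()

        // Kafka today: start from a per-partition timestamp (epoch millis),
        // then read strictly forward.
        val kafkaStream = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")
          .option("subscribe", "events")
          .option("startingOffsetsByTimestamp",
            """{"events": {"0": 1596067200000, "1": 1596067200000}}""")
          .load()

        // Hypothetical file-source counterpart: "startTimestamp" is NOT an
        // existing option - the idea is to pick up only files created after
        // the given timestamp, always in forward order, so the metadata log
        // would only need entries near the current position.
        val fileStream = spark.readStream
          .format("json")
          .option("startTimestamp", "2020-07-30T00:00:00Z") // hypothetical
          .schema("id LONG, ts TIMESTAMP, payload STRING")
          .load("/data/incoming")

        // Either stream would then be written out with writeStream as usual.
      }
    }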