HI Jungtaek,

I have a question, aren't both approaches compatible?

How I see it, I think It would be interesting to have a retention period to
delete old files and/or the possibility of indicating an offset
(Timestamp). It would be very "similar" to how we do it with kafka.

WDYT?

On Thu, 30 Jul 2020 at 23:51, Jungtaek Lim <kabhwan.opensou...@gmail.com>
wrote:

> (I'd like to keep the discussion thread focusing on the specific topic -
> let's initiate another discussion threads on different topics.)
>
> Thanks for the input. I'd like to emphasize that the point in discussion
> is the "latestFirst" option - the rationalization starts from
> growing metadata log issues. I hope your input is picking option 2, but
> could you please make clear your input represents OK to "replace" the
> "latestFirst" option with "starting from timestamp"?
>
>
> On Thu, Jul 30, 2020 at 4:48 PM vikram agrawal <vikram.agra...@gmail.com>
> wrote:
>
>> If we compare file-stream source with other streaming sources such as
>> Kafka, the current behavior is indeed incomplete.  Starting the streaming
>> from a custom offset/particular point of time is something that is missing.
>> Typically filestream sources don't have auto-deletion of the older
>> data/files. In kafka we can define the retention period. So even if we use
>> "Earliest" we won't end up reading from the time when the Kafka topic was
>> created. On the other hand, streaming sources can hold very old files. It's
>> very valid use-cases to read the bulk of the old files using a batch job
>> until a particular timestamp. And then use streaming jobs for real-time
>> updates.
>>
>> So having support where we can specify a timestamp. and we would consider
>> files created post that timestamp can be useful.
>>
>> Another concern which we need to consider is the listing cost. is there
>> any way we can avoid listing the entire base directory and then filtering
>> out the new files. if the data is organized as partitions using date, will
>> it help to list only those partitions where new files were added?
>>
>>
>> On Thu, Jul 30, 2020 at 11:22 AM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> bump, is there any interest on this topic?
>>>
>>> On Mon, Jul 20, 2020 at 6:21 AM Jungtaek Lim <
>>> kabhwan.opensou...@gmail.com> wrote:
>>>
>>>> (Just to add rationalization, you can refer the original mail thread on
>>>> dev@ list to see efforts on addressing problems in file stream source
>>>> / sink -
>>>> https://lists.apache.org/thread.html/r1cd548be1cbae91c67e5254adc0404a99a23930f8a6fde810b987285%40%3Cdev.spark.apache.org%3E
>>>> )
>>>>
>>>> On Mon, Jul 20, 2020 at 6:18 AM Jungtaek Lim <
>>>> kabhwan.opensou...@gmail.com> wrote:
>>>>
>>>>> Hi devs,
>>>>>
>>>>> As I have been going through the various issues on metadata log
>>>>> growing, it's not only the issue of sink, but also the issue of source.
>>>>> Unlike sink metadata log which entries should be available to the
>>>>> readers, the source metadata log is only for the streaming query starting
>>>>> from the checkpoint, hence in theory it should only memorize about
>>>>> minimal entries which prevent processing multiple times on the same file.
>>>>>
>>>>> This is not applied to the file stream source, and I think it's
>>>>> because of the existence of the "latestFirst" option which I haven't seen
>>>>> from any sources. The option works as reading files in "backward" order,
>>>>> which means Spark can read the oldest file and latest file together in a
>>>>> micro-batch, which ends up having to memorize all files previously read.
>>>>> The option can be changed during query restart, so even if the query is
>>>>> started with "latestFirst" being false, it's not safe to apply the logic 
>>>>> of
>>>>> minimizing entries to memorize, as the option can be changed to true and
>>>>> then we'll read files again.
>>>>>
>>>>> I'm seeing two approaches here:
>>>>>
>>>>> 1) apply "retention" - unlike "maxFileAge", the option would apply to
>>>>> latestFirst as well. That said, if the retention is set to 7 days, the
>>>>> files older than 7 days would never be read in any way. With this approach
>>>>> we can at least get rid of entries which are older than retention. The
>>>>> issue is how to play nicely with existing "maxFileAge", as it also plays
>>>>> similar with the retention, though it's being ignored when latestFirst is
>>>>> turned on. (Change the semantic of "maxFileAge" vs leave it to "soft
>>>>> retention" and introduce another option.)
>>>>>
>>>>> (This approach is being proposed under SPARK-17604, and PR is
>>>>> available - https://github.com/apache/spark/pull/28422)
>>>>>
>>>>> 2) replace "latestFirst" option with alternatives, which no longer
>>>>> read in "backward" order - this doesn't say we have to read all files to
>>>>> move forward. As we do with Kafka, start offset can be provided, ideally 
>>>>> as
>>>>> a timestamp, which Spark will read from such timestamp and forward order.
>>>>> This doesn't cover all use cases of "latestFirst", but "latestFirst"
>>>>> doesn't seem to be natural with the concept of SS (think about watermark),
>>>>> I'd prefer to support alternatives instead of struggling with 
>>>>> "latestFirst".
>>>>>
>>>>> Would like to hear your opinions.
>>>>>
>>>>> Thanks,
>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>
>>>>

Reply via email to