Re: Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2022-01-03 Thread Yun Gao
:21 2022 Recipients:dev , Yun Gao Subject:Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction Hi Yun, Thanks for summarizing the two issues. 1. Losing intermediate shuffle data in batch mode I fully agree with your analysis. We will start to mitigate

Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2022-01-03 Thread Fabian Paul
itter topology until we have > further determined the > semantics and solutions ? > > Best, > Yun > > > > -------------- > From:Arvid Heise > Send Time:2021 Dec. 16 (Thu.) 21:48 > To:Yun Gao > Cc:dev > Subject:Re: Re: [DISCUSS] FLIP-191: Extend unified Sink

Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-12-21 Thread Yun Gao
Send Time:2021 Dec. 16 (Thu.) 21:48 To:Yun Gao Cc:dev Subject:Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction I just realized, in an offline discussion with Yun, that I had some misconceptions about how blocking data exchange works in Flink. Apparently, a subtask of the

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-12-16 Thread GALENO
-- -- From: "dev" https://cwiki.apache.

Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-12-16 Thread Arvid Heise
cords across the two runs, then the two sinks >> would produce >> different data, which might not be suitable in some cases. Perhaps we >> need some support >> from the scheduler side? >> >> But I also agree this could be a separate issue and we could solve it >> separately

Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-12-16 Thread Arvid Heise
be a subclass of > > > > Sink v2 and extend the stateful and two-phase-commit sinks, right? > > > > > > > > 3. I'd also like to have a confirmation of our thoughts on the > > > `DataStreamSink` returned by the sinkTo method: > > > > The main issue is how d

Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-12-16 Thread Yun Gao
un Gao Subject:Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction Just a quick amendment: There will be no blocking exchange in the pre-writer exchange for performance reasons. After the writer, the data volume is tiny and we are free to add as many as we see nece

Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-12-16 Thread Arvid Heise
how do we implement methods like > `setParallelism` > > > or `setMaxParallelism` since now the sink > > > > would be translated to multiple transformations? Perhaps we could > make > > > it the default value for all the transformations > &

Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-12-16 Thread Fabian Paul
e transformations? Perhaps we could make > > it the default value for all the transformations > > > for the sink? A related issue concerns the Iceberg sink: I think it > > would need to have only one committer to avoid the > > > competition for the optimistic locks (which would cause perfor
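The concern about several committers competing for Iceberg's optimistic locks can be illustrated with a toy compare-and-swap table. This is a simplified model, not Iceberg's API: `OptimisticTable` and `simulate_commit_attempts` are hypothetical names, and the sketch assumes that all remaining committers read the same table version and then retry in lockstep rounds.

```python
from typing import List


class OptimisticTable:
    """Hypothetical toy table: a commit succeeds only if the caller saw the latest version."""

    def __init__(self) -> None:
        self.version = 0
        self.files: List[str] = []

    def try_commit(self, expected_version: int, new_files: List[str]) -> bool:
        if expected_version != self.version:
            return False  # another committer won the race; the caller must retry
        self.files.extend(new_files)
        self.version += 1
        return True


def simulate_commit_attempts(num_committers: int) -> int:
    """All waiting committers read the current version, then each tries to commit
    in the same round; losers retry in the next round. Returns total attempts."""
    table = OptimisticTable()
    waiting = list(range(num_committers))
    attempts = 0
    while waiting:
        seen_version = table.version  # every remaining committer reads the same version
        still_waiting = []
        for committer in waiting:
            attempts += 1
            if not table.try_commit(seen_version, [f"file-{committer}"]):
                still_waiting.append(committer)
        waiting = still_waiting
    return attempts
```

Under that assumption, four concurrent committers need 4 + 3 + 2 + 1 = 10 attempts to land four commits, while a single committer that receives all committables commits once. That is the motivation for running many writers in front of a committer with parallelism 1.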

Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-12-16 Thread Till Rohrmann
ers > > with 1 committers, to build such topology, perhaps we might need to add > new methods to specify the parallelism of > > the writers and committers separately? > > > > Best, > > Yun > > > > > > --Original Mail --

Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-12-16 Thread Fabian Paul
topology, perhaps we might need to add new > methods to specify the parallelism of > the writers and committers separately? > > Best, > Yun > > > ------Original Mail ------ > Sender:Fabian Paul > Send Date:Mon Dec 13 23:59:43 2021 > Recipients:dev

Re: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-12-15 Thread Yun Gao
iginal Mail -- Sender:Fabian Paul Send Date:Mon Dec 13 23:59:43 2021 Recipients:dev Subject:Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction Hi all, After a lot of discussions with different, we received very fruitful feedback and reworke

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-12-13 Thread Fabian Paul
would buffer input elements in a temporary storage > >> > (e.g. local file) until a threshold is reached; after that, it would > >> > invoke the original SinkWriter. And if a checkpoint barrier comes in > >> > earlier, it would send written data to some aggregator. > >> > >> I think perhaps this seems to be a ki
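The buffering writer described in the quoted text can be sketched as follows. It is a Python model under stated assumptions, not Flink's `SinkWriter` interface: `BufferingWriter`, `threshold_bytes`, and the two callbacks are hypothetical, and an in-memory list stands in for the temporary local file.

```python
from typing import Callable, List


class BufferingWriter:
    """Hypothetical model: buffer records until a size threshold is reached, then
    hand the batch to the wrapped writer. If a checkpoint barrier arrives first,
    the partial batch goes to an aggregator instead (names are illustrative)."""

    def __init__(self, threshold_bytes: int,
                 downstream_writer: Callable[[List[bytes]], None],
                 aggregator: Callable[[List[bytes]], None]) -> None:
        self.threshold_bytes = threshold_bytes
        self.downstream_writer = downstream_writer
        self.aggregator = aggregator
        self.buffer: List[bytes] = []  # stands in for a temporary local file
        self.buffered_bytes = 0

    def write(self, record: bytes) -> None:
        self.buffer.append(record)
        self.buffered_bytes += len(record)
        if self.buffered_bytes >= self.threshold_bytes:
            # Threshold reached: invoke the original writer with the whole batch.
            self._drain(self.downstream_writer)

    def on_checkpoint_barrier(self) -> None:
        # Barrier arrived before the threshold: ship what we have to the aggregator.
        if self.buffer:
            self._drain(self.aggregator)

    def _drain(self, target: Callable[[List[bytes]], None]) -> None:
        target(self.buffer)
        self.buffer = []  # rebind, so the list handed to `target` stays intact
        self.buffered_bytes = 0
```

With a 10-byte threshold, writing 4 + 4 + 2 bytes triggers one flush to the wrapped writer; a further 2-byte record followed by a barrier is routed to the aggregator.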

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-12-02 Thread Roman Khachatryan
int barrier comes in >> > earlier, it would send written data to some aggregator. >> >> I think perhaps this seems to be a kind of WAL method? Namely we first >> write the elements to some WAL logs and persist them on checkpoint >> (in snapshot or remote FS), or we directly write WAL logs to the remote >> FS eagerly. >> >

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-29 Thread Arvid Heise
ctly write WAL logs to the remote > FS eagerly. > > Sorry if I do not understand correctly somewhere. > > Best, > Yun > > > -- > From:Roman Khachatryan > Send Time:2021 Nov. 9 (Tue.) 22:03 > To:de

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-28 Thread Yun Gao
napshot or remote FS), or we directly write WAL logs to the remote FS eagerly. Sorry if I do not understand correctly somewhere. Best, Yun -- From:Roman Khachatryan Send Time:2021 Nov. 9 (Tue.) 22:03 To:dev Subject:Re: [DISCUSS] FLIP

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-09 Thread Roman Khachatryan
Hi everyone, Thanks for the proposal and the discussion, I have some remarks: (I'm not very familiar with the new Sink API but I thought about the same problem in the context of the changelog state backend) 1. Merging artifacts from multiple checkpoints would apparently require multiple concurrent ch

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-08 Thread Reo Lei
Hi Fabian, Thanks for drafting the FLIP and trying to support small file compaction. I think this feature is very urgent and valuable for users (at least for me). Currently I am trying to support streaming rewrite (compaction) for Iceberg in PR #3323 . As St

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-08 Thread Yun Gao
pull/1 [2] https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/ ---------- From:Steven Wu Send Time:2021 Nov. 9 (Tue.) 06:52 To:dev Subject:Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-08 Thread Steven Wu
> although I think only using a customizable shuffle won't address the generation of small files. One assumption is that the sink generates at least one file per subtask, which can already be too many. Another problem is that with low checkpointing intervals, the files do not meet the required siz
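Steven's point about one file per subtask and short checkpoint intervals can be put into rough numbers. The sketch assumes every writer subtask rolls exactly one file per checkpoint and that throughput is spread evenly across subtasks; `small_file_estimate` and the example figures (parallelism 100, 60 s interval, 10 MB/s) are hypothetical.

```python
from typing import Tuple


def small_file_estimate(parallelism: int, checkpoint_interval_s: int,
                        throughput_bytes_per_s: int) -> Tuple[int, float]:
    """Rough model: each writer subtask rolls one file per checkpoint and
    receives an equal share of the total throughput."""
    checkpoints_per_hour = 3600 // checkpoint_interval_s
    files_per_hour = parallelism * checkpoints_per_hour
    bytes_per_checkpoint = throughput_bytes_per_s * checkpoint_interval_s
    avg_file_bytes = bytes_per_checkpoint / parallelism
    return files_per_hour, avg_file_bytes
```

With those inputs the model yields 6,000 files per hour of about 6 MB each; raising the interval to 600 s gives 600 files of about 60 MB, at the cost of higher end-to-end latency.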

RE: Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-08 Thread Reo Lei
his case when we designed > > the new API. But as a whole, from another view, I think perhaps writing a > > stream / batch unified program with the DataStream API should not be that > > hard? It does not increase more difficulty compared to writing a normal > > stream / batch unifie

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-08 Thread Fabian Paul
Hi all, Thanks for the lively discussions. I am really excited to see so many people participating in this thread. It also underlines that many people would like to see a solution soon. I have updated the FLIP and removed the parallelism configuration because it is unnecessary since user

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-06 Thread Steven Wu
fic issues we mentioned, I > think > based on the previous discussion, we should finally add `finish()` to the > UDF, and for now I think we could at least first add it to the family of > `ProcessFunction`. > > Best, > Yun > > > > ---------- > Fr

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-04 Thread Yun Gao
me:2021 Nov. 4 (Thu.) 16:55 To:Till Rohrmann Cc:dev ; "David Morávek" Subject:Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction > > Emitting records for downstream operators in or after > notifyCheckpointComplete no longer works after FLIP-147

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-04 Thread Arvid Heise
> > Emitting records for downstream operators in or after > notifyCheckpointComplete no longer works after FLIP-147 when executing the > final checkpoint. The problem is that the final checkpoint happens after > the EOI and we would like to keep the property that you can terminate the > whole topol

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-04 Thread Till Rohrmann
Thanks for the detailed description Arvid. I might misunderstand things but one comment concerning: | We could even optimize the writer to only emit the committables after notifyCheckpointComplete as long as we retain them in the state of the writer. Emitting records for downstream operators in

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-04 Thread Arvid Heise
Hi folks, thanks for the lively discussion. Let me present my point of view on a couple of items: *Impact on checkpointing times* Currently, we send the committables of the writer downstream before the barrier is sent. That allows us to include all committables in the state of the committers, su
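The ordering Arvid describes (committables are emitted downstream before the barrier, so they end up in the committers' checkpointed state) can be modelled with a toy event loop. This is an illustrative Python sketch under that assumption; `Committable`, `Barrier`, and `CommitterModel` are hypothetical names, not Flink classes, and a real committer persists its state through Flink's checkpointing rather than a dictionary.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Union


@dataclass
class Committable:
    checkpoint_id: int
    path: str


@dataclass
class Barrier:
    checkpoint_id: int


@dataclass
class CommitterModel:
    """Toy committer: committables received before barrier n are part of the
    state snapshot taken for checkpoint n."""
    pending: List[Committable] = field(default_factory=list)
    snapshots: Dict[int, List[str]] = field(default_factory=dict)

    def process(self, event: Union[Committable, Barrier]) -> None:
        if isinstance(event, Committable):
            self.pending.append(event)
        else:
            # Barrier: snapshot everything received so far.
            self.snapshots[event.checkpoint_id] = [c.path for c in self.pending]

    def notify_checkpoint_complete(self, checkpoint_id: int) -> List[str]:
        # Commit everything that belongs to this or an earlier checkpoint.
        done = [c.path for c in self.pending if c.checkpoint_id <= checkpoint_id]
        self.pending = [c for c in self.pending if c.checkpoint_id > checkpoint_id]
        return done
```

Because both committables of checkpoint 1 arrive before `Barrier(1)`, the snapshot for checkpoint 1 already contains them, and a committable for checkpoint 2 stays pending after checkpoint 1 completes.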

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-03 Thread Guowei Ma
Hi, Many thanks, Fabian, for drafting this FLIP! It looks very good to me. I see currently most of us agree with option 2, but I personally feel that option 3 may be better :-) I have some small concerns about option 2 1. Developers understand that the cost is relatively high. The merging of small fi

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-03 Thread Jingsong Li
Hi Fabian, Thanks for drafting the FLIP! ## A few thoughts on user requirements 1. Compact files from multiple checkpoints: this is what users need very much. 2. The compaction blocks checkpointing: some scenarios require this. For example, the user expects the output data to be consistent wit

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-03 Thread Fabian Paul
Hi David and Till, Thanks for your great feedback. One definitely confusing point in the FLIP is who is doing the actual compaction. The compaction will not be done by the CommittableAggregator operator but the committers so it should also not affect the checkpointing duration or have a signif

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-03 Thread Till Rohrmann
Ideally, the compaction won't affect the checkpointing time. If the compaction takes longer to complete, then the result could be published with the next checkpoint. Of course, this would increase the end-to-end latency. I hope that with option 2, we can support both use cases: single task compact

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-03 Thread David Morávek
Hi Fabian, thanks for drafting the FLIP! This is a really nice and useful topic to target ;) A few thoughts on option 2): file compaction is by definition a quite costly, IO-bound operation. If I understand the proposal correctly, the aggregation itself would run during operator (aggregator) ch

Re: [DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-03 Thread Till Rohrmann
Thanks for creating this FLIP Fabian. From your description I would be in favour of option 2 for the following reasons: Assuming that option 2 solves all our current problems, it seems like the least invasive change and smallest in scope. Your main concern is that it might not cover future use ca

[DISCUSS] FLIP-191: Extend unified Sink interface to support small file compaction

2021-11-02 Thread Fabian Paul
Hi all, More and more data lake sinks rely on columnar formats, which benefit from a few larger files rather than many small files (read amplification). Our current FileSink cannot ensure a certain file size when writing to an external filesystem, which I call the small file compaction problem. Unfortunat
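To make the compaction problem concrete, one simple way to plan compaction is to group small files greedily, in arrival order, until a target size is reached. This is only an illustration, not the algorithm proposed in the FLIP; `plan_compaction` and `target_bytes` are hypothetical, and the sketch ignores file formats, partitioning, and how the grouped files are actually rewritten.

```python
from typing import List, Tuple


def plan_compaction(files: List[Tuple[str, int]], target_bytes: int) -> List[List[str]]:
    """Greedily group (path, size) pairs, in input order, into batches whose
    combined size stays at or below target_bytes. A file that is larger than
    the target on its own ends up in a batch by itself."""
    groups: List[List[str]] = []
    current: List[str] = []
    current_size = 0
    for path, size in files:
        if current and current_size + size > target_bytes:
            # Adding this file would overshoot the target: close the batch.
            groups.append(current)
            current, current_size = [], 0
        current.append(path)
        current_size += size
    if current:
        groups.append(current)
    return groups
```

For example, files a, b, c, d, e of 40, 50, 30, 120 and 10 bytes with a 100-byte target are grouped as [a, b], [c], [d], [e]. A greedy, order-preserving pass is cheap, but it can leave under-filled groups (here c and e) that a smarter bin-packing strategy would merge.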