> although I think only using a customizable shuffle won't address the generation of small files. One assumption is that at least the sink generates one file per subtask, which can already be too many. Another problem is that with low checkpointing intervals, the files do not meet the required size. The latter point is probably addressable by changing the checkpoint interval, which might be inconvenient for some users.
Agree. I didn't mean that shuffling can solve all the problems of small files. I was just trying to use it as an example. You touched on a few other causes that maybe we can discuss separately.

1. One file per subtask is already too many. Should we reduce the parallelism of the writer operator?
2. Low checkpoint intervals. How does the proposal address this cause? Would a smaller number of compactor tasks read and compact the files? Would that be the same as lowering the parallelism of the upstream writer operator?

I am not trying to argue against the need for compaction. I am just trying to understand the different scenarios and see how the proposal helps.

> the benefits of having a coordinator in comparison to a global committer/aggregator operator.

One benefit is the potential of maintaining an embarrassingly parallel DAG (like source -> sink) where region failover only needs to recover a small region when one TM node dies. Whether this is a big benefit or not is certainly up for debate.

> Unfortunately, the downside is that you have to rewrite your data after it is already available for other downstream consumers. I guess this can lead to all kinds of visibility problems.

Yes, rewriting data can have visibility problems for non-transactional sinks. If we are going to compact files before the commit, why not shuffle or reduce parallelism in the first place? Would it achieve a similar goal? Otherwise, we are writing a bunch of files, planning a compaction, reading all the small files back in, and writing all the data out again to a smaller number of files. File read/write/upload is probably more expensive than just a network shuffle.

For transactional sinks like Iceberg, this is not a concern. It is good to make data available ASAP. Compaction can happen after the commit (in the same Flink streaming job or in a separate batch maintenance job).

Thanks,
Steven

On Mon, Nov 8, 2021 at 3:59 AM Fabian Paul <fabianp...@ververica.com> wrote:

> Hi all,
>
> Thanks for the lively discussions. I am really excited to see so many people participating in this thread. It also underlines that many people would like to see a solution soon.
>
> I have updated the FLIP and removed the parallelism configuration because it is unnecessary: users can configure a constant exchange key to send all committables to only one committable aggregator.
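A minimal DataStream sketch of this constant-exchange-key idea; the String "committables", the rolling reduce, and all names below are hypothetical stand-ins for illustration, not the FLIP's actual sink interfaces:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConstantExchangeKeySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical stand-in for committables emitted by the parallel sink writers.
        DataStream<String> committables = env.fromElements("file-1", "file-2", "file-3");

        committables
                // Keying by a constant routes every committable to the same key group,
                // i.e. to a single "committable aggregator" subtask.
                .keyBy(committable -> "constant-key", Types.STRING)
                // Rolling reduce as a placeholder for an aggregator that sees all
                // committables and could plan which small files to merge.
                .reduce((left, right) -> left + "," + right)
                .name("committable-aggregator")
                .print();

        env.execute("constant-exchange-key-sketch");
    }
}
```

Because every element carries the same key, all committables land in one key group and thus on a single aggregator subtask, which is why no separate parallelism configuration is needed.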
> > 1. Burden for developers w.r.t. batch/stream unification.
>
> @yun @guowei, from a theoretical point of view you are right: by exposing the DataStream API in the sink, users have the full power to write correct batch and streaming sinks. I think in reality a lot of users still struggle to build such pipelines, i.e. an operator pipeline that works correctly in both streaming and batch mode. Another problem I see with exposing deeper concepts is that we cannot do any optimization, because we cannot reason about how sinks are built in the future.
>
> We should also try to steer users towards using only `Functions` to give us more flexibility to swap the internal operator representation. I agree with @yun that we should try to make the `ProcessFunction` more versatile to work towards that goal, but I see this as unrelated to this FLIP.
>
> > 2. Regarding Commit / Global commit
>
> I envision the global committer to be specific to the data lake solution you want to write to. However, it is entirely orthogonal to the compaction. Currently, I do not expect any changes w.r.t. the Global commit introduced by this FLIP.
>
> > 3. Regarding the case of trans-checkpoint merging
>
> @yun, as a user, I would expect that if the committer receives files to merge/commit in a checkpoint, these are also finished when the checkpoint finishes. I think all sinks currently rely on this principle, i.e. the KafkaSink needs to commit all open transactions before the next checkpoint can happen.
>
> Maybe in the future we can somehow move the Committer#commit call to an asynchronous execution, but we should discuss that in a separate thread.
>
> > We probably should first describe the different causes of small files and what problems this proposal is trying to solve. I wrote a data shuffling proposal [1] for the Flink Iceberg sink (shared with the Iceberg community [2]). It can address small-file problems due to skewed data distribution across Iceberg table partitions. Streaming shuffling before the writers (to files) is typically more efficient than post-write file compaction (which involves read-merge-write). It is usually cheaper to prevent a problem (small files) than to fix it.
>
> @steven you are raising a good point, although I think only using a customizable shuffle won't address the generation of small files. One assumption is that at least the sink generates one file per subtask, which can already be too many. Another problem is that with low checkpointing intervals, the files do not meet the required size. The latter point is probably addressable by changing the checkpoint interval, which might be inconvenient for some users.
>
> > It would be great if Flink could address the sink coordinator checkpoint problem (mentioned in option 1). In the spirit of the source (enumerator-reader) and sink (writer-coordinator) duality, the sink coordinator checkpoint should happen after the writer operator. This would be a natural fit to support a global committer in FLIP-143. It is probably an orthogonal matter to this proposal.
>
> To me the question here is what the benefits of having a coordinator are in comparison to a global committer/aggregator operator.
>
> > Personally, I am usually in favor of keeping streaming ingestion (to a data lake) relatively simple and stable. Also, sometimes compaction and sorting are performed together in data rewrite maintenance jobs to improve read performance. In that case, the value of compacting (in Flink streaming ingestion) diminishes.
>
> I agree it is always possible to have scheduled maintenance jobs taking care of your data, i.e. doing compaction. Unfortunately, the downside is that you have to rewrite your data after it is already available for other downstream consumers. I guess this can lead to all kinds of visibility problems. I am also surprised that you personally are a fan of this approach while, on the other hand, you are developing the Iceberg sink, which somewhat goes against your stated principle of keeping the sink simple.
>
> > Currently, it is unclear from the doc and this thread where the compaction is actually happening. Jingsong's reply described one model:
> > writer (parallel) -> aggregator (single-parallelism compaction planner) -> compactor (parallel) -> global committer (single-parallelism)
>
> My idea of the topology is very similar to the one outlined by Jingsong. The compaction will happen in the committer operator.
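A minimal sketch of that writer -> planner -> compactor -> committer shape using plain DataStream operators as stand-ins; the operator names, String "files", and merging logic are hypothetical placeholders rather than the FLIP's actual sink topology:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class CompactionTopologySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the records arriving at the sink.
        DataStream<String> records = env.fromElements("a", "b", "c", "d");

        records
                // Parallel "writers": each subtask writes (possibly small) files and
                // emits a committable describing them; flatMap is only a placeholder.
                .flatMap((String record, Collector<String> out) -> out.collect("file-" + record))
                .returns(Types.STRING)
                .name("writer")
                // Single-parallelism planner: sees every committable and decides
                // which small files should be merged together.
                .keyBy(committable -> "constant-key", Types.STRING)
                .reduce((left, right) -> left + "+" + right)
                .name("compaction-planner")
                // Parallel "compactors" execute the plans (the read-merge-write step).
                .map(plan -> "compacted(" + plan + ")")
                .returns(Types.STRING)
                .name("compactor")
                // Single-parallelism "global committer" finally makes the result visible.
                .map(merged -> "committed(" + merged + ")")
                .returns(Types.STRING)
                .name("global-committer")
                .setParallelism(1)
                .print();

        env.execute("compaction-topology-sketch");
    }
}
```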
> > In the Iceberg community, the following model has been discussed. It is better for Iceberg because it won't delay the data availability:
> > writer (parallel) -> global committer for append (single parallelism) -> compactor (parallel) -> global committer for rewrite commit (single parallelism)
>
> From a quick glimpse, it seems that the exact same topology can be expressed with the committable aggregator, but this definitely depends on the exact setup.
>
> Best,
> Fabian
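For comparison with the Iceberg-style ordering quoted above, a minimal sketch in the same placeholder style, where the append commit runs before compaction so data becomes visible first and the rewrite commit only swaps in merged files afterwards; all names are hypothetical stand-ins:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PostCommitCompactionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for committables describing the small files written by parallel writers.
        env.fromElements("file-1", "file-2", "file-3")
                // Single-parallelism "append committer": makes the small files visible
                // to downstream consumers right away, so data availability is not delayed.
                .map(file -> "appended(" + file + ")")
                .returns(Types.STRING)
                .name("global-committer-append")
                .setParallelism(1)
                // Parallel "compactors": rewrite already committed small files into
                // larger ones after the fact.
                .map(appended -> "rewritten(" + appended + ")")
                .returns(Types.STRING)
                .name("compactor")
                // Single-parallelism "rewrite committer": swaps the merged files into
                // the table in a second commit.
                .map(rewritten -> "rewrite-committed(" + rewritten + ")")
                .returns(Types.STRING)
                .name("global-committer-rewrite")
                .setParallelism(1)
                .print();

        env.execute("post-commit-compaction-sketch");
    }
}
```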