[ https://issues.apache.org/jira/browse/FLINK-9749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16935217#comment-16935217 ]
Shimin Yang edited comment on FLINK-9749 at 9/22/19 5:41 AM: ------------------------------------------------------------- [~kkl0u], I understand your concern. However, if we use event-time based bucket assigner, we do not to introduce new mechanism. Although the bucket is related to event time, it's just a special bucket which can take advantage of timestamp and watermark info in the BucketAssigner.Context. The exactly-once still need to be guaranteed by Flink. But instead of buffering all records in keyed state, using event time bucket assigner only need to track the status of bucket status with operator state. And we still need to use StreamingFileSink to write to buckets(bucketID is get from window) after messages processed by window operator. Finally, I would like to explain why I bring up this discussion. Users want to using flink to do streaming ETL from Kafka to Hive based on event time and automatically load partition to Hive partitioned table, which can eventually replace the batch ETL pipeline. The number of records is huge since it's all raw log message, using window would be too much inefficient. So we developed a Hive Streaming Sink based on 2PC Sink and use a mechanism quite similar to watermark to decide the timing to load partition to Hive. We also support more complex partition like combining date time and key. Now we want to refactor it using StreamingFileSink. was (Author: dangdangdang): [~kkl0u], I understand your concern. However, if we use event-time based bucket assigner, we do not to introduce new mechanism. Although the bucket is related to event time, it's just a special bucket which can take advantage of timestamp and watermark info in the BucketAssigner.Context. The exactly-once still need to be guaranteed by Flink. But instead of buffering all records in keyed state, using event time bucket assigner only need to track the status of bucket status with operator state. And we still need to use StreamingFileSink to write to buckets(bucketID is get from window) after messages processed by window operator. Finally, I would like to explain why I bring up this discussion. Users want to using flink to do streaming ETL from Kafka to Hive based on event time and automatically load to Hive partitioned table, which can eventually replace the batch ETL pipeline. The number of records is huge since it's all raw log message, using window would be too much inefficient. So we developed a Hive Streaming Sink based on 2PC Sink and use a mechanism quite similar to watermark to decide the timing to load partition to Hive. We also support more complex partition like combining date time and key. Now we want to refactor it using StreamingFileSink. > Rework Bucketing Sink > --------------------- > > Key: FLINK-9749 > URL: https://issues.apache.org/jira/browse/FLINK-9749 > Project: Flink > Issue Type: New Feature > Components: Connectors / FileSystem > Reporter: Stephan Ewen > Assignee: Kostas Kloudas > Priority: Major > > The BucketingSink has a series of deficits at the moment. > Due to the long list of issues, I would suggest to add a new > StreamingFileSink with a new and cleaner design > h3. Encoders, Parquet, ORC > - It only efficiently supports row-wise data formats (avro, jso, sequence > files. > - Efforts to add (columnar) compression for blocks of data is inefficient, > because blocks cannot span checkpoints due to persistence-on-checkpoint. > - The encoders are part of the \{{flink-connector-filesystem project}}, > rather than in orthogonal formats projects. This blows up the dependencies of > the \{{flink-connector-filesystem project}} project. As an example, the > rolling file sink has dependencies on Hadoop and Avro, which messes up > dependency management. > h3. Use of FileSystems > - The BucketingSink works only on Hadoop's FileSystem abstraction not > support Flink's own FileSystem abstraction and cannot work with the packaged > S3, maprfs, and swift file systems > - The sink hence needs Hadoop as a dependency > - The sink relies on "trying out" whether truncation works, which requires > write access to the users working directory > - The sink relies on enumerating and counting files, rather than maintaining > its own state, making less efficient > h3. Correctness and Efficiency on S3 > - The BucketingSink relies on strong consistency in the file enumeration, > hence may work incorrectly on S3. > - The BucketingSink relies on persisting streams at intermediate points. > This is not working properly on S3, hence there may be data loss on S3. > h3. .valid-length companion file > - The valid length file makes it hard for consumers of the data and should > be dropped > We track this design in a series of sub issues. -- This message was sent by Atlassian Jira (v8.3.4#803005)