Hi Kurt, +1 for having some offline discussion on this topic.
But I think the question about using StreamingFileSink or implementing a subset of its features from scratch is quite a fundamental design decision, with impact on the behaviour of the public API, so I wouldn’t discard it as a technical detail, and it should be included as part of the FLIP (I know it could be argued in the opposite direction).

Piotrek

> On 18 Mar 2020, at 13:55, Kurt Young <ykt...@gmail.com> wrote:
>
> Hi all,
>
> Thanks for the discussion and the feedback. I think this FLIP doesn't imply the implementation of such a connector yet, it only describes the functionality and expected behaviors from the user's perspective. Reusing the current StreamingFileSink is definitely one of the possible ways to implement it. Since there are lots of details, I would suggest we have an offline meeting to discuss how these could be achieved by extending StreamingFileSink, and how much effort we need to put into it. What do you think?
>
> Best,
> Kurt
>
> On Wed, Mar 18, 2020 at 7:21 PM Kostas Kloudas <kklou...@gmail.com> wrote:
>
>> Hi all,
>>
>> I also agree with Stephan on this!
>>
>> It has been more than a year now that most of our efforts have had "unify" / "unification" / etc. either in their title or at their core, and this has been the focus of all our resources. By deviating from this now, we only put more stress on other teams in the future. When users start using a given API, with high probability they will ask for (and it is totally reasonable) consistent behaviour from all the other APIs that ship with Flink. This will eventually lead to having to answer the questions that we now deem difficult in a future release, when we will have to "unify" again.
>>
>> In addition, Hive integration is definitely a "nice to have" feature, but it does not mean that we need to push for 100% compatibility if it is not required.
>> @Jingsong Li if you think that Parquet and Orc are the main formats, we can focus on these and provide good support for them (both reading and writing). For maintainability, I think that given the amount of demand for these formats, it is not going to be a huge problem, at least for now.
>>
>> Given the above, I am also leaning towards a solution that aims at extending the StreamingFileSink to efficiently support bulk formats like Parquet and Orc, rather than creating a new sink that locks Hive-dependent use cases to a specific API.
>>
>> Cheers,
>> Kostas
>>
>> On Wed, Mar 18, 2020 at 12:03 PM Stephan Ewen <se...@apache.org> wrote:
>>>
>>>>> The FLIP is "Filesystem connector in Table", it's about building up Flink Table's capabilities.
>>>
>>> That is exactly what worries me. The whole effort is not thinking about Flink as a whole any more. This proposal is not trying to build a consistent user experience across batch and streaming, across Table API, SQL, and DataStream.
>>>
>>> The proposal is building a separate, disconnected ecosystem for the Table API, specific to batch processing and some limited streaming setups, specific to one type of environment (Hive and HDFS). It willingly omits support for other environments and conflicts with efforts in other components to unify.
>>>
>>> Supporting common use cases is good, but in my opinion not at the price of creating a "fractured" project where the approaches in different layers don't fit together any more.
>>>
>>>> *## StreamingFileSink does not support writers with a path*
>>>>
>>>> The FLIP is "Filesystem connector in Table", it's about building up Flink Table's capabilities. But I think Hive is important; I see that most users use Flink and Spark to write data from Kafka to Hive. For streaming writes, I see that these two engines are convenient and popular.
>>>> I mean, Flink is not only a Hive runtime, but also an important part of the offline data warehouse.
>>>> The thing is that StreamingFileSink does not support Hadoop record writers. Yes, we can support them one by one. I see the community integrating ORC [1]. But it's really not an easy thing, and we have to be careful to maintain compatibility. After all, downstream users use other computing engines to analyze the data.
>>>> Yes, exposing "RecoverableFsDataOutputStream" to writers is good for subsequent optimization [2]. But there are many cases where it is enough for users to generate new files at the checkpoint. They pay more attention to whether they can do it and whether there is a compatibility risk. Therefore, RecordWriter is used here.
>>>>
>>>> *## External HDFS access*
>>>>
>>>> Including Hadoop configuration and Kerberos-related things.
>>>>
>>>> *## Partition commit*
>>>>
>>>> Committing a partition is to notify the downstream application that the partition has finished writing and is ready to be read. The common way is to add a success file or update the metastore. Of course, there are other ways to notify, so we need to provide flexible mechanisms.
>>>> As you mentioned, yes, we can extend "StreamingFileSink" for this part.
>>>>
>>>> *## Batch / Streaming Unification*
>>>>
>>>> Yes, it is about exactly-once and a single commit at the end. There are also some "bounded" differences. For example, batch can support sorting. In this way, you can sort by partition, which can reduce the number of writers open at the same time. Dynamic partition writing in batch may produce many unordered partitions.
>>>> >>>> [1] https://issues.apache.org/jira/browse/FLINK-10114 >>>> [2] https://issues.apache.org/jira/browse/FLINK-11499 >>>> >>>> Best, >>>> Jingsong Lee >>>> >>>> On Tue, Mar 17, 2020 at 8:00 PM LakeShen <shenleifight...@gmail.com> >>>> wrote: >>>> >>>>> Hi Jingsong , >>>>> >>>>> I am looking forward this feature. Because in some streaming >>>> application,it >>>>> need transfer their messages to hdfs , in order to offline analysis. >>>>> >>>>> Best wishes, >>>>> LakeShen >>>>> >>>>> Stephan Ewen <se...@apache.org> 于2020年3月17日周二 下午7:42写道: >>>>> >>>>>> I would really like to see us converging the stack and the >>>> functionality >>>>>> here. >>>>>> Meaning to try and use the same sinks in the Table API as for the >>>>>> DataStream API, and using the same sink for batch and streaming. >>>>>> >>>>>> The StreamingFileSink has a lot of things that can help with that. >> If >>>>>> possible, it would be nice to extend it (which would help move >> towards >>>>> the >>>>>> above goal) rather than build a second sink. Building a second sink >>>> leads >>>>>> us further away from unification. >>>>>> >>>>>> I am a bit puzzled by the statement that sinks are primarily for >> Hive. >>>>> The >>>>>> Table API should not be coupled to Hive, it should be an >> independent >>>>>> batch/streaming API for many use cases, supporting very well for >> batch >>>>> and >>>>>> streaming interplay. Supporting Hive is great, but we should not be >>>>>> building this towards Hive, as just yet another Hive runtime. Why >> "yet >>>>>> another Hive runtime" when what we have a unique streaming engine >> that >>>>> can >>>>>> do much more? We would drop our own strength and reduce ourselves >> to a >>>>>> limited subset. >>>>>> >>>>>> Let's build a File Sink that can also support Hive, but can do so >> much >>>>>> more. For example, efficient streaming file ingestion as >> materialized >>>>> views >>>>>> from changelogs. 
>>>>>> *## Writing Files in Streaming*
>>>>>>
>>>>>> To write files in streaming, I don't see another way than using the streaming file sink. If you want to write files across checkpoints, support exactly-once, and support a consistent "stop with savepoint", it is not trivial.
>>>>>>
>>>>>> A part of the complexity comes from the fact that not all targets are actually file systems, and not all have simple semantics for persistence. S3, for example, does not support renames (only copies, which may take a lot of time) and it does not support flush/sync of data (the S3 file system in Hadoop exposes that, but it does not work: flush/sync, followed by a failure, leads to data loss). You need to devise a separate protocol for that, which is exactly what has already been done and abstracted behind the recoverable writers.
>>>>>>
>>>>>> If you re-engineer that, you will end up either missing many things (intermediate persistence on different file systems, atomic commit in the absence of renames, etc.), or you end up doing something similar to what the recoverable writers do.
>>>>>>
>>>>>> *## Atomic Commit in Batch*
>>>>>>
>>>>>> For batch sinks, it is also desirable to write the data first and then atomically commit it once the job is done. Hadoop has spent a lot of time making this work; see the doc, specifically the section on 'The "Magic" Committer' [1].
>>>>>>
>>>>>> What Flink has built in the RecoverableWriter is in some ways an even better version of this, because it works without extra files (we pass data through checkpoint state) and it supports not only committing once at the end, but also committing intermediate parts multiple times during checkpoints.
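The "separate protocol" idea, writing data into in-progress files and publishing them only on commit, can be sketched for a local file system where a plain atomic rename suffices (hypothetical names; the real recoverable writers additionally handle targets like S3, where no such rename exists):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Minimal local-filesystem sketch of the two-phase protocol that the
// recoverable writers generalise: data goes into a hidden in-progress
// file first and is only published (renamed) when the commit happens,
// e.g. on checkpoint completion. Names are illustrative only.
public class TwoPhaseFileWriter {

    // Phase 1: write all data to a hidden in-progress file.
    public static Path writeInProgress(Path dir, String name, byte[] data) throws IOException {
        Files.createDirectories(dir);
        Path inProgress = dir.resolve("." + name + ".inprogress");
        Files.write(inProgress, data);
        return inProgress;
    }

    // Phase 2: atomically publish the file under its final name;
    // readers only ever see complete files.
    public static Path commit(Path inProgress, String finalName) throws IOException {
        Path finalPath = inProgress.resolveSibling(finalName);
        return Files.move(inProgress, finalPath, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

On an object store without rename the commit step would instead complete a multipart upload, which is exactly the variation the RecoverableWriter abstraction hides.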
>>>>>> Meaning that using the recoverable writer mechanism in batch would allow us to immediately get the efficient atomic commit implementations on file://, hdfs://, and s3://, with a well-defined way to implement it also for other file systems.
>>>>>>
>>>>>> *## Batch / Streaming Unification*
>>>>>>
>>>>>> It would be great to start looking at these things in the same way:
>>>>>> - streaming (exactly-once): commits files (after they are finished) at the next checkpoint
>>>>>> - batch: a single commit at the end of the job
>>>>>>
>>>>>> *## DataStream / Table API Stack Unification*
>>>>>>
>>>>>> Having the same set of capabilities would make it much easier for users to understand the system, especially when it comes to consistent behavior across external systems. Having a different file sink in the Table API and the DataStream API means that DataStream can write correctly to S3 while the Table API cannot.
>>>>>>
>>>>>> *## What is missing?*
>>>>>>
>>>>>> It seems there are some things that get in the way of naturally extending the StreamingFileSink. Can you make a list of what features are missing in the StreamingFileSink that would make it usable for the use cases you have in mind?
>>>>>>
>>>>>> Best,
>>>>>> Stephan
>>>>>>
>>>>>> [1] https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committer_architecture.html
>>>>>>
>>>>>> On Mon, Mar 16, 2020 at 12:31 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Piotr,
>>>>>>>
>>>>>>> I am quite torn.
>>>>>>>
>>>>>>> Let me re-list the table streaming sink requirements:
>>>>>>> - In the Table API, maybe 90% of sinks are for Hive. Parquet and ORC are the most important formats.
>>>>>>> Hive provides RecordWriters; it is easy to support all Hive formats by using them, and we don't need to be concerned about Hive version compatibility either, but they cannot work with FSDataOutputStream.
>>>>>>> - A Hive table may use an external HDFS. This means Hive has its own Hadoop configuration.
>>>>>>> - In the Table API, partition commit is needed. We cannot just move files; for complete table semantics it is important to update the catalog.
>>>>>>>
>>>>>>> You are right that the DataStream and Table streaming sinks will not be fully compatible, each with its own set of limitations, quirks, and features. But if we re-use DataStream, batch and streaming will also not be fully compatible. Providing a unified experience for batch and streaming is also important.
>>>>>>>
>>>>>>> Table and DataStream have different concerns, and they tilt in different directions.
>>>>>>>
>>>>>>> Of course, it would be very good to see a unified implementation that solves the batch sink and the Hive things, and unifies the DataStream batch sink, the DataStream streaming sink, the Table batch sink, and the Table streaming sink.
>>>>>>>
>>>>>>> Let's see what others think.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong Lee
>>>>>>>
>>>>>>> On Mon, Mar 16, 2020 at 4:15 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>>>
>>>>>>>> Hi Jingsong,
>>>>>>>>
>>>>>>>>> The first way is reusing the batch sink in FLINK-14254; it has handled the partition and metastore logic well.
>>>>>>>>> - unify batch and streaming
>>>>>>>>> - Using FileOutputFormat is consistent with FileInputFormat.
>>>>>>>>> - Add exactly-once related logic. Just 200+ lines of code.
>>>>>>>>> - It's natural to support more table features, like partition commit, auto compaction, etc.
>>>>>>>>>
>>>>>>>>> The second way is reusing the DataStream StreamingFileSink:
>>>>>>>>> - unify the streaming sink between Table and DataStream.
>>>>>>>>> - It may be hard to introduce table-related features to StreamingFileSink.
>>>>>>>>>
>>>>>>>>> I prefer the first way a little. What do you think?
>>>>>>>>
>>>>>>>> I would be surprised if adding “exactly-once related logic” is just 200 lines of code. There are things like multipart file upload to S3, and there are also some pending features like [1]. I would suggest asking/involving Klou in this discussion.
>>>>>>>>
>>>>>>>> If it’s that easy to support exactly-once streaming with the current batch sink, that begs the question: why do we need to maintain StreamingFileSink?
>>>>>>>>
>>>>>>>> The worst possible outcome from my perspective would be having another example of an operator/logic implemented independently in both the DataStream API and the Table API, because I’m pretty sure they would not be fully compatible, each with its own set of limitations, quirks, and features. Especially since unifying such operators is on our long-term roadmap and wish list.
>>>>>>>>
>>>>>>>> Piotrek
>>>>>>>>
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11499 <https://issues.apache.org/jira/browse/FLINK-11499>
>>>>>>>>
>>>>>>>>> On 16 Mar 2020, at 06:55, Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Thanks Jinhai for getting involved.
>>>>>>>>>
>>>>>>>>>> we need to add 'connector.sink.username' for UserGroupInformation when data is written to HDFS
>>>>>>>>>
>>>>>>>>> Yes, I am not an expert on HDFS, but it seems we need to do this "doAs" in the code to access an external HDFS. I will update the document.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jingsong Lee
>>>>>>>>>
>>>>>>>>> On Mon, Mar 16, 2020 at 12:01 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Piotr and Yun for getting involved.
>>>>>>>>>>
>>>>>>>>>> Hi Piotr and Yun, regarding the implementation:
>>>>>>>>>>
>>>>>>>>>> FLINK-14254 [1] introduces the batch sink to the table world; it deals with the partition things, the metastore things, etc., and it just reuses the DataSet/DataStream FileInputFormat and FileOutputFormat. The filesystem connector cannot do without FileInputFormat, because it needs to deal with file things and split things. Formats like ORC and Parquet need to read the whole file and have different split logic.
>>>>>>>>>>
>>>>>>>>>> So back to the file system connector:
>>>>>>>>>> - It needs to introduce FilesystemTableFactory, FilesystemTableSource, and FilesystemTableSink.
>>>>>>>>>> - For sources, we reuse the DataSet/DataStream FileInputFormats; there is no other interface to accomplish file reading.
>>>>>>>>>>
>>>>>>>>>> For file sinks:
>>>>>>>>>> - The batch sink uses FLINK-14254.
>>>>>>>>>> - The streaming sink has two possible ways.
>>>>>>>>>>
>>>>>>>>>> The first way is reusing the batch sink in FLINK-14254; it has handled the partition and metastore logic well.
>>>>>>>>>> - unify batch and streaming
>>>>>>>>>> - Using FileOutputFormat is consistent with FileInputFormat.
>>>>>>>>>> - Add exactly-once related logic. Just 200+ lines of code.
>>>>>>>>>> - It's natural to support more table features, like partition commit, auto compaction, etc.
>>>>>>>>>>
>>>>>>>>>> The second way is reusing the DataStream StreamingFileSink:
>>>>>>>>>> - unify the streaming sink between Table and DataStream.
>>>>>>>>>> - It may be hard to introduce table-related features to StreamingFileSink.
>>>>>>>>>>
>>>>>>>>>> I prefer the first way a little. What do you think?
>>>>>>>>>>
>>>>>>>>>> Hi Yun,
>>>>>>>>>>
>>>>>>>>>>> Watermark mechanism might not be enough.
>>>>>>>>>>
>>>>>>>>>> The watermarks of the subtasks are the same in "snapshotState".
>>>>>>>>>>
>>>>>>>>>>> we might need to also do some coordination between subtasks.
>>>>>>>>>>
>>>>>>>>>> Yes, the JobMaster is the role that controls the subtasks. The metastore is a very fragile single point, which cannot be accessed in a distributed way, so it is uniformly accessed by the JobMaster.
>>>>>>>>>>
>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-14254
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jingsong Lee
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 13, 2020 at 6:43 PM Yun Gao <yungao...@aliyun.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Many thanks to Jingsong for bringing up this discussion! Enhancing the FileSystem connector in Table should largely improve its usability.
>>>>>>>>>>>
>>>>>>>>>>> I have the same question as Piotr. From my side, I think it would be better to be able to reuse the existing StreamingFileSink. I think we have begun enhancing the supported FileFormats (e.g., ORC, Avro...), and reusing StreamingFileSink should avoid repeated work in the Table library. Besides, the bucket concept also seems to match the semantics of a partition.
>>>>>>>>>>>
>>>>>>>>>>> For the notification of adding partitions, I'm a little worried that the watermark mechanism might not be enough, since a Bucket/Partition might span multiple subtasks.
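The watermark-based trigger being discussed could be sketched roughly like this (a hypothetical illustration, not from the FLIP): a subtask treats a partition as finished once its watermark passes the partition's end timestamp plus a commit delay, while a whole-partition decision would need the minimum watermark across all subtasks:

```java
// Hypothetical sketch of a watermark-based partition-commit trigger.
// None of these names come from the FLIP; this only illustrates the
// coordination problem discussed in the thread.
public class PartitionCommitTrigger {

    // A subtask may commit its part of the partition once its watermark
    // has passed the partition end timestamp plus an allowed delay.
    public static boolean canCommit(long partitionEndTs, long watermark, long commitDelay) {
        return watermark >= partitionEndTs + commitDelay;
    }

    // For a whole-partition notification, only the minimum watermark over
    // all subtasks is safe to use, hence the need for coordination
    // (e.g. aggregating on the JobMaster, as suggested below).
    public static long globalWatermark(long[] subtaskWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : subtaskWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```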
>>>>>>>>>>> It depends on the level of notification: if we want to notify for the bucket on each subtask, using the watermark to notify each subtask should be ok, but if we want to notify for the whole Bucket/Partition, we might need to also do some coordination between subtasks.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Yun
>>>>>>>>>>>
>>>>>>>>>>> ------------------------------------------------------------------
>>>>>>>>>>> From: Piotr Nowojski <pi...@ververica.com>
>>>>>>>>>>> Send Time: 2020 Mar. 13 (Fri.) 18:03
>>>>>>>>>>> To: dev <dev@flink.apache.org>
>>>>>>>>>>> Cc: user <u...@flink.apache.org>; user-zh <user...@flink.apache.org>
>>>>>>>>>>> Subject: Re: [DISCUSS] FLIP-115: Filesystem connector in Table
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Which actual sinks/sources are you planning to use in this feature? Is it about exposing StreamingFileSink in the Table API? Or do you want to implement new Sinks/Sources?
>>>>>>>>>>>
>>>>>>>>>>> Piotrek
>>>>>>>>>>>
>>>>>>>>>>>> On 13 Mar 2020, at 10:04, jinhai wang <jinhai...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for FLIP-115. It is a really useful feature for platform developers who manage hundreds of Flink-to-Hive jobs in production.
>>>>>>>>>>>>
>>>>>>>>>>>> I think we need to add 'connector.sink.username' for UserGroupInformation when data is written to HDFS
>>>>>>>>>>>>
>>>>>>>>>>>> On 2020/3/13 at 3:33 PM, "Jingsong Li" <jingsongl...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>
>>>>>>>>>>>> I'd like to start a discussion about FLIP-115: Filesystem connector in Table [1].
>>>>>>>>>>>> This FLIP will bring:
>>>>>>>>>>>> - Introduce a filesystem table factory in Table, supporting csv/parquet/orc/json/avro formats.
>>>>>>>>>>>> - Introduce a streaming filesystem/Hive sink in Table.
>>>>>>>>>>>>
>>>>>>>>>>>> CC'ing the user mailing list: if you have any unmet needs, please feel free to reply~
>>>>>>>>>>>>
>>>>>>>>>>>> Looking forward to hearing from you.
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jingsong Lee
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best, Jingsong Lee
>>>>>>>
>>>>>>> --
>>>>>>> Best, Jingsong Lee
>>>>
>>>> --
>>>> Best, Jingsong Lee