Hi all,

After some great discussion, I think we have at least reached a consensus that we can have a unified sink to handle streaming, batch, Hive, and HDFS. And for the FileSystem connector, we will undoubtedly reuse the DataStream StreamingFileSink.
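For context, this is the kind of DataStream-level composition the table sink will delegate to. A minimal sketch against the existing StreamingFileSink API (the path, the encoder, and the source are just placeholders here):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    DataStream<String> rows = ...; // rows already serialized by the format

    // Row formats plug in an Encoder; bulk formats (Parquet/ORC) would go
    // through forBulkFormat(...) with a BulkWriter.Factory instead.
    StreamingFileSink<String> sink = StreamingFileSink
            .forRowFormat(new Path("hdfs:///path/to/table"),
                    new SimpleStringEncoder<String>("UTF-8"))
            .build();
    rows.addSink(sink);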
I updated the FLIP:
1. Move external HDFS and close exactly-once to the future plan.
2. Provide implementation details:
   - Introduce FileSystemTableFactory, FileSystemTableSource, and
     FileSystemTableSink to provide the table-related implementations. This
     is not blocked by FLIP-95 [1]; we will migrate to the new interfaces
     after FLIP-95 is finished.
   - Read: reuse FileInputFormat. This is not blocked by FLIP-27 [2]; we
     will migrate to the new interfaces after FLIP-27 is finished.
   - Write: reuse the current batch sink and the DataStream
     StreamingFileSink.
   - What formats should do:
     - Data format: formats should use BaseRow; after FLIP-95, BaseRow will
       be the source/sink data format.
     - Read: the format should provide a FileInputFormat with
       partition-field support.
     - Write: the format should provide a BulkWriter.Factory or an Encoder
       for the unified sink implementation (currently the StreamingFileSink).
   - We plan to implement CSV, JSON, Parquet, and ORC first.

This FLIP describes the basic user interface, and we have reached a
consensus on the implementation. Considering there is a lot of work to be
done, I'd like to start a voting thread for this. What do you think?

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Best,
Jingsong Lee

On Thu, Mar 19, 2020 at 3:48 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi,
>
> I am very interested in this topic and would like to join the offline
> discussion if possible. You have already given many inputs and concerns;
> I would like to share some of my thoughts. Correct me if I misunderstand
> you.
>
> Flink is a unified engine, so Flink should provide e2e exactly-once
> semantics for the user in both streaming and batch. E2E exactly-once is
> not a trivial thing.
>
> StreamingFileSink already does a lot of work on how to support e2e
> exactly-once semantics for the "file" output scenario. It provides a
> layered architecture:
>
> 1. BulkWriter/Encoder deals with the data format in a file.
> 2. BucketAssigner/RollingPolicy deals with the lifecycle of files and
>    the directory structure.
> 3. RecoverableWriter deals with the exactly-once semantics.
>
> All these layers are orthogonal and can be combined with each other,
> which could reduce much of the work. (Currently, there are some
> limitations.) There are some already-known issues, such as how to support
> batch in the StreamingFileSink, how to support ORC, and so on. But there
> are already discussions on how to resolve these issues.
>
> Jingsong also gave some new cases that the StreamingFileSink might not
> support currently. I am very glad to see that you all agree on improving
> the StreamingFileSink architecture for these new cases.
>
> Best,
> Guowei
>
>
> Jingsong Li <jingsongl...@gmail.com> wrote on Thu, Mar 19, 2020 at 12:19 AM:
>
> > Hi Stephan & Kostas & Piotrek, thanks for these inputs.
> >
> > Maybe what I expressed was not clear. For the implementation, I want
> > to know what you think, rather than insisting on building another
> > implementation from scratch. Piotrek, you are right that the
> > implementation is part of this FLIP too: because we cannot list every
> > detail in the FLIP, the implementation does affect users' behavior.
> > The maintenance/development costs are also relevant points.
> >
> > I think you have already persuaded me. I am thinking about basing this
> > on StreamingFileSink, and extending StreamingFileSink can solve the
> > "partition commit" requirement; I have tried this in my POC.
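> >
> > To make that concrete, a sketch of the hook from my POC (the names are
> > hypothetical, not a final API; FileSystem and Path are the ones in
> > org.apache.flink.core.fs):
> >
> >     // Hypothetical callback, invoked once a partition has finished
> >     // writing (e.g. when the watermark has passed the partition's time).
> >     public interface PartitionCommitPolicy {
> >         void commit(String partition, Path partitionPath) throws Exception;
> >     }
> >
> >     // Example policy: write a _SUCCESS marker file so that downstream
> >     // applications know the partition is ready to be read.
> >     public class SuccessFileCommitPolicy implements PartitionCommitPolicy {
> >         @Override
> >         public void commit(String partition, Path partitionPath) throws Exception {
> >             FileSystem fs = partitionPath.getFileSystem();
> >             fs.create(new Path(partitionPath, "_SUCCESS"),
> >                     FileSystem.WriteMode.OVERWRITE).close();
> >         }
> >     }
> >
> > Updating the metastore would just be another implementation of the
> > same interface.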
> > And it is true, the RecoverableWriter aspects and the S3 aspects are
> > also important. So I listed "what is missing" for StreamingFileSink in
> > my previous mail. (It was not a defense for building another
> > implementation from scratch.)
> >
> > Hi Stephan,
> >
> > >> The FLIP is "Filesystem connector in Table", it's about building up
> > >> Flink Table's capabilities.
> >
> > What I mean is that this is not just for Hive; this FLIP is for Table.
> > So we don't need to do everything for Hive. But Hive is also a
> > "format" (or something like it) of the filesystem connector, and its
> > requirements can be considered.
> >
> > I think you are right about the design, and let me take this
> > seriously: a unified way makes us stronger, with less confusion, fewer
> > surprises, and a more rigorous design. And I am pretty sure the Table
> > work is good for enhancing the DataStream API too.
> >
> > Hi Kostas,
> >
> > Yes, Parquet and ORC are the main formats. Good to know you support
> > them~
> >
> > I think the streaming file sink and the filesystem connector are
> > important to Flink, and it is a good time to think about these common
> > requirements and about batch support. It is not just about Table; it
> > is for the whole of Flink too. If there are requirements that are hard
> > to support or that violate the existing design, let's exclude them.
> >
> > Best,
> > Jingsong Lee
> >
> >
> > On Wed, Mar 18, 2020 at 10:31 PM Piotr Nowojski <pi...@ververica.com>
> > wrote:
> >
> > > Hi Kurt,
> > >
> > > +1 for having some offline discussion on this topic.
> > >
> > > But I think the question of using the StreamingFileSink versus
> > > implementing a subset of its features from scratch is quite a
> > > fundamental design decision, with impact on the behaviour of the
> > > public API, so I wouldn't discard it as a technical detail; it
> > > should be included as part of the FLIP (I know it could be argued in
> > > the opposite direction).
> > >
> > > Piotrek
> > >
> > > > On 18 Mar 2020, at 13:55, Kurt Young <ykt...@gmail.com> wrote:
> > > >
> > > > Hi all,
> > > >
> > > > Thanks for the discussion and feedback. I think this FLIP doesn't
> > > > imply the implementation of such a connector yet; it only
> > > > describes the functionality and the expected behaviors from the
> > > > user's perspective. Reusing the current StreamingFileSink is
> > > > definitely one of the possible ways to implement it. Since there
> > > > are lots of details, I would suggest we have an offline meeting to
> > > > discuss how these could be achieved by extending
> > > > StreamingFileSink, and how much effort we need to put into it.
> > > > What do you think?
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Wed, Mar 18, 2020 at 7:21 PM Kostas Kloudas <kklou...@gmail.com>
> > > > wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> I also agree with Stephan on this!
> > > >>
> > > >> It has been more than a year now that most of our efforts have
> > > >> had "unify"/"unification" either in their title or at their core,
> > > >> and this has been the focus of all our resources. By deviating
> > > >> from this now, we only put more stress on other teams in the
> > > >> future. When users start using a given API, with high probability
> > > >> they will ask for (and it is totally reasonable) consistent
> > > >> behaviour from all the other APIs that ship with Flink. This will
> > > >> eventually lead to having to answer the questions that we now
> > > >> deem difficult in a future release, when we will have to "unify"
> > > >> again.
> > > >>
> > > >> In addition, Hive integration is definitely a "nice to have"
> > > >> feature, but that does not mean we need to push for 100%
> > > >> compatibility if it is not required.
> > > >> @Jingsong Li if you think that Parquet and ORC are the main
> > > >> formats, we can focus on these and provide good support for them
> > > >> (both reading and writing). For maintainability, given the amount
> > > >> of demand for these formats, I think it is not going to be a huge
> > > >> problem, at least for now.
> > > >>
> > > >> Given the above, I am also leaning towards a solution that aims
> > > >> at extending the StreamingFileSink to efficiently support bulk
> > > >> formats like Parquet and ORC, rather than creating a new sink
> > > >> that locks Hive-dependent use cases into a specific API.
> > > >>
> > > >> Cheers,
> > > >> Kostas
> > > >>
> > > >> On Wed, Mar 18, 2020 at 12:03 PM Stephan Ewen <se...@apache.org>
> > > >> wrote:
> > > >>>
> > > >>>>> The FLIP is "Filesystem connector in Table", it's about
> > > >>>>> building up Flink Table's capabilities.
> > > >>>
> > > >>> That is exactly what worries me. The whole effort is not
> > > >>> thinking about Flink as a whole any more. This proposal is not
> > > >>> trying to build a consistent user experience across batch and
> > > >>> streaming, across Table API, SQL, and DataStream.
> > > >>>
> > > >>> The proposal is building a separate, disconnected ecosystem for
> > > >>> the Table API, specific to batch processing and some limited
> > > >>> streaming setups, and specific to one type of environment (Hive
> > > >>> and HDFS). It willingly omits support for other environments and
> > > >>> conflicts with efforts in other components to unify.
> > > >>>
> > > >>> Supporting common use cases is good, but in my opinion not at
> > > >>> the price of creating a "fractured" project where the approaches
> > > >>> in different layers no longer fit together.
> > > >>>
> > > >>>
> > > >>>> *## StreamingFileSink does not support writers that take a path*
> > > >>>>
> > > >>>> The FLIP is "Filesystem connector in Table"; it's about
> > > >>>> building up Flink Table's capabilities. But I think Hive is
> > > >>>> important: I see that most users use Flink and Spark to write
> > > >>>> data from Kafka to Hive, and for streaming writing these two
> > > >>>> engines are convenient and popular. I mean, Flink is not only a
> > > >>>> Hive runtime, but also an important part of the offline data
> > > >>>> warehouse.
> > > >>>> The thing is, StreamingFileSink does not support Hadoop record
> > > >>>> writers. Yes, we can support them one by one; I see the
> > > >>>> community integrating ORC [1]. But it's really not an easy
> > > >>>> thing, and we have to be careful to maintain compatibility.
> > > >>>> After all, downstream users analyze the data with other
> > > >>>> computing engines.
> > > >>>> Yes, exposing "RecoverableFsDataOutputStream" to writers is
> > > >>>> good for subsequent optimization [2]. But in many cases it is
> > > >>>> enough for users to generate new files at each checkpoint; they
> > > >>>> pay more attention to whether they can do it at all and whether
> > > >>>> there is a compatibility risk. Therefore, RecordWriter is used
> > > >>>> here.
> > > >>>>
> > > >>>> *## External HDFS access*
> > > >>>>
> > > >>>> This includes the Hadoop configuration and Kerberos-related
> > > >>>> things.
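> > > >>>>
> > > >>>> For example, a sketch with the standard Hadoop
> > > >>>> UserGroupInformation API (the principal, the keytab path, and
> > > >>>> writePartitionFiles() are placeholders):
> > > >>>>
> > > >>>>     import java.security.PrivilegedExceptionAction;
> > > >>>>     import org.apache.hadoop.security.UserGroupInformation;
> > > >>>>
> > > >>>>     UserGroupInformation ugi =
> > > >>>>         UserGroupInformation.loginUserFromKeytabAndReturnUGI(
> > > >>>>             "hive/_HOST@EXAMPLE.COM", "/path/to/hive.keytab");
> > > >>>>     ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
> > > >>>>         // reads/writes against the external HDFS happen here,
> > > >>>>         // with that user's credentials and Hadoop configuration
> > > >>>>         writePartitionFiles();
> > > >>>>         return null;
> > > >>>>     });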
> > > >>>>
> > > >>>> *## Partition commit*
> > > >>>>
> > > >>>> Committing a partition means notifying the downstream
> > > >>>> application that the partition has finished writing and is
> > > >>>> ready to be read. The common way is to add a success file or to
> > > >>>> update the metastore; of course, there are other ways to
> > > >>>> notify. We need to provide flexible mechanisms.
> > > >>>> As you mentioned, yes, we can extend "StreamingFileSink" for
> > > >>>> this part.
> > > >>>>
> > > >>>> *## Batch / Streaming Unification*
> > > >>>>
> > > >>>> Yes, it is about exactly-once and a single commit at the end.
> > > >>>> There are also some "bounded" differences. For example, batch
> > > >>>> can support sorting: you can sort by partition, which reduces
> > > >>>> the number of writers open at the same time. Dynamic partition
> > > >>>> writing in batch may produce many unordered partitions.
> > > >>>>
> > > >>>> [1] https://issues.apache.org/jira/browse/FLINK-10114
> > > >>>> [2] https://issues.apache.org/jira/browse/FLINK-11499
> > > >>>>
> > > >>>> Best,
> > > >>>> Jingsong Lee
> > > >>>>
> > > >>>> On Tue, Mar 17, 2020 at 8:00 PM LakeShen <shenleifight...@gmail.com>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Hi Jingsong,
> > > >>>>>
> > > >>>>> I am looking forward to this feature, because some streaming
> > > >>>>> applications need to transfer their messages to HDFS for
> > > >>>>> offline analysis.
> > > >>>>>
> > > >>>>> Best wishes,
> > > >>>>> LakeShen
> > > >>>>>
> > > >>>>> Stephan Ewen <se...@apache.org> wrote on Tue, Mar 17, 2020 at 7:42 PM:
> > > >>>>>
> > > >>>>>> I would really like to see us converge the stack and the
> > > >>>>>> functionality here, meaning that we try to use the same sinks
> > > >>>>>> in the Table API as in the DataStream API, and the same sink
> > > >>>>>> for batch and streaming.
> > > >>>>>>
> > > >>>>>> The StreamingFileSink has a lot of things that can help with
> > > >>>>>> that. If possible, it would be nice to extend it (which would
> > > >>>>>> help move towards the above goal) rather than build a second
> > > >>>>>> sink. Building a second sink leads us further away from
> > > >>>>>> unification.
> > > >>>>>>
> > > >>>>>> I am a bit puzzled by the statement that the sinks are
> > > >>>>>> primarily for Hive. The Table API should not be coupled to
> > > >>>>>> Hive; it should be an independent batch/streaming API for
> > > >>>>>> many use cases, supporting batch and streaming interplay very
> > > >>>>>> well. Supporting Hive is great, but we should not be building
> > > >>>>>> this towards Hive, as just yet another Hive runtime. Why "yet
> > > >>>>>> another Hive runtime" when what we have is a unique streaming
> > > >>>>>> engine that can do much more? We would drop our own strength
> > > >>>>>> and reduce ourselves to a limited subset.
> > > >>>>>>
> > > >>>>>> Let's build a file sink that can also support Hive, but can
> > > >>>>>> do so much more. For example, efficient streaming file
> > > >>>>>> ingestion as materialized views from changelogs.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *## Writing Files in Streaming*
> > > >>>>>>
> > > >>>>>> To write files in streaming, I don't see another way than
> > > >>>>>> using the streaming file sink.
> > > >>>>>> If you want to write files across checkpoints, support
> > > >>>>>> exactly-once, and support a consistent "stop with savepoint",
> > > >>>>>> it is not trivial.
> > > >>>>>>
> > > >>>>>> A part of the complexity comes from the fact that not all
> > > >>>>>> targets are actually file systems, and not all have simple
> > > >>>>>> semantics for persistence. S3, for example, does not support
> > > >>>>>> renames (only copies, which may take a lot of time), and it
> > > >>>>>> does not support flush/sync of data (the S3 file system in
> > > >>>>>> Hadoop exposes that, but it does not work: flush/sync
> > > >>>>>> followed by a failure leads to data loss). You need to devise
> > > >>>>>> a separate protocol for that, which is exactly what has
> > > >>>>>> already been done and abstracted behind the recoverable
> > > >>>>>> writers.
> > > >>>>>>
> > > >>>>>> If you re-engineer that in the Table API, you will end up
> > > >>>>>> either missing many things (intermediate persistence on
> > > >>>>>> different file systems, atomic commit in the absence of
> > > >>>>>> renames, etc.), or you will end up doing something similar to
> > > >>>>>> what the recoverable writers do.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *## Atomic Commit in Batch*
> > > >>>>>>
> > > >>>>>> For batch sinks, it is also desirable to write the data first
> > > >>>>>> and then atomically commit it once the job is done.
> > > >>>>>> Hadoop has spent a lot of time making this work; see the doc
> > > >>>>>> here, specifically the section on 'The "Magic" Committer' [1].
> > > >>>>>>
> > > >>>>>> What Flink has built in the RecoverableWriter is in some ways
> > > >>>>>> an even better version of this, because it works without
> > > >>>>>> extra files (we pass data through checkpoint state) and it
> > > >>>>>> supports not only committing once at the end, but also
> > > >>>>>> committing intermediate parts multiple times during
> > > >>>>>> checkpoints.
> > > >>>>>>
> > > >>>>>> This means that using the recoverable writer mechanism in
> > > >>>>>> batch would immediately give us efficient atomic commit
> > > >>>>>> implementations on file://, hdfs://, and s3://, with a
> > > >>>>>> well-defined way to implement it for other file systems as
> > > >>>>>> well.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *## Batch / Streaming Unification*
> > > >>>>>>
> > > >>>>>> It would be great to start looking at these things in the
> > > >>>>>> same way:
> > > >>>>>>   - streaming (exactly-once): commit files (once they are
> > > >>>>>>     finished) at the next checkpoint
> > > >>>>>>   - batch: a single commit at the end of the job
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *## DataStream / Table API Stack Unification*
> > > >>>>>>
> > > >>>>>> Having the same set of capabilities would make it much easier
> > > >>>>>> for users to understand the system, especially when it comes
> > > >>>>>> to consistent behavior across external systems. Having a
> > > >>>>>> different file sink in the Table API and the DataStream API
> > > >>>>>> means that DataStream can write correctly to S3 while the
> > > >>>>>> Table API cannot.
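> > > >>>>>>
> > > >>>>>> (For reference, this is the protocol behind the recoverable
> > > >>>>>> writers that the DataStream sink gets "for free", sketched
> > > >>>>>> with the interfaces in org.apache.flink.core.fs; error
> > > >>>>>> handling and the recovery path are omitted:)
> > > >>>>>>
> > > >>>>>>     FileSystem fs = path.getFileSystem();
> > > >>>>>>     RecoverableWriter writer = fs.createRecoverableWriter();
> > > >>>>>>     RecoverableFsDataOutputStream out = writer.open(path);
> > > >>>>>>     out.write(data); // data: the part file's bytes
> > > >>>>>>     // at a checkpoint: persist what was written so far,
> > > >>>>>>     // resumable after a failure
> > > >>>>>>     RecoverableWriter.ResumeRecoverable resumable = out.persist();
> > > >>>>>>     // on commit (checkpoint complete, or end of a batch job):
> > > >>>>>>     // atomically publish the file
> > > >>>>>>     RecoverableFsDataOutputStream.Committer committer =
> > > >>>>>>             out.closeForCommit();
> > > >>>>>>     committer.commit();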
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *## What is missing?*
> > > >>>>>>
> > > >>>>>> It seems there are some things that get in the way of
> > > >>>>>> naturally reusing the StreamingFileSink here. Can you make a
> > > >>>>>> list of which features are missing in the StreamingFileSink
> > > >>>>>> that would make it usable for the use cases you have in mind?
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Stephan
> > > >>>>>>
> > > >>>>>> [1] https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committer_architecture.html
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Mon, Mar 16, 2020 at 12:31 PM Jingsong Li <jingsongl...@gmail.com>
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Piotr,
> > > >>>>>>>
> > > >>>>>>> I am quite torn.
> > > >>>>>>>
> > > >>>>>>> Let me re-list the Table streaming sink requirements:
> > > >>>>>>> - In Table, maybe 90% of the sinks are for Hive, and Parquet
> > > >>>>>>>   and ORC are the most important formats. Hive provides
> > > >>>>>>>   RecordWriters; using them, it is easy to support all Hive
> > > >>>>>>>   formats, and we don't need to worry about Hive version
> > > >>>>>>>   compatibility either. But they cannot work with
> > > >>>>>>>   FSDataOutputStream.
> > > >>>>>>> - A Hive table may use an external HDFS, which means Hive
> > > >>>>>>>   has its own Hadoop configuration.
> > > >>>>>>> - In Table, partition commit is needed. We cannot just move
> > > >>>>>>>   files; updating the catalog is important for complete
> > > >>>>>>>   table semantics.
> > > >>>>>>>
> > > >>>>>>> You are right that the DataStream and Table streaming sinks
> > > >>>>>>> would not be fully compatible, each with its own set of
> > > >>>>>>> limitations, quirks, and features. But if we reuse
> > > >>>>>>> DataStream, batch and streaming will also not be fully
> > > >>>>>>> compatible. Providing a unified experience across batch and
> > > >>>>>>> streaming is important too.
> > > >>>>>>>
> > > >>>>>>> Table and DataStream have different concerns, and they tilt
> > > >>>>>>> in different directions.
> > > >>>>>>>
> > > >>>>>>> Of course, it would be very good to see a unified
> > > >>>>>>> implementation that solves the batch sink and the Hive
> > > >>>>>>> requirements, unifying the DataStream batch sink, the
> > > >>>>>>> DataStream streaming sink, the Table batch sink, and the
> > > >>>>>>> Table streaming sink.
> > > >>>>>>>
> > > >>>>>>> Let's see what others think.
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Jingsong Lee
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Mon, Mar 16, 2020 at 4:15 PM Piotr Nowojski <pi...@ververica.com>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi Jingsong,
> > > >>>>>>>>
> > > >>>>>>>>> The first way is reusing the batch sink in FLINK-14254; it
> > > >>>>>>>>> has handled the partition and metastore logic well.
> > > >>>>>>>>> - unify batch and streaming
> > > >>>>>>>>> - Using FileOutputFormat is consistent with FileInputFormat.
> > > >>>>>>>>> - Add the exactly-once related logic. Just 200+ lines of code.
> > > >>>>>>>>> - It's natural to support more table features, like
> > > >>>>>>>>>   partition commit, auto compaction, etc.
> > > >>>>>>>>>
> > > >>>>>>>>> The second way is reusing the DataStream StreamingFileSink:
> > > >>>>>>>>> - unify the streaming sink between Table and DataStream.
> > > >>>>>>>>> - It may be hard to introduce table-related features into
> > > >>>>>>>>>   StreamingFileSink.
> > > >>>>>>>>>
> > > >>>>>>>>> I prefer the first way a little. What do you think?
> > > >>>>>>>>
> > > >>>>>>>> I would be surprised if adding the "exactly-once related
> > > >>>>>>>> logic" were just 200 lines of code. There are things like
> > > >>>>>>>> multi-part file uploads to S3, and there are also some
> > > >>>>>>>> pending features like [1]. I would suggest asking/involving
> > > >>>>>>>> Klou in this discussion.
> > > >>>>>>>>
> > > >>>>>>>> If it were that easy to support exactly-once streaming with
> > > >>>>>>>> the current batch sink, that would beg the question: why do
> > > >>>>>>>> we need to maintain the StreamingFileSink at all?
> > > >>>>>>>>
> > > >>>>>>>> The worst possible outcome from my perspective would be yet
> > > >>>>>>>> another example of an operator/logic implemented
> > > >>>>>>>> independently in both the DataStream API and the Table API,
> > > >>>>>>>> because I'm pretty sure the two would not be fully
> > > >>>>>>>> compatible, each with its own set of limitations, quirks,
> > > >>>>>>>> and features. Especially since unifying such operators is
> > > >>>>>>>> on our long-term roadmap and wish list.
> > > >>>>>>>>
> > > >>>>>>>> Piotrek
> > > >>>>>>>>
> > > >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11499
> > > >>>>>>>>
> > > >>>>>>>>> On 16 Mar 2020, at 06:55, Jingsong Li <jingsongl...@gmail.com> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks Jinhai for joining the discussion.
> > > >>>>>>>>>
> > > >>>>>>>>>> we need add 'connector.sink.username' for
> > > >>>>>>>>>> UserGroupInformation when data is written to HDFS
> > > >>>>>>>>>
> > > >>>>>>>>> Yes, I am not an expert on HDFS, but it seems we need to
> > > >>>>>>>>> do this "doAs" in the code to access an external HDFS. I
> > > >>>>>>>>> will update the document.
> > > >>>>>>>>>
> > > >>>>>>>>> Best,
> > > >>>>>>>>> Jingsong Lee
> > > >>>>>>>>>
> > > >>>>>>>>> On Mon, Mar 16, 2020 at 12:01 PM Jingsong Li <jingsongl...@gmail.com>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Thanks Piotr and Yun for joining the discussion.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Hi Piotr and Yun, regarding the implementation:
> > > >>>>>>>>>>
> > > >>>>>>>>>> FLINK-14254 [1] introduces the batch sink into the table
> > > >>>>>>>>>> world. It deals with the partition logic, the metastore
> > > >>>>>>>>>> logic, etc., and it just reuses the DataSet/DataStream
> > > >>>>>>>>>> FileInputFormat and FileOutputFormat. The filesystem
> > > >>>>>>>>>> connector cannot do without FileInputFormat, because it
> > > >>>>>>>>>> needs to deal with files and splits: formats like ORC and
> > > >>>>>>>>>> Parquet need to read whole files and have their own split
> > > >>>>>>>>>> logic.
> > > >>>>>>>>>>
> > > >>>>>>>>>> So, back to the filesystem connector:
> > > >>>>>>>>>> - It needs to introduce FilesystemTableFactory,
> > > >>>>>>>>>>   FilesystemTableSource, and FilesystemTableSink.
> > > >>>>>>>>>> - For sources, we reuse the DataSet/DataStream
> > > >>>>>>>>>>   FileInputFormats; there is no other interface for file
> > > >>>>>>>>>>   reading.
> > > >>>>>>>>>>
> > > >>>>>>>>>> For file sinks:
> > > >>>>>>>>>> - The batch sink uses FLINK-14254.
> > > >>>>>>>>>> - The streaming sink has two possible ways.
> > > >>>>>>>>>>
> > > >>>>>>>>>> The first way is reusing the batch sink in FLINK-14254;
> > > >>>>>>>>>> it has handled the partition and metastore logic well.
> > > >>>>>>>>>> - unify batch and streaming
> > > >>>>>>>>>> - Using FileOutputFormat is consistent with FileInputFormat.
> > > >>>>>>>>>> - Add the exactly-once related logic. Just 200+ lines of code.
> > > >>>>>>>>>> - It's natural to support more table features, like
> > > >>>>>>>>>>   partition commit, auto compaction, etc.
> > > >>>>>>>>>>
> > > >>>>>>>>>> The second way is reusing the DataStream StreamingFileSink:
> > > >>>>>>>>>> - unify the streaming sink between Table and DataStream.
> > > >>>>>>>>>> - It may be hard to introduce table-related features into
> > > >>>>>>>>>>   StreamingFileSink.
> > > >>>>>>>>>>
> > > >>>>>>>>>> I prefer the first way a little. What do you think?
> > > >>>>>>>>>>
> > > >>>>>>>>>> Hi Yun,
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Watermark mechanism might not be enough.
> > > >>>>>>>>>>
> > > >>>>>>>>>> The watermarks of the subtasks are the same in
> > > >>>>>>>>>> "snapshotState".
> > > >>>>>>>>>>
> > > >>>>>>>>>>> we might need to also do some coordination between
> > > >>>>>>>>>>> subtasks.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Yes, the JobMaster is the role that coordinates the
> > > >>>>>>>>>> subtasks. The metastore is a very fragile single point,
> > > >>>>>>>>>> which cannot be accessed in a distributed fashion, so it
> > > >>>>>>>>>> is accessed uniformly by the JobMaster.
> > > >>>>>>>>>>
> > > >>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-14254
> > > >>>>>>>>>>
> > > >>>>>>>>>> Best,
> > > >>>>>>>>>> Jingsong Lee
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Fri, Mar 13, 2020 at 6:43 PM Yun Gao <yungao...@aliyun.com>
> > > >>>>>>>>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Hi,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Many thanks to Jingsong for bringing up this discussion!
> > > >>>>>>>>>>> Enhancing the FileSystem connector in Table should
> > > >>>>>>>>>>> largely improve usability.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> I have the same question as Piotr. From my side, I think
> > > >>>>>>>>>>> it would be better to reuse the existing
> > > >>>>>>>>>>> StreamingFileSink. We have already begun enhancing the
> > > >>>>>>>>>>> supported file formats (e.g., ORC, Avro...), and reusing
> > > >>>>>>>>>>> StreamingFileSink should avoid repeating that work in
> > > >>>>>>>>>>> the Table library. Besides, the bucket concept also
> > > >>>>>>>>>>> seems to match the semantics of a partition.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> For the notification of adding partitions, I wonder
> > > >>>>>>>>>>> whether the watermark mechanism is enough, since a
> > > >>>>>>>>>>> bucket/partition might span multiple subtasks. It
> > > >>>>>>>>>>> depends on the level of notification: if we want to
> > > >>>>>>>>>>> notify per bucket on each subtask, using the watermark
> > > >>>>>>>>>>> to notify each subtask should be OK; but if we want to
> > > >>>>>>>>>>> notify for the whole bucket/partition, we might also
> > > >>>>>>>>>>> need some coordination between the subtasks.
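> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Just to make the per-subtask variant concrete, a tiny
> > > >>>>>>>>>>> sketch (all names here are hypothetical, only to
> > > >>>>>>>>>>> illustrate the idea):
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>     // inside the sink's snapshotState(): each subtask
> > > >>>>>>>>>>>     // checks which of its open partitions the current
> > > >>>>>>>>>>>     // watermark has passed, and queues them to be
> > > >>>>>>>>>>>     // committed once the checkpoint completes
> > > >>>>>>>>>>>     for (String partition : openPartitions) {
> > > >>>>>>>>>>>         if (extractPartitionTime(partition) <= currentWatermark) {
> > > >>>>>>>>>>>             pendingCommitPartitions.add(partition);
> > > >>>>>>>>>>>         }
> > > >>>>>>>>>>>     }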
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Best,
> > > >>>>>>>>>>> Yun
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> ------------------------------------------------------------------
> > > >>>>>>>>>>> From: Piotr Nowojski <pi...@ververica.com>
> > > >>>>>>>>>>> Send Time: Fri, Mar 13, 2020, 18:03
> > > >>>>>>>>>>> To: dev <dev@flink.apache.org>
> > > >>>>>>>>>>> Cc: user <u...@flink.apache.org>; user-zh <user...@flink.apache.org>
> > > >>>>>>>>>>> Subject: Re: [DISCUSS] FLIP-115: Filesystem connector in Table
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Hi,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Which actual sinks/sources are you planning to use for
> > > >>>>>>>>>>> this feature? Is it about exposing the StreamingFileSink
> > > >>>>>>>>>>> in the Table API? Or do you want to implement new
> > > >>>>>>>>>>> sinks/sources?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Piotrek
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> On 13 Mar 2020, at 10:04, jinhai wang <jinhai...@gmail.com> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks for FLIP-115. It is a really useful feature for
> > > >>>>>>>>>>>> platform developers who manage hundreds of
> > > >>>>>>>>>>>> Flink-to-Hive jobs in production.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I think we need to add 'connector.sink.username' for
> > > >>>>>>>>>>>> UserGroupInformation when data is written to HDFS.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On 2020/3/13 at 3:33 PM, "Jingsong Li" <jingsongl...@gmail.com> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Hi everyone,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I'd like to start a discussion about FLIP-115:
> > > >>>>>>>>>>>> Filesystem connector in Table [1]. This FLIP will bring:
> > > >>>>>>>>>>>> - Introduce a filesystem table factory in Table,
> > > >>>>>>>>>>>>   supporting CSV/Parquet/ORC/JSON/Avro formats.
> > > >>>>>>>>>>>> - Introduce a streaming filesystem/Hive sink in Table.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> CC'ing the user mailing list: if you have any unmet
> > > >>>>>>>>>>>> needs, please feel free to reply~
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Look forward to hearing from you.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Best,
> > > >>>>>>>>>>>> Jingsong Lee
> > > >>>>>>>>>>
> > > >>>>>>>>>> --
> > > >>>>>>>>>> Best, Jingsong Lee
> > > >>>>>>>>>
> > > >>>>>>>>> --
> > > >>>>>>>>> Best, Jingsong Lee
> > > >>>>>>>
> > > >>>>>>> --
> > > >>>>>>> Best, Jingsong Lee
> > > >>>>
> > > >>>> --
> > > >>>> Best, Jingsong Lee
> >
> > --
> > Best, Jingsong Lee

--
Best, Jingsong Lee