Hi all,

After some great discussion, I think we have at least reached a consensus
that we can have a unified sink to handle streaming, batch, hive and HDFS.
And for FileSystem connector, undoubtedly, we reuse DataStream
StreamingFileSink.

I updated the FLIP:

1.Move external HDFS and close exactly-once to future plan.

2. Provide Implementation details:
- Introduce FileSystemTableFactory, FileSystemTableSource,
FileSystemTableSink to provide table related implementations, this should
not be blocked by FLIP-95[1], but will migrate to new interfaces after
FLIP-95 finished.
- Read: Reusing FileInputFormat, this should not be blocked by FLIP-27[2],
but will migrate to new interfaces after FLIP-27 finished.
- Write: Reusing current batch sink and DataStream StreamingFileSink
- Formats should do:
    - Data format: Format should use BaseRow, after FLIP-95, we use BaseRow
to be source/sink data format.
    - Read: Format should provide FileInputFormat with partition fields
support.
    - Write: Format should provide BulkWriter.Factory or Encoder for unify
sink implementation (Now for StreamingFileSink).
    - We plan to implement CSV, JSON, PARQUET, ORC first.

This FLIP describes the basic user interface, and we have reached a
consensus on implementation. Consider there is a lot of work to be done,
I'd like start a voting thread for this. What do you think?

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Best,
Jingsong Lee

On Thu, Mar 19, 2020 at 3:48 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi,
>
>
> I am very interested in the topic. I would like to join the offline
> discussion if possible. I think you guys already give many inputs and
> concerns. I would share some of my thoughts. Correct me if I misunderstand
> you.
>
> Flink is a unified engine. Since that Flink should provide the e2e
> exactly-once semantics for the user in both streaming and batch. E2E
> exactly-once is not a trivial thing.
>
> StreamingFileSink already does a lot of work on how to support e2e
> exactly-once semantics for the “file” output scenario. It provides a
> layered architecture
>
>    1.
>
>    BulkWriter/Encode deals with the data format in a file.
>    2.
>
>    BucketAssinger/RollingPolicy deals with the lifecycle of the file and
>    directory structure.
>    3.
>
>    RecoverableWriter deals with the exactly-once semantics.
>
>
> All these layers are orthogonal and could be combined with each other. This
> could reduce much of the work. (Currently, there are some limitations.)
> There are some already known issues such as how to support batch in the
> StreamingFileSink, How to support orc and so on. But there are already some
> discussions on how to resolve these issues.
>
> Jinsong also gives some new cases that the StreamingFileSink might not
> support currently.  I am very glad to see that you all agree that improving
> the StreamingFileSink architecture for these new cases.
>
> Best,
> Guowei
>
>
> Jingsong Li <jingsongl...@gmail.com> 于2020年3月19日周四 上午12:19写道:
>
> > Hi Stephan & Kostas & Piotrek, thanks for these inputs,
> >
> > Maybe what I expressed is not clear. For the implementation, I want to
> know
> > what you think, rather than must making another set from scratch. Piotrek
> > you are right, implementation is the part of this FLIP too, because we
> can
> > not list all detail things in the FLIP, so the implementation do affect
> > user's behaviors. And the maintenance / development costs are also
> points.
> >
> > I think you already persuaded me. I am thinking about based on
> > StreamingFileSink. And extending StreamingFileSink can solve "partition
> > commit" requirement, I have tried in my POC. And it is true, Recoverable
> > things and S3 things also important.
> > So I listed "What is missing" for StreamingFileSink in previous mail. (It
> > is not defense for making another set from scratch)
> >
> > Hi Stephan,
> >
> > >> The FLIP is "Filesystem connector in Table", it's about building up
> > Flink
> > Table's capabilities.
> >
> > What I mean is this is not just for Hive, this FLIP is for table. So we
> > don't need do all things for Hive. But Hive is also a "format" (or
> > something else) of Filesystem connector. Its requirements can be
> > considered.
> >
> > I think you are right about the design, and let me take this seriously, a
> > unify way make us stronger, less confuse, less surprise, more rigorous
> > design. And I am pretty sure table things are good for enhancing
> DataStream
> > api too.
> >
> > Hi Kostas,
> >
> > Yes, Parquet and Orc are the main formats. Good to know your support~
> >
> > I think streaming file sink and file system connector things are
> important
> > to Flink, it is good&time to think about these common requirements, think
> > about batch support, it is not just about table, it is for whole Flink
> too.
> > If there are some requirements that is hard to support or violates
> existing
> > design. Exclude them.
> >
> > Best,
> > Jingsong Lee
> >
> >
> > On Wed, Mar 18, 2020 at 10:31 PM Piotr Nowojski <pi...@ververica.com>
> > wrote:
> >
> > > Hi Kurt,
> > >
> > > +1 for having some offline discussion on this topic.
> > >
> > > But I think the question about using StreamingFileSink or implementing
> > > subset of it’s feature from scratch is quite fundamental design
> decision,
> > > with impact on the behaviour of Public API, so I wouldn’t discard it as
> > > technical detail and should be included as part of the FLIP (I know It
> > > could be argued in opposite direction).
> > >
> > > Piotrek
> > >
> > > > On 18 Mar 2020, at 13:55, Kurt Young <ykt...@gmail.com> wrote:
> > > >
> > > > Hi all,
> > > >
> > > > Thanks for the discuss and feedbacks. I think this FLIP doesn't imply
> > the
> > > > implementation
> > > > of such connector yet, it only describes the functionality and
> expected
> > > > behaviors from user's
> > > > perspective. Reusing current StreamingFileSink is definitely one of
> the
> > > > possible ways to
> > > > implement it. Since there are lots of details and I would suggest we
> > can
> > > > have an offline meeting
> > > > to discuss the how these could be achieved by extending
> > StremingFileSink,
> > > > and how much
> > > > effort we need to put on it. What do you think?
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Wed, Mar 18, 2020 at 7:21 PM Kostas Kloudas <kklou...@gmail.com>
> > > wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> I also agree with Stephan on this!
> > > >>
> > > >> It has been more than a year now that most of our efforts have had
> the
> > > >> "unify" / "unification"/ etc either on their title or in their core
> > > >> and this has been the focus of all our resources. By deviating from
> > > >> this now, we only put more stress on other teams in the future. When
> > > >> the users start using a given API, with high probability, they will
> > > >> ask (and it is totally reasonable) consistent behaviour from all the
> > > >> other APIs that ship with Flink. This will eventually lead to having
> > > >> to answer the questions that we now deem as difficult in a future
> > > >> release, when we will have to "unify" again.
> > > >>
> > > >> In addition, Hive integration is definitely a "nice to have" feature
> > > >> but it does not mean that we need to push for 100% compatibility if
> it
> > > >> is not required.
> > > >> @Jingsong Li if you think that Parquet and Orc are the main formats,
> > > >> we can focus on these and provide good support for them (both
> reading
> > > >> and writing). For maintainability, I think that given the amount of
> > > >> demand for these formats, it is not going to be a huge problem at
> > > >> least for now.
> > > >>
> > > >> Given the above, I am also leaning towards a solution that aims at
> > > >> extending the StreamingFileSink to efficiently support bulk formats
> > > >> like Parquet and Orc, rather than creating a new sink that locks
> > > >> Hive-dependent usecases to a specific API.
> > > >>
> > > >> Cheers,
> > > >> Kostas
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On Wed, Mar 18, 2020 at 12:03 PM Stephan Ewen <se...@apache.org>
> > wrote:
> > > >>>
> > > >>>>> The FLIP is "Filesystem connector in Table", it's about building
> up
> > > >>> Flink Table's capabilities.
> > > >>>
> > > >>> That is exactly what worries me. The whole effort is not thinking
> > about
> > > >>> Flink as a whole any more.
> > > >>> This proposal is not trying to build a consistent user experience
> > > across
> > > >>> batch and streaming, across Table API, SQL, and DataStream.
> > > >>>
> > > >>> The proposal is building a separate, disconnected ecosystem for the
> > > Table
> > > >>> API, specific to batch processing and some limited streaming
> setups.
> > > >>> Specific to one type of environment (Hive and HDFS). It willingly
> > omits
> > > >>> support for other environments and conflicts with efforts in other
> > > >>> components to unify.
> > > >>>
> > > >>> Supporting common use cases is good, but in my opinion not at the
> > price
> > > >> of
> > > >>> creating a "fractured" project where the approaches in different
> > layers
> > > >>> don't fit together any more.
> > > >>>
> > > >>>
> > > >>>> *## StreamingFileSink not support writer with path*
> > > >>>>
> > > >>>> The FLIP is "Filesystem connector in Table", it's about building
> up
> > > >> Flink
> > > >>>> Table's capabilities. But I think Hive is important, I see that
> most
> > > >> users
> > > >>>> use Flink and Spark to write data from Kafka to hive. Streaming
> > > >> writing, I
> > > >>>> see that these two engines are convenient and popular. I mean,
> Flink
> > > >> is not
> > > >>>> only a hive runtime, but also an important part of offline data
> > > >> warehouse.
> > > >>>> The thing is StreamingFileSink not support hadoop record writers.
> > Yes,
> > > >> we
> > > >>>> can support them one by one. I see the community integrating ORC
> > [1].
> > > >> But
> > > >>>> it's really not an easy thing. And we have to be careful to
> maintain
> > > >>>> compatibility. After all, users downstream use other computing
> > engines
> > > >> to
> > > >>>> analyze.
> > > >>>> Yes, exposing "RecoverableFsDataOutputStream" to writers is good
> to
> > > >>>> subsequent optimization [2]. But there are many cases. It is
> enough
> > > for
> > > >>>> users to generate new files at the checkpoint. They pay more
> > attention
> > > >> to
> > > >>>> whether they can do it and whether there is a risk of
> compatibility.
> > > >>>> Therefore, RecordWriter is used here.
> > > >>>>
> > > >>>> *## External HDFS access*
> > > >>>>
> > > >>>> Including hadoop configuration and Kerberos related things.
> > > >>>>
> > > >>>> *## Partition commit*
> > > >>>>
> > > >>>> Committing a partition is to notify the downstream application
> that
> > > the
> > > >>>> partition has finished writing, the partition is ready to be
> > read.The
> > > >>>> common way is to add a success file or update metastore. Of
> course,
> > > >> there
> > > >>>> are other ways to notify. We need to provide flexible mechanisms.
> > > >>>> As you mentioned, yes, we can extend "StreamingFileSink" for this
> > > part.
> > > >>>>
> > > >>>> *## Batch / Streaming Unification*
> > > >>>>
> > > >>>> Yes, it is about exactly-once and single commit at the end, There
> > are
> > > >> also
> > > >>>> some "bounded" differences. For example, batch can support
> sorting.
> > In
> > > >> this
> > > >>>> way, you can sort by partition, which can reduce the number of
> > writers
> > > >>>> written at the same time. Dynamic partition writing in batch may
> > > >> produce
> > > >>>> many unordered partitions.
> > > >>>>
> > > >>>> [1] https://issues.apache.org/jira/browse/FLINK-10114
> > > >>>> [2] https://issues.apache.org/jira/browse/FLINK-11499
> > > >>>>
> > > >>>> Best,
> > > >>>> Jingsong Lee
> > > >>>>
> > > >>>> On Tue, Mar 17, 2020 at 8:00 PM LakeShen <
> shenleifight...@gmail.com
> > >
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Hi Jingsong ,
> > > >>>>>
> > > >>>>> I am looking forward this feature. Because in some streaming
> > > >>>> application,it
> > > >>>>> need transfer their messages to hdfs , in order to offline
> > analysis.
> > > >>>>>
> > > >>>>> Best wishes,
> > > >>>>> LakeShen
> > > >>>>>
> > > >>>>> Stephan Ewen <se...@apache.org> 于2020年3月17日周二 下午7:42写道:
> > > >>>>>
> > > >>>>>> I would really like to see us converging the stack and the
> > > >>>> functionality
> > > >>>>>> here.
> > > >>>>>> Meaning to try and use the same sinks in the Table API as for
> the
> > > >>>>>> DataStream API, and using the same sink for batch and streaming.
> > > >>>>>>
> > > >>>>>> The StreamingFileSink has a lot of things that can help with
> that.
> > > >> If
> > > >>>>>> possible, it would be nice to extend it (which would help move
> > > >> towards
> > > >>>>> the
> > > >>>>>> above goal) rather than build a second sink. Building a second
> > sink
> > > >>>> leads
> > > >>>>>> us further away from unification.
> > > >>>>>>
> > > >>>>>> I am a bit puzzled by the statement that sinks are primarily for
> > > >> Hive.
> > > >>>>> The
> > > >>>>>> Table API should not be coupled to Hive, it should be an
> > > >> independent
> > > >>>>>> batch/streaming API for many use cases, supporting very well for
> > > >> batch
> > > >>>>> and
> > > >>>>>> streaming interplay. Supporting Hive is great, but we should not
> > be
> > > >>>>>> building this towards Hive, as just yet another Hive runtime.
> Why
> > > >> "yet
> > > >>>>>> another Hive runtime" when what we have a unique streaming
> engine
> > > >> that
> > > >>>>> can
> > > >>>>>> do much more? We would drop our own strength and reduce
> ourselves
> > > >> to a
> > > >>>>>> limited subset.
> > > >>>>>>
> > > >>>>>> Let's build a File Sink that can also support Hive, but can do
> so
> > > >> much
> > > >>>>>> more. For example, efficient streaming file ingestion as
> > > >> materialized
> > > >>>>> views
> > > >>>>>> from changelogs.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *## Writing Files in Streaming*
> > > >>>>>>
> > > >>>>>> To write files in streaming, I don't see another way than using
> > the
> > > >>>>>> streaming file sink. If you want to write files across
> > checkpoints,
> > > >>>>> support
> > > >>>>>> exactly-once, and support consistent "stop with savepoint", it
> is
> > > >> not
> > > >>>>>> trivial.
> > > >>>>>>
> > > >>>>>> A part of the complexity comes from the fact that not all
> targets
> > > >> are
> > > >>>>>> actually file systems, and not all have simple semantics for
> > > >>>> persistence.
> > > >>>>>> S3 for example does not support renames (only copies, which may
> > > >> take a
> > > >>>>> lot
> > > >>>>>> of time) and it does not support flush/sync of data (the S3 file
> > > >> system
> > > >>>>> in
> > > >>>>>> Hadoop exposes that but it does not work. flush/sync, followed
> by
> > a
> > > >>>>>> failure, leads to data loss). You need to devise a separate
> > > >> protocol
> > > >>>> for
> > > >>>>>> that, which is exactly what has already been done and abstracted
> > > >> behind
> > > >>>>> the
> > > >>>>>> recoverable writers.
> > > >>>>>>
> > > >>>>>> If you re-engineer that in the, you will end up either missing
> > many
> > > >>>>> things
> > > >>>>>> (intermediate persistence on different file systems, and atomic
> > > >> commit
> > > >>>> in
> > > >>>>>> the absence of renames, etc.), or you end up doing something
> > > >> similar as
> > > >>>>> the
> > > >>>>>> recoverable writers do.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *## Atomic Commit in Batch*
> > > >>>>>>
> > > >>>>>> For batch sinks, it is also desirable to write the data first
> and
> > > >> then
> > > >>>>>> atomically commit it once the job is done.
> > > >>>>>> Hadoop has spent a lot of time making this work, see this doc
> > here,
> > > >>>>>> specifically the section on 'The "Magic" Committer'. [1]
> > > >>>>>>
> > > >>>>>> What Flink has built in the RecoverableWriter is in some way an
> > > >> even
> > > >>>>> better
> > > >>>>>> version of this, because it works without extra files (we pass
> > data
> > > >>>>> through
> > > >>>>>> checkpoint state) and it supports not only committing once at
> the
> > > >> end,
> > > >>>>> but
> > > >>>>>> committing multiple time intermediate parts during checkpoints.
> > > >>>>>>
> > > >>>>>> Meaning using the recoverable writer mechanism in batch would
> > > >> allow us
> > > >>>> to
> > > >>>>>> immediately get the efficient atomic commit implementations on
> > > >> file://
> > > >>>>>> hdfs:// and s3://, with a well defined way to implement it also
> > for
> > > >>>> other
> > > >>>>>> file systems.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *## Batch / Streaming Unification*
> > > >>>>>>
> > > >>>>>> It would be great to start looking at these things in the same
> > way:
> > > >>>>>>  - streaming (exactly-once): commits files (after finished) at
> the
> > > >>>> next
> > > >>>>>> checkpoint
> > > >>>>>>  - batch: single commit at the end of the job
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *## DataStream / Table API Stack Unification*
> > > >>>>>>
> > > >>>>>> Having the same set of capabilities would make it much easier
> for
> > > >> users
> > > >>>>> to
> > > >>>>>> understand the system.
> > > >>>>>> Especially when it comes to consistent behavior across external
> > > >>>> systems.
> > > >>>>>> Having a different file sink in Table API and DataStream API
> means
> > > >> that
> > > >>>>>> DataStream can write correctly to S3 while Table API cannot.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *## What is missing?*
> > > >>>>>>
> > > >>>>>> It seems there are some things that get in the way of naturally
> > > >>>>>> Can you make a list of what features are missing in the
> > > >>>> StreamingFileSink
> > > >>>>>> that make it usable for the use cases you have in mind?
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Stephan
> > > >>>>>>
> > > >>>>>> [1]
> > > >>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>
> > >
> >
> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committer_architecture.html
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Mon, Mar 16, 2020 at 12:31 PM Jingsong Li <
> > > >> jingsongl...@gmail.com>
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Piotr,
> > > >>>>>>>
> > > >>>>>>> I am very entangled.
> > > >>>>>>>
> > > >>>>>>> Let me re-list the table streaming sink requirements:
> > > >>>>>>> - In table, maybe 90% sinks are for Hive. The parquet and orc
> > > >> are the
> > > >>>>>> most
> > > >>>>>>> important formats. Hive provide RecordWriters, it is easy to
> > > >> support
> > > >>>>> all
> > > >>>>>>> hive formats by using it, and we don't need concern hive
> version
> > > >>>>>>> compatibility too, but it can not work with FSDataOutputStream.
> > > >>>>>>> - Hive table maybe use external HDFS. It means, hive has its
> own
> > > >>>> hadoop
> > > >>>>>>> configuration.
> > > >>>>>>> - In table, partition commit is needed, we can not just move
> > > >> files,
> > > >>>> it
> > > >>>>> is
> > > >>>>>>> important to complete table semantics to update catalog.
> > > >>>>>>>
> > > >>>>>>> You are right DataStream and Table streaming sink will not be
> > > >> fully
> > > >>>>>>> compatible, each with its own set of limitations, quirks and
> > > >>>> features.
> > > >>>>>>> But if re-using DataStream, batch and streaming also will not
> be
> > > >>>> fully
> > > >>>>>>> compatible. Provide a unify experience to batch and streaming
> is
> > > >> also
> > > >>>>>>> important.
> > > >>>>>>>
> > > >>>>>>> Table and DataStream have different concerns, and they tilt in
> > > >>>>> different
> > > >>>>>>> directions.
> > > >>>>>>>
> > > >>>>>>> Of course, it is very good to see a unify implementation to
> solve
> > > >>>> batch
> > > >>>>>>> sink and hive things, unify DataStream batch sink and
> DataStream
> > > >>>>>> streaming
> > > >>>>>>> sink and Table batch sink and Table streaming sink.
> > > >>>>>>>
> > > >>>>>>> Le's see what others think.
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Jingsong Lee
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Mon, Mar 16, 2020 at 4:15 PM Piotr Nowojski <
> > > >> pi...@ververica.com>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi Jingsong,
> > > >>>>>>>>
> > > >>>>>>>>> First way is reusing Batch sink in FLINK-14254, It has
> > > >> handled
> > > >>>> the
> > > >>>>>>>> partition and metastore logic well.
> > > >>>>>>>>> - unify batch and streaming
> > > >>>>>>>>> - Using FileOutputFormat is consistent with FileInputFormat.
> > > >>>>>>>>> - Add exactly-once related logic. Just 200+ lines code.
> > > >>>>>>>>> - It's natural to support more table features, like partition
> > > >>>>> commit,
> > > >>>>>>>> auto compact and etc..
> > > >>>>>>>>>
> > > >>>>>>>>> Second way is reusing Datastream StreamingFileSink:
> > > >>>>>>>>> - unify streaming sink between table and Datastream.
> > > >>>>>>>>> - It maybe hard to introduce table related features to
> > > >>>>>>> StreamingFileSink.
> > > >>>>>>>>>
> > > >>>>>>>>> I prefer the first way a little. What do you think?
> > > >>>>>>>>
> > > >>>>>>>> I would be surprised if adding “exactly-once related logic” is
> > > >> just
> > > >>>>> 200
> > > >>>>>>>> lines of code. There are things like multi part file upload to
> > > >> s3
> > > >>>> and
> > > >>>>>>> there
> > > >>>>>>>> are also some pending features like [1]. I would suggest to
> > > >>>>> ask/involve
> > > >>>>>>>> Klou in this discussion.
> > > >>>>>>>>
> > > >>>>>>>> If it’s as easy to support exactly-once streaming with current
> > > >>>> batch
> > > >>>>>>> sink,
> > > >>>>>>>> that begs the question, why do we need to maintain
> > > >>>> StreamingFileSink?
> > > >>>>>>>>
> > > >>>>>>>> The worst possible outcome from my perspective will be, if we
> > > >> have
> > > >>>>>>> another
> > > >>>>>>>> example of an operator/logic implemented independently both in
> > > >>>>>> DataStream
> > > >>>>>>>> API and Table API. Because I’m pretty sure they will not be
> > > >> fully
> > > >>>>>>>> compatible, each with it’s own set of limitations, quirks and
> > > >>>>> features.
> > > >>>>>>>> Especially that we have on our long term roadmap and wish list
> > > >> to
> > > >>>>> unify
> > > >>>>>>>> such kind of operators.
> > > >>>>>>>>
> > > >>>>>>>> Piotrek
> > > >>>>>>>>
> > > >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11499 <
> > > >>>>>>>> https://issues.apache.org/jira/browse/FLINK-11499>
> > > >>>>>>>>
> > > >>>>>>>>> On 16 Mar 2020, at 06:55, Jingsong Li <
> > > >> jingsongl...@gmail.com>
> > > >>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks Jinhai for involving.
> > > >>>>>>>>>
> > > >>>>>>>>>> we need add 'connector.sink.username' for
> > > >> UserGroupInformation
> > > >>>>> when
> > > >>>>>>> data
> > > >>>>>>>>> is written to HDFS
> > > >>>>>>>>>
> > > >>>>>>>>> Yes, I am not an expert of HDFS, but it seems we need do this
> > > >>>>> "doAs"
> > > >>>>>> in
> > > >>>>>>>> the
> > > >>>>>>>>> code for access external HDFS. I will update document.
> > > >>>>>>>>>
> > > >>>>>>>>> Best,
> > > >>>>>>>>> Jingsong Lee
> > > >>>>>>>>>
> > > >>>>>>>>> On Mon, Mar 16, 2020 at 12:01 PM Jingsong Li <
> > > >>>>> jingsongl...@gmail.com
> > > >>>>>>>
> > > >>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Thanks Piotr and Yun for involving.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Hi Piotr and Yun, for implementation,
> > > >>>>>>>>>>
> > > >>>>>>>>>> FLINK-14254 [1] introduce batch sink table world, it deals
> > > >> with
> > > >>>>>>>> partitions
> > > >>>>>>>>>> thing, metastore thing and etc.. And it just reuse
> > > >>>>>> Dataset/Datastream
> > > >>>>>>>>>> FileInputFormat and FileOutputFormat. Filesystem can not do
> > > >>>>> without
> > > >>>>>>>>>> FileInputFormat, because it need deal with file things,
> > > >> split
> > > >>>>>> things.
> > > >>>>>>>> Like
> > > >>>>>>>>>> orc and parquet, they need read whole file and have
> > > >> different
> > > >>>>> split
> > > >>>>>>>> logic.
> > > >>>>>>>>>>
> > > >>>>>>>>>> So back to file system connector:
> > > >>>>>>>>>> - It needs introducing FilesystemTableFactory,
> > > >>>>> FilesystemTableSource
> > > >>>>>>> and
> > > >>>>>>>>>> FilesystemTableSink.
> > > >>>>>>>>>> - For sources, reusing Dataset/Datastream FileInputFormats,
> > > >>>> there
> > > >>>>>> are
> > > >>>>>>> no
> > > >>>>>>>>>> other interface to finish file reading.
> > > >>>>>>>>>>
> > > >>>>>>>>>> For file sinks:
> > > >>>>>>>>>> - Batch sink use FLINK-14254
> > > >>>>>>>>>> - Streaming sink has two ways.
> > > >>>>>>>>>>
> > > >>>>>>>>>> First way is reusing Batch sink in FLINK-14254, It has
> > > >> handled
> > > >>>> the
> > > >>>>>>>>>> partition and metastore logic well.
> > > >>>>>>>>>> - unify batch and streaming
> > > >>>>>>>>>> - Using FileOutputFormat is consistent with FileInputFormat.
> > > >>>>>>>>>> - Add exactly-once related logic. Just 200+ lines code.
> > > >>>>>>>>>> - It's natural to support more table features, like
> > > >> partition
> > > >>>>>> commit,
> > > >>>>>>>> auto
> > > >>>>>>>>>> compact and etc..
> > > >>>>>>>>>>
> > > >>>>>>>>>> Second way is reusing Datastream StreamingFileSink:
> > > >>>>>>>>>> - unify streaming sink between table and Datastream.
> > > >>>>>>>>>> - It maybe hard to introduce table related features to
> > > >>>>>>>> StreamingFileSink.
> > > >>>>>>>>>>
> > > >>>>>>>>>> I prefer the first way a little. What do you think?
> > > >>>>>>>>>>
> > > >>>>>>>>>> Hi Yun,
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Watermark mechanism might not be enough.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Watermarks of subtasks are the same in the "snapshotState".
> > > >>>>>>>>>>
> > > >>>>>>>>>>> we might need to also do some coordination between
> > > >> subtasks.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Yes, JobMaster is the role to control subtasks. Metastore
> > > >> is a
> > > >>>>> very
> > > >>>>>>>>>> fragile single point, which can not be accessed by
> > > >> distributed,
> > > >>>> so
> > > >>>>>> it
> > > >>>>>>> is
> > > >>>>>>>>>> uniformly accessed by JobMaster.
> > > >>>>>>>>>>
> > > >>>>>>>>>> [1]https://issues.apache.org/jira/browse/FLINK-14254
> > > >>>>>>>>>>
> > > >>>>>>>>>> Best,
> > > >>>>>>>>>> Jingsong Lee
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Fri, Mar 13, 2020 at 6:43 PM Yun Gao <
> > > >> yungao...@aliyun.com>
> > > >>>>>> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>>      Hi,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>      Very thanks for Jinsong to bring up this discussion!
> > > >> It
> > > >>>>>> should
> > > >>>>>>>>>>> largely improve the usability after enhancing the
> > > >> FileSystem
> > > >>>>>>> connector
> > > >>>>>>>> in
> > > >>>>>>>>>>> Table.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>      I have the same question with Piotr. From my side, I
> > > >>>> think
> > > >>>>> it
> > > >>>>>>>>>>> should be better to be able to reuse existing
> > > >>>> StreamingFileSink.
> > > >>>>> I
> > > >>>>>>>> think We
> > > >>>>>>>>>>> have began
> > > >>>>>>>>>>>      enhancing the supported FileFormat (e.g., ORC,
> > > >> Avro...),
> > > >>>>> and
> > > >>>>>>>>>>> reusing StreamFileSink should be able to avoid repeat work
> > > >> in
> > > >>>> the
> > > >>>>>>> Table
> > > >>>>>>>>>>> library. Besides,
> > > >>>>>>>>>>>      the bucket concept seems also matches the semantics
> > > >> of
> > > >>>>>>> partition.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>      For the notification of adding partitions, I'm a
> > > >> little
> > > >>>>>>> wondering
> > > >>>>>>>>>>> that the Watermark mechanism might not be enough since
> > > >>>>>>> Bucket/Partition
> > > >>>>>>>>>>> might spans
> > > >>>>>>>>>>>      multiple subtasks. It depends on the level of
> > > >>>> notification:
> > > >>>>>> if
> > > >>>>>>> we
> > > >>>>>>>>>>> want to notify for the bucket on each subtask, using
> > > >> watermark
> > > >>>> to
> > > >>>>>>>> notifying
> > > >>>>>>>>>>> each subtask
> > > >>>>>>>>>>>      should be ok, but if we want to notifying for the
> > > >> whole
> > > >>>>>>>>>>> Bucket/Partition, we might need to also do some
> > > >> coordination
> > > >>>>>> between
> > > >>>>>>>>>>> subtasks.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>    Best,
> > > >>>>>>>>>>>     Yun
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>
> ------------------------------------------------------------------
> > > >>>>>>>>>>> From:Piotr Nowojski <pi...@ververica.com>
> > > >>>>>>>>>>> Send Time:2020 Mar. 13 (Fri.) 18:03
> > > >>>>>>>>>>> To:dev <dev@flink.apache.org>
> > > >>>>>>>>>>> Cc:user <u...@flink.apache.org>; user-zh <
> > > >>>>> user...@flink.apache.org
> > > >>>>>>>
> > > >>>>>>>>>>> Subject:Re: [DISCUSS] FLIP-115: Filesystem connector in
> > > >> Table
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Hi,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Which actual sinks/sources are you planning to use in this
> > > >>>>> feature?
> > > >>>>>>> Is
> > > >>>>>>>> it about exposing StreamingFileSink in the Table API? Or do
> you
> > > >>>> want
> > > >>>>> to
> > > >>>>>>>> implement new Sinks/Sources?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Piotrek
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> On 13 Mar 2020, at 10:04, jinhai wang <
> > > >> jinhai...@gmail.com>
> > > >>>>>> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks for FLIP-115. It is really useful feature for
> > > >> platform
> > > >>>>>>>> developers who manage hundreds of Flink to Hive jobs in
> > > >> production.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> I think we need add 'connector.sink.username' for
> > > >>>>>>>> UserGroupInformation when data is written to HDFS
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 在 2020/3/13 下午3:33,“Jingsong Li”<jingsongl...@gmail.com>
> > > >> 写入:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>  Hi everyone,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>>  I'd like to start a discussion about FLIP-115 Filesystem
> > > >>>>>> connector
> > > >>>>>>>> in Table
> > > >>>>>>>>>>>>  [1].
> > > >>>>>>>>>>>>  This FLIP will bring:
> > > >>>>>>>>>>>>  - Introduce Filesystem table factory in table, support
> > > >>>>>>>>>>>>  csv/parquet/orc/json/avro formats.
> > > >>>>>>>>>>>>  - Introduce streaming filesystem/hive sink in table
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>>  CC to user mail list, if you have any unmet needs,
> > > >> please
> > > >>>> feel
> > > >>>>>>> free
> > > >>>>>>>> to
> > > >>>>>>>>>>>>  reply~
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>  Look forward to hearing from you.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>  [1]
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>  Best,
> > > >>>>>>>>>>>>  Jingsong Lee
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> --
> > > >>>>>>>>>> Best, Jingsong Lee
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> --
> > > >>>>>>>>> Best, Jingsong Lee
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>> --
> > > >>>>>>> Best, Jingsong Lee
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>> --
> > > >>>> Best, Jingsong Lee
> > > >>>>
> > > >>
> > >
> > >
> >
> > --
> > Best, Jingsong Lee
> >
>


-- 
Best, Jingsong Lee

Reply via email to