Re: [DISCUSS] FLIP-250: Support Customized Kubernetes Schedulers Proposal

2022-07-13 Thread Martijn Visser
Hi all,

Thanks for the FLIP. I have a couple of remarks/questions:

* Regarding the motivation, it mentions that the development trend is that
Flink supports both batch and stream processing. I think the vision and
trend is that we have unified batch- and stream processing. What I'm
missing is a vision of the impact of customized Kubernetes schedulers on
stream processing. Could there be some elaboration on that?
* While the FLIP talks about customized schedulers, it focuses on Volcano.
Why is the choice made to only focus on Volcano and not on other schedulers
like Apache YuniKorn? Can we not also provide an implementation for
YuniKorn at the same time? Spark did the same with SPARK-36057 [1]
* We still have quite a lot of tech debt on testing for Kubernetes [2]. I
think that this FLIP would be a great improvement for Flink, but I am
worried that we will add more tech debt to those tests. Can we somehow
improve this situation?

Best regards,

Martijn

[1] https://issues.apache.org/jira/browse/SPARK-36057
[2] https://issues.apache.org/jira/browse/FLINK-20392

On Wed, 13 Jul 2022 at 04:11, 王正 wrote:

> +1
>
> On 2022/07/07 01:15:13 bo zhaobo wrote:
> > Hi, all.
> >
> > I would like to raise a discussion on the Flink dev ML about supporting
> > customized Kubernetes schedulers.
> > Currently, Kubernetes is becoming more and more popular for Flink
> > cluster deployment, and its capabilities keep improving; in particular,
> > it supports customized scheduling.
> > Essentially, for high-performance workloads we need to apply new
> > scheduling policies to meet new requirements. The current Flink native
> > Kubernetes solution uses the Kubernetes default scheduler for all
> > scenarios, but the default scheduling policy might be difficult to apply
> > in some extreme cases. We therefore need to improve Flink on Kubernetes
> > by coupling those customized Kubernetes schedulers with Flink native
> > Kubernetes, providing a way for Flink administrators or users to deploy
> > their Flink clusters on Kubernetes more flexibly.
> >
> > The proposal will introduce a customized K8s scheduler plugin mechanism
> > and a reference implementation, 'Volcano', in Flink. See [1] for more
> > details.
> >
> > Looking forward to your feedback.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal
> >
> > Thanks,
> > BR
> >


Re: [DISCUSS] Add new JobStatus fields to Flink Kubernetes Operator CRD

2022-07-13 Thread Martijn Visser
Hi Daren,

Could you list the benefits for the users of Flink? I do think that an
internal AWS requirement is not a good argument for getting something done
in Flink.

Best regards,

Martijn

On Tue, 12 Jul 2022 at 21:17, WONG, DAREN wrote:

> Hi Yang,
>
> The requirement to add *plan* currently originates from an internal AWS
> requirement, as our service needs visibility of *plan*, but we think it
> could be beneficial to customers who use *plan* as well.
>
> Regards,
> Daren
>
>
>
>
> On 12/07/2022, 13:23, "Yang Wang"  wrote:
>
> CAUTION: This email originated from outside of the organization. Do
> not click links or open attachments unless you can confirm the sender and
> know the content is safe.
>
>
>
> Thanks for the explanation. Only having 1 API call in most cases makes
> sense to me.
>
> Could you please elaborate more on why we need the *plan* in the CR
> status?
>
>
> Best,
> Yang
>
On Tue, Jul 12, 2022 at 17:36, Gyula Fóra wrote:
>
> > Hi Devs!
> >
> > I discussed with Daren offline, and I agree with him that technically
> > we almost never need 2 API calls.
> >
> > I think it's fine to have a second API call once directly after
> > application submission (technically even this can be eliminated by
> > always setting a fixed job id).
> >
> > +1 from me.
> >
> > Cheers,
> > Gyula
> >
> >
> > On Tue, Jul 12, 2022 at 11:32 AM WONG, DAREN
>  > >
> > wrote:
> >
> > > Hi Matyas,
> > >
> > > Thanks for the feedback, and yes, I agree. An alternative approach
> > > would instead be:
> > >
> > > - 2 API calls only when the jobID is not available (i.e. when
> > > submitting a new application cluster, which is a one-off event).
> > > - 1 API call when the jobID is already available, by directly calling
> > > "/jobs/:jobid".
> > >
> > > With this approach, we can keep the API call to 1 in most cases.
> > >
> > > Regards,
> > > Daren
> > >
> > >
> > > On 11/07/2022, 14:44, "Őrhidi Mátyás" 
> wrote:
> > >
> > > CAUTION: This email originated from outside of the
> organization. Do
> > > not click links or open attachments unless you can confirm the
> sender and
> > > know the content is safe.
> > >
> > >
> > >
> > > Hi Daren,
> > >
> > > At the moment the Operator fetches the job state via
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-overview
> > > which contains the 'end-time' and 'duration' fields already. I feel
> > > calling
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid
> > > after the previous call for every job in every reconcile loop would be
> > > too expensive.
> > >
> > > Best,
> > > Matyas
> > >
> > > On Mon, Jul 11, 2022 at 3:17 PM WONG, DAREN
> > > 
> > > wrote:
> > >
> > > > Hi everyone, I am Daren from the AWS Kinesis Data Analytics (KDA)
> > > > team. I had a quick chat with Gyula, as I propose to include a few
> > > > additional fields in the jobStatus CRD for the Flink Kubernetes
> > > > Operator, such as:
> > > >
> > > > - endTime
> > > > - duration
> > > > - jobPlan
> > > >
> > > > Further details of each state can be found here:
> > > > https://github.com/darenwkt/flink/blob/release-1.15.0/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
> > > > Although the addition of these 3 states stems from an internal
> > > > requirement, I think they would be beneficial to others who use
> > > > these states in their applications as well. The list of states above
> > > > is not exhaustive, so do let me know if there are other states that
> > > > you would like to include together in this iteration cycle.
> > > >
> > > > JIRA: https://issues.apache.org/jira/browse/FLINK-28494
> > > >
> > >
> > >
> >
>
>


Re: [DISCUSS] FLIP-217 Support watermark alignment of source splits

2022-07-13 Thread Becket Qin
Hi Sebastian,

Thanks for updating the FLIP wiki.

Just to double confirm, I was thinking of a configuration like
"allow.coarse.grained.watermark.alignment". This will allow the coarse
grained watermark alignment as a fallback instead of bubbling up an
exception if split pausing is not supported in some Sources in a Flink job.
And this will only affect the Sources that do not support split pausing,
but not the Sources that have split pausing supported.

This seems slightly different from a flag that enables / disables split
alignment. That sounds like a global thing, and it seems unnecessary to
disable split alignment altogether, as long as the coarse grained alignment
can be a fallback.
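
For illustration, here is a minimal sketch of what such an option could look
like with Flink's ConfigOptions builder (the key name and default value are
assumptions for the sake of discussion, not a finalized proposal):

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class WatermarkAlignmentOptions {

    // Assumed key/default: when true, sources that cannot pause splits fall
    // back to coarse grained (per-source-reader) watermark alignment instead
    // of failing the job with an UnsupportedOperationException.
    public static final ConfigOption<Boolean> ALLOW_COARSE_GRAINED_WATERMARK_ALIGNMENT =
            ConfigOptions.key("allow.coarse.grained.watermark.alignment")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Whether to allow coarse grained watermark alignment as a fallback "
                                    + "for sources that do not support pausing splits.");
}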

Thanks,

Jiangjie (Becket) Qin

On Wed, Jul 13, 2022 at 2:46 PM Sebastian Mattheis 
wrote:

> Hi Piotrek,
>
> Sorry, I read it and forgot about it when I was ripping out the
> supportsPauseOrResume method again. Thanks for pointing that out. I will
> add it as follows: The flag enables/disables split alignment in the
> SourceOperator, where the default is that split alignment is enabled. (And
> I will add the note: "In future releases, the flag may be ignored such that
> split alignment is always enabled.")
>
> Cheers,
> Sebastian
>
> On Tue, Jul 12, 2022 at 11:14 PM Piotr Nowojski 
> wrote:
>
>> Hi Sebastian,
>>
>> Thanks for picking this up.
>>
>> > 5. There is NO configuration option to enable watermark alignment of
>> splits or disable pause/resume capabilities.
>>
>> Isn't this contradicting what we actually agreed on?
>>
>> > we are planning to have a configuration based way to revert to the
>> previous behavior
>>
>> I think what we agreed in the last couple of emails was to add a
>> configuration toggle, that would allow Flink 1.15 users, that are using
>> watermark alignment with multiple splits per source operator, to continue
>> using it with the old 1.15 semantic, even if their source doesn't support
>> pausing/resuming splits. It seems to me like the current FLIP and
>> implementation proposal would always throw an exception in that case?
>>
>> Best,
>> Piotrek
>>
>> On Tue, 12 Jul 2022 at 10:18, Sebastian Mattheis wrote:
>>
>> > Hi all,
>> >
>> > I have updated FLIP-217 [1] to the proposed specification and adapted
>> the
>> > current implementation [2] respectively.
>> >
>> > This means both, FLIP and implementation, are ready for review from my
>> > side. (I would revise commit history and messages for the final PR but
>> left
>> > it as is for now and the records of this discussion.)
>> >
>> > 1. Please review the updated version of FLIP-217 [1]. If there are no
>> > further concerns, I would initiate the voting.
>> > (2. If you want to speed up things, please also have a look into the
>> > updated implementation [2].)
>> >
>> > Please consider the following updated specification in the current
>> status
>> > of FLIP-217 where the essence is as follows:
>> >
>> > 1. A method pauseOrResumeSplits is added to SourceReader with a default
>> > implementation that throws UnsupportedOperationException.
>> > 2. A method pauseOrResumeSplits is added to SplitReader with a default
>> > implementation that throws UnsupportedOperationException.
>> > 3. SourceOperator initiates split alignment only if more than one split
>> is
>> > assigned to the source (and, of course, only if withSplitAlignment is
>> used).
>> > 4. There is NO "supportsPauseOrResumeSplits" method at any place (to
>> > indicate if the implementation supports pause/resume capabilities).
>> > 5. There is NO configuration option to enable watermark alignment of
>> > splits or disable pause/resume capabilities.
>> >
>> > *Note:* If the SourceReader or some SplitReader does not override
>> > pauseOrResumeSplits but it is required in the application, an exception
>> > is thrown at runtime when a split alignment attempt is executed (not
>> > during startup or any time earlier).
>> >
>> > Also, I have revised the compatibility/migration section to describe a
>> bit
>> > of a rationale for the default implementation with exception throwing
>> > behavior.
>> >
>> > Regards,
>> > Sebastian
>> >
>> > [1]
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-217+Support+watermark+alignment+of+source+splits
>> > [2] https://github.com/smattheis/flink/tree/flip-217-split-wm-alignment
>> >
>> > On Mon, Jul 4, 2022 at 3:59 AM Thomas Weise  wrote:
>> >
>> >> Hi,
>> >>
>> >> Thank you Becket and Piotr for ironing out the "case 2" behavior.
>> >> Strictly speaking we are introducing a regression by allowing an
>> >> exception to bubble up that did not exist in the previous release,
>> >> regardless of how suboptimal the behavior may be. However, given that the
>> >> feature is still experimental and we are planning to have a
>> >> configuration based way to revert to the previous behavior, I think
>> >> this is a good solution.
>> >>
>> >> +1 from my side
>> >>
>> >> Thomas
>> >>
>> >> On Wed, Jun 29, 2022 at 2:43 PM Piotr Nowojski 
>> >> wrote:
>> >> >
>> >> > +1 :)
>> >> >
>> >> > śr.

Re: Not able to run a simple table API file streaming sink

2022-07-13 Thread Martijn Visser
Hi all,

For completeness, for this issue a ticket has been created at
https://issues.apache.org/jira/browse/FLINK-28513

Best regards,

Martijn

On Tue, 12 Jul 2022 at 03:49, Jaya Ananthram wrote:

> Hello There,
>
> I am trying to write a simple Table API S3 streaming sink using Flink
> 1.15.1 and I am facing the following exception:
>
> Caused by: org.apache.flink.util.SerializedThrowable:
> S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to
> create a persistent recoverable intermediate point.
> at
>
> org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.sync(RefCountedBufferingFileStream.java:111)
> ~[flink-s3-fs-hadoop-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.sync(S3RecoverableFsDataOutputStream.java:129)
> ~[flink-s3-fs-hadoop-1.15.1.jar:1.15.1]
> at
> org.apache.flink.formats.csv.CsvBulkWriter.finish(CsvBulkWriter.java:110)
> ~[flink-csv-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.connector.file.table.FileSystemTableSink$ProjectionBulkFactory$1.finish(FileSystemTableSink.java:642)
> ~[flink-connector-files-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:64)
> ~[flink-file-sink-common-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:263)
> ~[flink-streaming-java-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:305)
> ~[flink-streaming-java-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:277)
> ~[flink-streaming-java-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:270)
> ~[flink-streaming-java-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:261)
> ~[flink-streaming-java-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(StreamingFileSinkHelper.java:87)
> ~[flink-streaming-java-1.15.1.jar:1.15.1]
> at
>
> org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.snapshotState(AbstractStreamingWriter.java:129)
> ~[flink-connector-files-1.15.1.jar:1.15.1]
>
> In my config, I am trying to read from Kafka and write to S3 (s3a) using
> the Table API, with checkpoints configured to use s3p (Presto). I even
> tried a simple datagen example instead of Kafka and I get the same issue.
> I think I am following exactly the steps mentioned in the docs, and the
> above exceptions are not very helpful. It fails exactly when the code
> triggers a checkpoint, but I don't have any clue beyond that. Could
> someone please help me understand what I am missing here? I can't find
> any open issue with such logs.
>
> Hoping for your reply.
>


Re: [DISCUSS] FLIP-251: Support collecting arbitrary number of streams

2022-07-13 Thread Martijn Visser
+1

On Sun, 10 Jul 2022 at 20:30, Konstantin Knauf wrote:

> Makes sense to me. Thank you, Chesnay. +1
>
> David Anderson wrote on Fri, 8 Jul 2022, 13:56:
>
> > I've found that with our current tooling it's frustrating to try to write
> > good end-to-end tests for real-world jobs with multiple sinks.
> > DataStream#executeAndCollect() is okay for simple pipelines with one
> sink,
> > but in my opinion we do need something like FLIP-251.
> >
> > The proposed interface looks good to me. I look forward to trying it.
> >
> > Chesnay, thanks for putting this together!
> >
> > David
> >
> > On Thu, Jul 7, 2022 at 7:35 AM Chesnay Schepler 
> > wrote:
> >
> > > Hello,
> > >
> > > I have created a FLIP for slightly extending the collect() API to
> > > support collecting an arbitrary number of streams, to make it more
> > > useful for testing complex workflows.
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-251%3A+Support+collecting+arbitrary+number+of+streams
> > >
> > > Looking forward to any feedback.
> > >
> > > Regards,
> > >
> > > Chesnay
> > >
> > >
> >
>


Re: [DISCUSS] FLIP-243: Dedicated Opensearch connectors

2022-07-13 Thread Martijn Visser
Hi Andriy,

Thanks for that. If there are no more comments, I think you can open up a vote
thread.

Best regards,

Martijn

On Mon, 4 Jul 2022 at 18:32, Andriy Redko wrote:

> Hi Martijn,
>
> Thanks a lot for supporting the FLIP! Regarding your question, yes,
> Elasticsearch
> has a new client in v8 (deprecating HighLevelRestClient from now on) and,
> no,
> there won't be any impact on Opensearch (the connectors use Opensearch's
> own HighLevelRestClient, no deprecations yet).
>
> Thank you.
>
> Best Regards,
> Andriy Redko
>
> MV> Hi Andriy,
>
> MV> Thanks for creating the FLIP and opening the discussion. In general +1
> for
> MV> the FLIP, looking forward!
>
> MV> One question: I recall that in order for Flink to support
> Elasticsearch 8,
> MV> the connector needs to be overhauled to use Elastic's new client
> because
> MV> only the new client has a compatible license. Will that impact the
> MV> Opensearch connector in any way?
>
> MV> Best regards,
>
> MV> Martijn
>
> MV> On Wed, 22 Jun 2022 at 22:59, Andriy Redko wrote:
>
> >> Hi Folks,
> >>
> >> We would like to start a discussion thread on FLIP-243: Dedicated
> >> Opensearch connectors [1], [2] where we propose to provide dedicated
> >> connectors for Opensearch [3], on par with existing Elasticsearch ones.
> >> Looking forward to comments and feedback. Thank you.
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-243%3A+Dedicated+Opensearch+connectors
> >> [2] https://github.com/apache/flink/pull/18541
> >> [3] https://opensearch.org/
> >>
> >> Best Regards,
> >> Andriy Redko
> >>
> >>
>
>


[jira] [Created] (FLINK-28533) SchemaChange supports updateColumnNullability and updateColumnComment

2022-07-13 Thread Jane Chan (Jira)
Jane Chan created FLINK-28533:
-

 Summary: SchemaChange supports updateColumnNullability and 
updateColumnComment
 Key: FLINK-28533
 URL: https://issues.apache.org/jira/browse/FLINK-28533
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.2.0
Reporter: Jane Chan
 Fix For: table-store-0.2.0








[jira] [Created] (FLINK-28534) Spec change event is triggered twice per upgrade

2022-07-13 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-28534:
--

 Summary: Spec change event is triggered twice per upgrade
 Key: FLINK-28534
 URL: https://issues.apache.org/jira/browse/FLINK-28534
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.1.0
Reporter: Gyula Fora
Assignee: Matyas Orhidi
 Fix For: kubernetes-operator-1.1.0


The event `Detected spec change, starting reconciliation.` is triggered twice 
during every application/sessionjob upgrade as we first suspend the job then 
start it in 2 reconcile steps.

We should only trigger this event once.





Re: [DISCUSS] FLIP-217 Support watermark alignment of source splits

2022-07-13 Thread Sebastian Mattheis
Hi Becket, Hi Thomas, Hi Piotrek,

Thanks for the feedback. I would like to highlight some concerns:

   1. Major: A configuration parameter like "allow coarse grained
   alignment" defines a semantic that mixes two contexts conditionally, as
   follows: "ignore the incapability to pause splits in
   SourceReader/SplitReader" IF (conditionally) we "allow coarse grained
   watermark alignment". At the same time, we said that there is no way to
   check the capability of SourceReader/SplitReader to pause/resume other
   than observing an UnsupportedOperationException at runtime, so we cannot
   disable the trigger for watermark split alignment in the SourceOperator.
   Instead, we can only ignore the incapability of SourceReader/SplitReader
   during execution of a pause/resume attempt, which, consequently, requires
   checking the "allow coarse grained alignment" parameter value (to
   implement the conditional semantic). However, during this execution we
   actually don't know whether the attempt was executed for the purpose of
   watermark alignment or for some other purpose, so the check actually
   depends on who triggered the pause/resume attempt and potentially hides
   the exception unexpectedly for some other use case. Of course, currently
   there is no other purpose and, hence, no other trigger than watermark
   alignment. However, this breaks, in my perspective, the idea of having
   pauseOrResumeSplits (re)usable for other use cases.
   2. Minor: I'm not aware of any configuration parameter named in the
   format `allow.*`, as you suggested with
   `allow.coarse.grained.watermark.alignment`. Would that still be okay to
   do?

As we have agreed not to have a "supportsPausableSplits" method because of
potential inconsistencies between the return value of this method and the
actual implementation (and also the difficulty of having a meaningful return
value where the support actually depends on the SourceReader AND the
assigned SplitReaders), I don't want to bring up the discussion about the
"supportsPauseableSplits" method again. Instead, I see the following
options:

Option A: I would drop the idea of the "allow coarse grained alignment"
semantic of the parameter and instead implement a parameter to
"enable/disable split watermark alignment", which we can easily use in the
SourceOperator to disable the trigger of split alignment. This is indeed
more static and less flexible, because it disables split alignment
unconditionally, but it is "context-decoupled" and more straightforward to
use. This would also address the use case of disabling split alignment for
the purpose of runtime behavior evaluation, as mentioned by Thomas (if I
remember correctly). I would implement the parameter with a default where
watermark split alignment is enabled, which requires users to check their
application when upgrading to 1.16: a) is there a source that reads from
multiple splits, and b), if yes, do all splits of that source support
pause/resume? If a) yes and b) no, the user must take action to disable
watermark split alignment (which disables the trigger of split alignment
only for that purpose).
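
For illustration, a rough sketch of how Option A could be wired into the
SourceOperator (a minimal sketch; the class, flag, and method names are
placeholders I made up, not the final API):

public class SplitAlignmentGate {

    private final boolean splitWatermarkAlignmentEnabled;

    public SplitAlignmentGate(boolean splitWatermarkAlignmentEnabled) {
        this.splitWatermarkAlignmentEnabled = splitWatermarkAlignmentEnabled;
    }

    // Decides whether the SourceOperator should trigger split alignment at
    // all: only if the user has not disabled it, and only when more than one
    // split is assigned.
    public boolean shouldAlignSplits(int numAssignedSplits) {
        return splitWatermarkAlignmentEnabled && numAssignedSplits > 1;
    }
}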

Option B: If we ignore my concern, I would simply check the "allow coarse
grained watermark alignment" parameter value on every attempt to execute
pause/resume in the SourceReader and in the SplitReader and will not throw
UnsupportedOperationException if the parameter value is set to true.

Please note that the parameter would only be used during some kind of
migration phase. Therefore, I wonder whether we need to overcomplicate
things.

@Piotrek, @Becket, @Thomas: I would recommend/favour option A. Please let
me know your feedback and/or concerns as soon as possible, if possible. :)

Regards,
Sebastian


On Wed, Jul 13, 2022 at 9:37 AM Becket Qin  wrote:

> Hi Sebastian,
>
> Thanks for updating the FLIP wiki.
>
> Just to double confirm, I was thinking of a configuration like
> "allow.coarse.grained.watermark.alignment". This will allow the coarse
> grained watermark alignment as a fallback instead of bubbling up an
> exception if split pausing is not supported in some Sources in a Flink job.
> And this will only affect the Sources that do not support split pausing,
> but not the Sources that have split pausing supported.
>
> This seems slightly different from a flag that enables / disables split
> alignment. That sounds like a global thing, and it seems unnecessary to
> disable split alignment altogether, as long as the coarse grained alignment
> can be a fallback.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Jul 13, 2022 at 2:46 PM Sebastian Mattheis <
> sebast...@ververica.com> wrote:
>
>> Hi Piotrek,
>>
>> Sorry, I read it and forgot about it when I was ripping out the
>> supportsPauseOrResume method again. Thanks for pointing that out. I will
>> add it as follows: The flag enables/disables split alignment in the
>> SourceOperator, where the default is that split alignment is enabled. (And I
>> will add the note: "In future releases, the flag may be ignored such that
>>

[jira] [Created] (FLINK-28535) Support create database/table for SparkCatalog

2022-07-13 Thread Jane Chan (Jira)
Jane Chan created FLINK-28535:
-

 Summary: Support create database/table for SparkCatalog
 Key: FLINK-28535
 URL: https://issues.apache.org/jira/browse/FLINK-28535
 Project: Flink
  Issue Type: New Feature
  Components: Table Store
Affects Versions: table-store-0.2.0
Reporter: Jane Chan
 Fix For: table-store-0.2.0








[jira] [Created] (FLINK-28536) Adds an internal abstraction `PhysicalDagProcessor` for physical plan processing

2022-07-13 Thread lincoln lee (Jira)
lincoln lee created FLINK-28536:
---

 Summary: Adds an internal abstraction `PhysicalDagProcessor` for 
physical plan processing
 Key: FLINK-28536
 URL: https://issues.apache.org/jira/browse/FLINK-28536
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: lincoln lee
 Fix For: 1.16.0








[jira] [Created] (FLINK-28537) OffsetsInitializer.timestamp() fails if no element exceeds timestamps

2022-07-13 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-28537:


 Summary: OffsetsInitializer.timestamp() fails if no element 
exceeds timestamps
 Key: FLINK-28537
 URL: https://issues.apache.org/jira/browse/FLINK-28537
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.15.0
Reporter: Chesnay Schepler


According to a comment in the TimestampOffsetInitializer, if the configured 
timestamp is not found in Kafka, we just use the latest offset instead.
{code:java}
// First get the current end offsets of the partitions. This is going to be used
// in case we cannot find a suitable offsets based on the timestamp, i.e. the
// message meeting the requirement of the timestamp have not been produced to
// Kafka yet, in this case, we just use the latest offset. {code}
However, in practice an exception is thrown in this case.

This is rather unfortunate, because the documented fallback would allow you to 
easily define a time-based bound for testing.
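
For context, a minimal sketch of that testing use case (topic, servers, and 
method wrapper are made up for illustration; the builder calls are the regular 
KafkaSource API):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class TimeBoundedSourceExample {

    static KafkaSource<String> timeBoundedSource(long boundTimestampMillis) {
        return KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder
                .setTopics("test-topic") // placeholder
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Intended behavior: stop at this timestamp, falling back to
                // the latest offset if no record exceeds it. Currently this
                // throws instead of falling back.
                .setBounded(OffsetsInitializer.timestamp(boundTimestampMillis))
                .build();
    }
}
{code}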

To reproduce, modify
{{OffsetsInitializerTest#testTimestampOffsetsInitializer}} to use a 
significantly larger timestamp (100k+).
{code:java}
java.lang.IllegalArgumentException: Invalid negative offset
        at 
org.apache.kafka.clients.consumer.OffsetAndTimestamp.(OffsetAndTimestamp.java:36)
        at 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.lambda$offsetsForTimes$8(KafkaSourceEnumerator.java:622)
        at 
java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:178)
        at 
java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
        at 
java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1746)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
        at 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator$PartitionOffsetsRetrieverImpl.offsetsForTimes(KafkaSourceEnumerator.java:615)
        at 
org.apache.flink.connector.kafka.source.enumerator.initializer.TimestampOffsetsInitializer.getPartitionOffsets(TimestampOffsetsInitializer.java:57)
        at 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerTest.testTimestampOffsetsInitializer(OffsetsInitializerTest.java:104)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
        at 
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
        at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.

Re: [DISCUSS] FLIP-249: Flink Web UI Enhancement for Speculative Execution

2022-07-13 Thread Jing Zhang
Hi Gen,

> The way the speculative executions are presented should be almost the
> same as when the job was running. Users can still find the executions
> folded in the subtask list page.

It's a rather complicated operation to check every vertex and every subtask
list page.
It would be better to have an easier way to know whether the job contains
speculative executions even after the job has finished.
Maybe this point could be taken into consideration in the next version.

Best,
Jing Zhang


On Wed, Jul 13, 2022 at 14:47, Gen Luo wrote:

> Hi Jing,
>
> Thanks for joining the discussion. It's a very good point to figure out the
> possible influence on the history server.
>
> > 1. Does the improvement also cover history server or just Web UI?
> As far as I know most Web UI components are shared between
> runtime and history server, so the improvement is expected to cover both.
>
> We will make sure the changes proposed in this FLIP do not conflict with
> the ongoing FLIP-241 which is working on the enhancement of completed
> job information.
>
> > 2. How to know whether the job contains speculative execution
> instances after the job finished? Do we have to check each subtasks
> of all vertex one by one?
>
> When one attempt of a subtask finishes, all other concurrent attempts
> will be canceled, but still treated as the current executions. The way the
> speculative executions are presented should be almost the same as the
> job was running. Users can still find the executions folded in the subtask
> list page.
>
> As we mentioned in the FLIP, all changes are expected to be transparent
> to users who don't use speculative execution. And to users who do use
> speculative execution, the experience should be almost the same
> when watching a running job or a completed job in the history server.
>
> Best,
> Gen
>
> On Tue, Jul 12, 2022 at 8:41 PM Jing Zhang  wrote:
>
> > Thanks for driving this discussion. It's a very helpful improvement.
> > I only have two minor questions:
> > 1. Does the improvement also cover history server or just Web UI?
> > 2. How to know whether the job contains speculative execution instances
> > after the job finished?
> > Do we have to check each subtasks of all vertex one by one?
> >
> > Best,
> > Jing Zhang
> >
> > On Mon, Jul 11, 2022 at 22:31, Gen Luo wrote:
> >
> > > Hi, everyone.
> > >
> > > Thanks for your feedback.
> > > If there are no more concerns or comments, I will start the vote
> > tomorrow.
> > >
> > > On Mon, Jul 11, 2022 at 11:12, Gen Luo wrote:
> > >
> > > > Hi Lijie and Zhu,
> > > >
> > > > Thanks for the suggestion. I agree that the name "Blocked Free Slots"
> > is
> > > > more clear to users.
> > > > I'll take the suggestion and update the FLIP.
> > > >
> > > > On Fri, Jul 8, 2022 at 9:12 PM Zhu Zhu  wrote:
> > > >
> > > >> I agree that it can be more useful to show the number of slots that
> > are
> > > >> free but blocked. Currently users infer the slots in use by
> > subtracting
> > > >> available slots from the total slots. With blocked slots introduced,
> > > this
> > > >> can be achieved by subtracting available slots and blocked free
> slots
> > > >> from the total slots.
> > > >>
> > > >> Therefore, +1 to show "Blocked Free Slots" on the resource card.
> > > >>
> > > >> Thanks,
> > > >> Zhu
> > > >>
> > > >> On Fri, Jul 8, 2022 at 17:39, Lijie Wang wrote:
> > > >> >
> > > >> > Hi Gen & Zhu,
> > > >> >
> > > >> > -> 1. Can we also show "Blocked Slots" in the resource card, so
> that
> > > >> users
> > > >> > can easily figure out how many slots are available/blocked/in-use?
> > > >> >
> > > >> > I think we should describe the "available" and "blocked" more
> > clearly.
> > > >> In
> > > >> > my opinion, I think users should be interested in the number of
> > slots
> > > in
> > > >> > the following 3 state:
> > > >> > 1. free and unblocked, I think it's OK to call this state
> > "available".
> > > >> > 2. free and blocked, I think it's not appropriate to call
> "blocked"
> > > >> > directly, because "blocked" should include both the "free and
> > blocked"
> > > >> and
> > > >> > "in-use and blocked".
> > > >> > 3. in-use
> > > >> >
> > > >> > And the sum of the aboved 3 kind of slots should be the total
> number
> > > of
> > > >> > slots in this cluster.
> > > >> >
> > > >> > WDYT?
> > > >> >
> > > >> > Best,
> > > >> > Lijie
> > > >> >
> > > >> > On Fri, Jul 8, 2022 at 16:14, Gen Luo wrote:
> > > >> >
> > > >> > > Hi Zhu,
> > > >> > > Thanks for the feedback!
> > > >> > >
> > > >> > > 1.Good idea. Users should be more familiar with the slots as the
> > > >> resource
> > > >> > > units.
> > > >> > >
> > > >> > > 2.You remind me that the "speculative attempts" are execution
> > > attempts
> > > >> > > started by the SpeculativeScheduler when slow tasks are
> detected,
> > > >> while the
> > > >> > > current execution attempts other than the "most current" one are
> > not
> > > >> really
> > > >> > > the speculative attempts. I agree we should modify the field
> name.
> > > >> > >
> > > >> > > 3.ArchivedSpeculativeExecutionVertex seems to be introduced

Re: [VOTE] FLIP-249: Flink Web UI Enhancement for Speculative Execution

2022-07-13 Thread Jing Zhang
+1 (binding)

Best,
Jing Zhang

On Wed, Jul 13, 2022 at 14:49, Gen Luo wrote:

> Hi Jing,
>
> I have replied in the discussion thread about the questions. Hope that
> would be helpful.
>
> Best,
> Gen
>
> On Tue, Jul 12, 2022 at 8:43 PM Jing Zhang  wrote:
>
> > Hi, Gen Luo,
> >
> > I left  two minor questions in the DISCUSS thread.
> > Sorry for jumping into the discussion so late.
> >
> > Best,
> > Jing Zhang
> >
> > On Tue, Jul 12, 2022 at 19:29, Lijie Wang wrote:
> >
> > > +1 (non-binding)
> > >
> > > Best,
> > > Lijie
> > >
> > > On Tue, Jul 12, 2022 at 17:38, Zhu Zhu wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > On Tue, Jul 12, 2022 at 13:46, Gen Luo wrote:
> > > > >
> > > > > Hi everyone,
> > > > >
> > > > >
> > > > > Thanks for all the feedback so far. Based on the discussion [1], we
> > > seem
> > > > to
> > > > > have consensus. So, I would like to start a vote on FLIP-249 [2].
> > > > >
> > > > >
> > > > > The vote will last for at least 72 hours unless there is an
> objection
> > > or
> > > > > insufficient votes.
> > > > >
> > > > >
> > > > > [1]
> https://lists.apache.org/thread/832tk3zvysg45vrqrv5tgbdqx974pm3m
> > > > > [2]
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-249%3A+Flink+Web+UI+Enhancement+for+Speculative+Execution
> > > >
> > >
> >
>


Re: [DISCUSS] FLIP-243: Dedicated Opensearch connectors

2022-07-13 Thread Andriy Redko
Hi Martijn,

Awesome, will do that shortly, thank you!

Best Regards,
Andriy Redko

> Hi Andriy,

> Thanks for that. If there are no more comments, I think you can open up a vote 
> thread.

> Best regards,

> Martijn

> On Mon, 4 Jul 2022 at 18:32, Andriy Redko wrote:

>> Hi Martijn,

>> Thanks a lot for supporting the FLIP! Regarding your question, yes, Elasticsearch
>> has a new client in v8 (deprecating HighLevelRestClient from now on) and, 
>> no, 
>> there won't be any impact on Opensearch (the connectors use Opensearch's
>> own HighLevelRestClient, no deprecations yet). 

>> Thank you.

>> Best Regards,
>>     Andriy Redko

MV>>> Hi Andriy,

MV>>> Thanks for creating the FLIP and opening the discussion. In general +1 for
MV>>> the FLIP, looking forward!

MV>>> One question: I recall that in order for Flink to support Elasticsearch 8,
MV>>> the connector needs to be overhauled to use Elastic's new client because
MV>>> only the new client has a compatible license. Will that impact the
MV>>> Opensearch connector in any way?

MV>>> Best regards,

MV>>> Martijn

MV>>> On Wed, 22 Jun 2022 at 22:59, Andriy Redko wrote:

 Hi Folks,

 We would like to start a discussion thread on FLIP-243: Dedicated
 Opensearch connectors [1], [2] where we propose to provide dedicated
 connectors for Opensearch [3], on par with existing Elasticsearch ones.
 Looking forward to comments and feedback. Thank you.

 [1]
 https://cwiki.apache.org/confluence/display/FLINK/FLIP-243%3A+Dedicated+Opensearch+connectors
 [2] https://github.com/apache/flink/pull/18541
 [3] https://opensearch.org/

 Best Regards,
     Andriy Redko





[REQUEST] Edit Permissions for FLIP

2022-07-13 Thread Steve Yurong Su
Hi folks!

My name is Steve Su, and I am currently a third-year master’s student
at Tsinghua University.

I am now participating in a Flink-related student project in OSPP
(Open Source Promotion Plan[1]). The project aims to design and
implement the generic rate limiter for the FLIP-27 source and apply
the rate-limiting feature to the Flink CDC project. @Leonard Xu is my
mentor in this project.

We already had many discussions on source rate-limiting on the mailing
list, such as [2], [3], and so on. Leonard and I had an offline
discussion, and we thought that it would be great to have a generic
rate limiter, so I’d like to raise a new FLIP to illustrate the design
further.

Please grant my account FLIP edit permissions. My apache confluence
account id is `steveyurongsu`. :D

Thanks,
Steve Su
---
[1] https://summer-ospp.ac.cn/homepagy
[2] https://lists.apache.org/thread/ff6mcos8g4otnhjrp030lrcrf3omgfmf
[3] https://lists.apache.org/thread/7gjxto1rmkpff4kl54j8nlg5db2rqhkt


Re: [DISCUSS] FLIP-248: Introduce dynamic partition pruning

2022-07-13 Thread Jing Zhang
Hi Godfrey,
Thanks for driving this discussion.
This is an important improvement for batch sql jobs.
I agree with Jingsong that we should expand the capability to more than just partitions.
Besides, I have two points:
1. Based on FLIP-248[1],

> Dynamic partition pruning mechanism can improve performance by avoiding
> reading large amounts of irrelevant data, and it works for both batch and
> streaming queries.

Does DPP also support streaming queries?
It seems the proposed changes in FLIP-248 do not work for streaming
queries,
because the dimension table might be an unbounded input.
Or does it require all dimension tables to be bounded inputs for streaming
jobs if a job wants to enable DPP?

2. I notice there are changes to the SplitEnumerator for the Hive source and
the File source,
and they now depend on SourceEvent to pass PartitionData.
In FLIP-245, if speculative execution is enabled for FLIP-27-based sources
that use SourceEvent,
the SplitEnumerator must implement the newly introduced
`SupportsHandleExecutionAttemptSourceEvent` interface,
otherwise an exception is thrown.
Since the Hive and File sources are commonly used for batch jobs, it's
better to take this point into consideration.
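
To illustrate point 2, a minimal sketch of what this implies for the
enumerator (the interface and method signature are as proposed in FLIP-245;
the class and handler names are placeholders I made up):

import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent;

abstract class DynamicPartitionPruningEnumerator
        implements SupportsHandleExecutionAttemptSourceEvent {

    // Invoked for events sent by a specific execution attempt. The
    // PartitionData is attempt-independent, so all attempts are treated alike.
    @Override
    public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent) {
        handlePartitionData(subtaskId, sourceEvent);
    }

    abstract void handlePartitionData(int subtaskId, SourceEvent event);
}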

Best,
Jing Zhang

[1] FLIP-248:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
[2] FLIP-245:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job


On Tue, Jul 12, 2022 at 13:16, Jark Wu wrote:

> I agree with Jingsong. DPP is a particular case of Dynamic Filter Pushdown
> that the join key contains partition fields.  Extending this FLIP to
> general filter
> pushdown can benefit more optimizations, and they can share the same
> interface.
>
> For example, Trino Hive Connector leverages dynamic filtering to support:
> - dynamic partition pruning for partitioned tables
> - and dynamic bucket pruning for bucket tables
> - and dynamic filter pushed into the ORC and Parquet readers to perform
> stripe
>   or row-group pruning and save on disk I/O.
>
> Therefore, +1 to extend this FLIP to Dynamic Filter Pushdown (or Dynamic
> Filtering),
> just like Trino [1].  The interfaces should also be adapted for that.
>
> Besides, maybe the FLIP should also demonstrate the EXPLAIN result, which
> is also an API.
>
> Best,
> Jark
>
> [1]: https://trino.io/docs/current/admin/dynamic-filtering.html
>
> On Tue, 12 Jul 2022 at 09:59, Jingsong Li  wrote:
>
> > Thanks Godfrey for driving.
> >
> > I like this FLIP.
> >
> > We can restrict this capability to more than just partitions.
> > Here are some inputs from Lake Storage.
> >
> > The format of the splits generated by Lake Storage is roughly as follows:
> > Split {
> >Path filePath;
> >Statistics[] fieldStats;
> > }
> >
> > Stats contain the min and max of each column.
> >
> > If the storage is sorted by a column, this means that split
> > filtering on that column will be very effective, so not only the partition
> > field but also this column is worth pushing down to the
> > RuntimeFilter.
> > This information can only be known by the source, so I suggest that the
> > source return which fields are worth pushing down.
> >
> > My overall point is:
> > This FLIP can be extended to support Source Runtime Filter push-down
> > for all fields, not just dynamic partition pruning.
> >
> > What do you think?
> >
> > Best,
> > Jingsong
> >
> > On Fri, Jul 8, 2022 at 10:12 PM godfrey he  wrote:
> > >
> > > Hi all,
> > >
> > > I would like to open a discussion on FLIP-248: Introduce dynamic
> > > partition pruning.
> > >
> > >  Currently, Flink supports static partition pruning: the conditions in
> > > the WHERE clause are analyzed
> > > to determine in advance which partitions can be safely skipped in the
> > > optimization phase.
> > > Another common scenario: the partition information is not available
> > > in the optimization phase but in the execution phase.
> > > That's the problem this FLIP is trying to solve: dynamic partition
> > > pruning, which could reduce the partition table source IO.
> > >
> > > The query pattern looks like:
> > > select * from store_returns, date_dim where sr_returned_date_sk =
> > > d_date_sk and d_year = 2000
> > >
> > > We will introduce a mechanism for detecting dynamic partition pruning
> > > patterns in optimization phase
> > > and performing partition pruning at runtime by sending the dimension
> > > table results to the SplitEnumerator
> > > of fact table via existing coordinator mechanism.
> > >
> > > You can find more details in FLIP-248 document[1].
> > > Looking forward to your any feedback.
> > >
> > > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-248%3A+Introduce+dynamic+partition+pruning
> > > [2] POC: https://github.com/godfreyhe/flink/tree/FLIP-248
> > >
> > >
> > > Best,
> > > Godfrey
> >
>


[jira] [Created] (FLINK-28538) Eagerly emit a watermark the first time we exceed Long.MIN_VALUE

2022-07-13 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-28538:


 Summary: Eagerly emit a watermark the first time we exceed 
Long.MIN_VALUE
 Key: FLINK-28538
 URL: https://issues.apache.org/jira/browse/FLINK-28538
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Chesnay Schepler
 Fix For: 1.16.0


The built-in watermark generators are set up to start with a Long.MIN_VALUE 
watermark, which is updated on each event and periodically emitted later on.

As a result, before the first periodic emission is triggered (shortly after job 
(re)start) no element is ever considered late.

We should be able to remedy that by emitting a watermark on the first event.
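
A minimal sketch of the idea (my own illustration based on the public 
WatermarkGenerator interface, not the actual patch):
{code:java}
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class EagerFirstEventWatermarkGenerator implements WatermarkGenerator<Long> {

    private long maxTimestamp = Long.MIN_VALUE;
    private boolean firstEventSeen = false;

    @Override
    public void onEvent(Long event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        if (!firstEventSeen) {
            firstEventSeen = true;
            // Emit eagerly so elements can be considered late right away,
            // instead of waiting for the first periodic emission.
            output.emitWatermark(new Watermark(maxTimestamp - 1));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - 1));
    }
}
{code}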





[jira] [Created] (FLINK-28539) Enable CompactionDynamicLevelBytes in FLASH_SSD_OPTIMIZED

2022-07-13 Thread Usamah Jassat (Jira)
Usamah Jassat created FLINK-28539:
-

 Summary: Enable CompactionDynamicLevelBytes in FLASH_SSD_OPTIMIZED
 Key: FLINK-28539
 URL: https://issues.apache.org/jira/browse/FLINK-28539
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Usamah Jassat


Investigating the RocksDB predefined options I see that 
`setLevelCompactionDynamicLevelBytes` is set for SPINNING_DISK options but not 
FLASH_SSD_OPTIMIZED.

 

From my research it looks like this change would improve the space 
amplification of RocksDB [1] (which can also involve a trade-off with 
read/write amplification [2]). It makes sense to me that this feature should 
be enabled for SSDs, as they tend to have less space compared to their HDD 
counterparts.

There is also an argument for disabling it in the SPINNING_DISK options, as 
that could give increased read/write performance [2].

[1] [http://rocksdb.org/blog/2015/07/23/dynamic-level.html]

[2] 
[https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#amplification-factors]
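
In the meantime, a minimal sketch of a workaround via a custom options factory 
(my own illustration, not a proposed patch):
{code:java}
import java.util.Collection;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class DynamicLevelBytesOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // The option under discussion: let RocksDB pick level target sizes
        // dynamically to reduce space amplification.
        return currentOptions.setLevelCompactionDynamicLevelBytes(true);
    }
}
{code}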





Re: [REQUEST] Edit Permissions for FLIP

2022-07-13 Thread Jark Wu
Hi Steve,

Welcome to the Flink community. I have granted the edit permission to you.
Looking forward to your FLIP.

Best,
Jark

On Wed, 13 Jul 2022 at 19:56, Steve Yurong Su  wrote:

> Hi folks!
>
> My name is Steve Su, and I am currently a third-year master’s student
> at Tsinghua University.
>
> I am now participating in a Flink-related student project in OSPP
> (Open Source Promotion Plan[1]). The project aims to design and
> implement the generic rate limiter for the FLIP-27 source and apply
> the rate-limiting feature to the Flink CDC project. @Leonard Xu is my
> mentor in this project.
>
> We already had many discussions on source rate-limiting on the mailing
> list, such as [2], [3], and so on. Leonard and I had an offline
> discussion, and we thought that it would be great to have a generic
> rate limiter, so I’d like to raise a new FLIP to illustrate the design
> further.
>
> Please grant my account FLIP edit permissions. My apache confluence
> account id is `steveyurongsu`. :D
>
> Thanks,
> Steve Su
> ---
> [1] https://summer-ospp.ac.cn/homepagy
> [2] https://lists.apache.org/thread/ff6mcos8g4otnhjrp030lrcrf3omgfmf
> [3] https://lists.apache.org/thread/7gjxto1rmkpff4kl54j8nlg5db2rqhkt
>


Re: [DISCUSS] FLIP-217 Support watermark alignment of source splits

2022-07-13 Thread Becket Qin
Thanks for the explanation, Sebastian. I understand your concern now.

1. About the major concern. Personally I'd consider the coarse grained
watermark alignment as a special case for backward compatibility. In the
future, if for whatever reason we want to pause a split and that is not
supported, it seems the only thing that makes sense is throwing an
exception, instead of pausing the entire source reader. Regarding this
FLIP, if the logic that determines which split should be paused is in the
SourceOperator, the SourceOperator actually knows the reason why it pauses
a split. It also knows whether there are more than one split assigned to
the source reader. So it can just fallback to the coarse grained watermark
alignment, without affecting other reasons of pausing a split, right? And
in the future, if there are more purposes for pausing / resuming a split,
the SourceOperator still needs to understand each of the reasons in order
to resume the splits after all the pausing conditions are no longer met.

2. Naming wise, would "coarse.grained.watermark.alignment.enabled" address
your concern?

The only concern I have with Option A is that people may not be able to
benefit from split-level WM alignment until all the sources they need have
it implemented. This seems to unnecessarily delay the adoption of a new
feature, which looks like a more substantive downside compared with the
"coarse.grained.wm.alignment.enabled" option.

BTW, the SourceOperator doesn't need to invoke the pauseOrResumeSplits()
method and catch the UnsupportedOperationException every time. A flag can be
set so it doesn't attempt to pause the splits after the first time it sees
the exception.
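
For illustration, a minimal sketch of that flag pattern (class, field, and
interface names are placeholders I made up, not the actual implementation):

import java.util.Collection;

class SplitPausingGate {

    private final boolean allowCoarseGrainedAlignment;
    private boolean pausingSupported = true; // flipped on the first failure

    SplitPausingGate(boolean allowCoarseGrainedAlignment) {
        this.allowCoarseGrainedAlignment = allowCoarseGrainedAlignment;
    }

    void pauseOrResumeSplits(
            PausableReader reader,
            Collection<String> splitsToPause,
            Collection<String> splitsToResume) {
        if (!pausingSupported) {
            return; // silently fall back to coarse grained alignment
        }
        try {
            reader.pauseOrResumeSplits(splitsToPause, splitsToResume);
        } catch (UnsupportedOperationException e) {
            if (!allowCoarseGrainedAlignment) {
                throw e;
            }
            pausingSupported = false; // remember; don't retry on every attempt
        }
    }

    /** Stand-in for the relevant part of the SourceReader interface. */
    interface PausableReader {
        void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume);
    }
}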


Thanks,

Jiangjie (Becket) Qin



On Wed, Jul 13, 2022 at 5:11 PM Sebastian Mattheis 
wrote:

> Hi Becket, Hi Thomas, Hi Piotrek,
>
> Thanks for the feedback. I would like to highlight some concerns:
>
>1. Major: A configuration parameter like "allow coarse grained
>alignment" defines a semantic that mixes two contexts conditionally as
>follows: "ignore incapability to pause splits in SourceReader/SplitReader"
>IF (conditional) we "allow coarse grained watermark alignment". At the same
>time we said that there is no way to check the capability of
>SourceReader/SplitReader to pause/resume other than observing a
>UnsupportedOperationException during runtime such that we cannot disable
>the trigger for watermark split alignment in the SourceOperator. Instead,
>we can only ignore the incapability of SourceReader/SplitReader during
>execution of a pause/resume attempt which, consequently, requires to check
>the "allow coarse grained alignment " parameter value (to implement the
>conditional semantic). However, during this execution we actually don't
>know whether the attempt was executed for the purpose of watermark
>alignment or for some other purpose such that the check actually depends on
>who triggered the pause/resume attempt and hides the exception potentially
>unexpectedly for some other use case. Of course, currently there is no
>other purpose and, hence, no other trigger than watermark alignment.
>However, this breaks, in my perspective, the idea of having
>pauseOrResumeSplits (re)usable for other use cases.
>2. Minor: I'm not aware of any configuration parameter in the format
>like `allow.*` as you suggested with
>`allow.coarse.grained.watermark.alignment`. Would that still be okay to do?
>
> As we have agreed to not have a "supportsPausableSplits" method because of
> potential inconsistencies between return value of this method and the
> actual implementation (and also the difficulty to have a meaningful return
> value where the support actually depends on SourceReader AND the assigned
> SplitReaders), I don't want to bring up the discussion about the
> "supportsPauseableSplits" method again. Instead, I see the following
> options:
>
> Option A: I would drop the idea of "allow coarse grained alignment"
> semantic of the parameter but implement a parameter to "enable/disable
> split watermark alignment" which we can easily use in the SourceOperator to
> disable the trigger of split alignment. This is indeed more static and less
> flexible, because it disables split alignment unconditionally, but it is
> "context-decoupled" and more straight-forward to use. This would also
> address the use case of disabling split alignment for the purpose of
> runtime behavior evaluation, as mentioned by Thomas (if I remember
> correctly.) I would implement the parameter with a default where watermark
> split alignment is enabled which requires users to check their application
> when upgrading to 1.16 if a) there is a source that reads from multiple
> splits and b), if yes, all splits of that source support pause/resume. If
> a) yes and b) no, the user must take action to disable watermark split
> alignment (which disables the trigger of split alignment only for the
> purpose).
>
> Opti

[jira] [Created] (FLINK-28540) Unaligned checkpoint waiting in 'start delay' with AsyncDataStream

2022-07-13 Thread Nathan P Sharp (Jira)
Nathan P Sharp created FLINK-28540:
--

 Summary: Unaligned checkpoint waiting in 'start delay' with 
AsyncDataStream
 Key: FLINK-28540
 URL: https://issues.apache.org/jira/browse/FLINK-28540
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.15.0
 Environment: Flink 1.15.0 using default options from 
[https://hub.docker.com/_/flink]

 
Reporter: Nathan P Sharp
 Attachments: SingleCheckpointLongStartDelay.png

I am attempting to use unaligned checkpointing with AsyncDataStream, but the 
checkpoints sit in "start delay" until the job finishes.

I have published code that reproduces this to 
[https://github.com/phxnsharp/AsyncDataStreamCheckpointReproduction]

Reproduction steps:
 * Create a single node Docker swarm.
 * Run the docker-compose.yml file in the repository:
docker stack up -c docker-compose.yml flink
 * Use Flink's web UI to upload the .jar file and run it with default settings.

Expected behavior: Checkpoints happen about once per second since they are 
unaligned.

Actual behavior: After some number of failed checkpoints (the tasks are not 
running yet), a single checkpoint sits in "start delay" until the job finishes.

 
 * Searching the web seems to indicate the most common issue is asyncInvoke 
blocking. I added a test to the code to make sure that is not the case here.

 * I have tried using rocksdb state backend, which did not help

 * I have tried adding additional TaskWorkers, which did not help

 * I have checked the TaskWorker stats and nothing seems awry. No memory 
consumption, for example. Nothing obvious in the stack traces

 * If I change the code to be sequential instead of async, checkpoints work fine

 * The log file merely shows the checkpoint being triggered, then it being 
completed 47 seconds later. No additional information is logged.

Mailing list conversation: 
[https://lists.apache.org/thread/2y3fb93zfsttq03z11xcnynf10xbpgnn]

Thank you!





[jira] [Created] (FLINK-28541) Add OwnerReferences to FlinkDeployment CR in jobmanager Deployment

2022-07-13 Thread 陳昌倬
ChangZhuo Chen (陳昌倬) created FLINK-28541:


 Summary: Add OwnerReferences to FlinkDeployment CR in jobmanager 
Deployment
 Key: FLINK-28541
 URL: https://issues.apache.org/jira/browse/FLINK-28541
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: ChangZhuo Chen (陳昌倬)


`ownerReferences` is used by Argo CD 
(https://argo-cd.readthedocs.io/en/stable/) to display the relation between 
resources. Since there is no `ownerReferences` entry in the jobmanager 
Deployment, Argo CD cannot know that this Deployment is created by the 
FlinkDeployment CR, and thus cannot display the full set of resources managed 
by the FlinkDeployment CR.

Discussion thread: 
https://apache-flink.slack.com/archives/C03G7LJTS2G/p1657639397473729
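
A minimal sketch of what setting the reference could look like with the fabric8 
client (field values are placeholders; this is not the actual operator code):
{code:java}
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import java.util.Collections;

public class OwnerReferenceExample {

    /** Marks the FlinkDeployment CR as the owner of the jobmanager Deployment. */
    public static void addOwnerReference(Deployment jmDeployment, String crName, String crUid) {
        OwnerReference ownerReference =
                new OwnerReferenceBuilder()
                        .withApiVersion("flink.apache.org/v1beta1")
                        .withKind("FlinkDeployment")
                        .withName(crName)
                        .withUid(crUid)
                        .withController(true)
                        .build();
        jmDeployment.getMetadata()
                .setOwnerReferences(Collections.singletonList(ownerReference));
    }
}
{code}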





Re: [REQUEST] Edit Permissions for FLIP

2022-07-13 Thread Steve Yurong Su
Hi Jark,

Thanks a lot :)

Steve

On Wed, Jul 13, 2022 at 22:48, Jark Wu wrote:
>
> Hi Steve,
>
> Welcome to the Flink community. I have granted the edit permission to you.
> Looking forward to your FLIP.
>
> Best,
> Jark
>
> On Wed, 13 Jul 2022 at 19:56, Steve Yurong Su  wrote:
>
> > Hi folks!
> >
> > My name is Steve Su, and I am currently a third-year master’s student
> > at Tsinghua University.
> >
> > I am now participating in a Flink-related student project in OSPP
> > (Open Source Promotion Plan[1]). The project aims to design and
> > implement the generic rate limiter for the FLIP-27 source and apply
> > the rate-limiting feature to the Flink CDC project. @Leonard Xu is my
> > mentor in this project.
> >
> > We already had many discussions on source rate-limiting on the mailing
> > list, such as [2], [3], and so on. Leonard and I had an offline
> > discussion, and we thought that it would be great to have a generic
> > rate limiter, so I’d like to raise a new FLIP to illustrate the design
> > further.
> >
> > Please grant my account FLIP edit permissions. My apache confluence
> > account id is `steveyurongsu`. :D
> >
> > Thanks,
> > Steve Su
> > ---
> > [1] https://summer-ospp.ac.cn/homepagy
> > [2]https://lists.apache.org/thread/ff6mcos8g4otnhjrp030lrcrf3omgfmf
> > [3] https://lists.apache.org/thread/7gjxto1rmkpff4kl54j8nlg5db2rqhkt
> >


Re: [REQUEST] Edit Permissions for FLIP

2022-07-13 Thread Alexander Fedulov
Hi Steve,

in FLIP-238 [1] I propose to also introduce a rate limiter with a
simple API at the level of SourceReader:
 * RateLimiter.java
 * GuavaRateLimiter.java
 * RateLimitedSourceReader.java
 * DataGeneratorSourceV3.java#L140

Did you plan to go further than this?
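
For context, a minimal sketch of the shape of this API as I read it (names 
adapted slightly, and the bodies are my own illustration rather than the 
actual FLIP-238 code):

import com.google.common.util.concurrent.RateLimiter;

/** Blocks until the next record may be emitted by the reader. */
interface SourceRateLimiter {
    void acquire() throws InterruptedException;
}

/** Guava-backed implementation: a fixed number of records per second. */
final class GuavaSourceRateLimiter implements SourceRateLimiter {
    private final RateLimiter rateLimiter;

    GuavaSourceRateLimiter(double recordsPerSecond) {
        this.rateLimiter = RateLimiter.create(recordsPerSecond);
    }

    @Override
    public void acquire() {
        rateLimiter.acquire(); // blocks until a permit is available
    }
}

A RateLimitedSourceReader would presumably wrap a delegate SourceReader and 
call acquire() before handing out each record.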

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-238%3A+Introduce+FLIP-27-based+Data+Generator+Source

Best,
Alexander Fedulov

On Wed, Jul 13, 2022 at 6:38 PM Steve Yurong Su  wrote:

> Hi Jark,
>
> Thanks a lot :)
>
> Steve
>
> Jark Wu  于2022年7月13日周三 22:48写道:
> >
> > Hi Steve,
> >
> > Welcome to the Flink community. I have granted the edit permission to
> you.
> > Looking forward to your FLIP.
> >
> > Best,
> > Jark
> >
> > On Wed, 13 Jul 2022 at 19:56, Steve Yurong Su  wrote:
> >
> > > Hi folks!
> > >
> > > My name is Steve Su, and I am currently a third-year master’s student
> > > at Tsinghua University.
> > >
> > > I am now participating in a Flink-related student project in OSPP
> > > (Open Source Promotion Plan[1]). The project aims to design and
> > > implement the generic rate limiter for the FLIP-27 source and apply
> > > the rate-limiting feature to the Flink CDC project. @Leonard Xu is my
> > > mentor in this project.
> > >
> > > We already had many discussions on source rate-limiting on the mailing
> > > list, such as [2], [3], and so on. Leonard and I had an offline
> > > discussion, and we thought that it would be great to have a generic
> > > rate limiter, so I’d like to raise a new FLIP to illustrate the design
> > > further.
> > >
> > > Please grant my account FLIP edit permissions. My apache confluence
> > > account id is `steveyurongsu`. :D
> > >
> > > Thanks,
> > > Steve Su
> > > ---
> > > [1] https://summer-ospp.ac.cn/homepagy
> > > [2]https://lists.apache.org/thread/ff6mcos8g4otnhjrp030lrcrf3omgfmf
> > > [3] https://lists.apache.org/thread/7gjxto1rmkpff4kl54j8nlg5db2rqhkt
> > >
>


[VOTE] FLIP-243: Dedicated Opensearch connectors

2022-07-13 Thread Andriy Redko
Hey Folks,

Thanks a lot for all the feedback and comments so far. Based on the discussion 
[1], 
it seems like there is a genuine interest in supporting OpenSearch [2] 
natively. With 
that being said, I would like to start a vote on FLIP-243 [3].

The vote will last for at least 72 hours unless there is an objection or
insufficient votes.

Thank you!

[1] https://lists.apache.org/thread/jls0vqc7jb84jp14j4jok1pqfgo2cl30
[2] https://opensearch.org/
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-243%3A+Dedicated+Opensearch+connectors


Best Regards,
Andriy Redko



[jira] [Created] (FLINK-28542) [JUnit5 Migration] FileSystemBehaviorTestSuite

2022-07-13 Thread Ryan Skraba (Jira)
Ryan Skraba created FLINK-28542:
---

 Summary: [JUnit5 Migration] FileSystemBehaviorTestSuite
 Key: FLINK-28542
 URL: https://issues.apache.org/jira/browse/FLINK-28542
 Project: Flink
  Issue Type: Sub-task
  Components: FileSystems
Reporter: Ryan Skraba


The FileSystemBehaviorTestSuite in flink-core is implemented in most of the 
modules under flink-filesystems. All of these implementations (one for each 
filesystem) should be migrated together.
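
For anyone picking this up, the typical mechanical changes look roughly like 
this (a generic JUnit 4 to JUnit 5 sketch, not the actual suite code; the 
class and field names are placeholders):

{code:java}
// Before: import org.junit.Before; import org.junit.Test;
// After:
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class FileSystemBehaviorSketchTest {

    private Object fs; // stand-in for the FileSystem under test

    @BeforeEach // replaces JUnit 4's @Before
    void setUp() {
        fs = new Object();
    }

    @Test // org.junit.jupiter.api.Test instead of org.junit.Test
    void testPathExists() {
        // The Flink migration typically also moves assertions to AssertJ.
        assertThat(fs).isNotNull();
    }
}
{code}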




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28543) After compiling Flink, you should provide the artifact file in tar format instead of a folder

2022-07-13 Thread MengYao (Jira)
MengYao created FLINK-28543:
---

 Summary: After compiling Flink, you should provide the artifact 
file in tar format instead of a folder
 Key: FLINK-28543
 URL: https://issues.apache.org/jira/browse/FLINK-28543
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Affects Versions: 1.15.1, 1.15.0
Reporter: MengYao


After compiling Flink, the final binary files end up under the target 
directory of the flink-dist module, in the flink-version-bin/flink-version 
directory, but they are not packaged as a tar file. Therefore, you always have 
to run commands manually to generate a tar package.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28544) Elasticsearch6SinkE2ECase failed with no space left on device

2022-07-13 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-28544:


 Summary: Elasticsearch6SinkE2ECase failed with no space left on 
device
 Key: FLINK-28544
 URL: https://issues.apache.org/jira/browse/FLINK-28544
 Project: Flink
  Issue Type: Bug
  Components: Connectors / ElasticSearch, Tests
Affects Versions: 1.16.0
Reporter: Huang Xingbo



{code:java}
2022-07-13T02:49:13.5455800Z Jul 13 02:49:13 [ERROR] Tests run: 1, Failures: 0, 
Errors: 1, Skipped: 0, Time elapsed: 49.38 s <<< FAILURE! - in 
org.apache.flink.streaming.tests.Elasticsearch6SinkE2ECase
2022-07-13T02:49:13.5465965Z Jul 13 02:49:13 [ERROR] 
org.apache.flink.streaming.tests.Elasticsearch6SinkE2ECase  Time elapsed: 49.38 
s  <<< ERROR!
2022-07-13T02:49:13.5466765Z Jul 13 02:49:13 java.lang.RuntimeException: Failed 
to build JobManager image
2022-07-13T02:49:13.5467621Z Jul 13 02:49:13at 
org.apache.flink.connector.testframe.container.FlinkTestcontainersConfigurator.configureJobManagerContainer(FlinkTestcontainersConfigurator.java:67)
2022-07-13T02:49:13.5468645Z Jul 13 02:49:13at 
org.apache.flink.connector.testframe.container.FlinkTestcontainersConfigurator.configure(FlinkTestcontainersConfigurator.java:147)
2022-07-13T02:49:13.5469564Z Jul 13 02:49:13at 
org.apache.flink.connector.testframe.container.FlinkContainers$Builder.build(FlinkContainers.java:197)
2022-07-13T02:49:13.5470467Z Jul 13 02:49:13at 
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment.(FlinkContainerTestEnvironment.java:88)
2022-07-13T02:49:13.5471424Z Jul 13 02:49:13at 
org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment.(FlinkContainerTestEnvironment.java:51)
2022-07-13T02:49:13.5472504Z Jul 13 02:49:13at 
org.apache.flink.streaming.tests.ElasticsearchSinkE2ECaseBase.(ElasticsearchSinkE2ECaseBase.java:58)
2022-07-13T02:49:13.5473388Z Jul 13 02:49:13at 
org.apache.flink.streaming.tests.Elasticsearch6SinkE2ECase.(Elasticsearch6SinkE2ECase.java:36)
2022-07-13T02:49:13.5474161Z Jul 13 02:49:13at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
2022-07-13T02:49:13.5474905Z Jul 13 02:49:13at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
2022-07-13T02:49:13.5475756Z Jul 13 02:49:13at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
2022-07-13T02:49:13.5476734Z Jul 13 02:49:13at 
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
2022-07-13T02:49:13.5477495Z Jul 13 02:49:13at 
org.junit.platform.commons.util.ReflectionUtils.newInstance(ReflectionUtils.java:550)
2022-07-13T02:49:13.5478313Z Jul 13 02:49:13at 
org.junit.jupiter.engine.execution.ConstructorInvocation.proceed(ConstructorInvocation.java:56)
2022-07-13T02:49:13.5479220Z Jul 13 02:49:13at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
2022-07-13T02:49:13.5480165Z Jul 13 02:49:13at 
org.junit.jupiter.api.extension.InvocationInterceptor.interceptTestClassConstructor(InvocationInterceptor.java:73)
2022-07-13T02:49:13.5481038Z Jul 13 02:49:13at 
org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
2022-07-13T02:49:13.5481944Z Jul 13 02:49:13at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
2022-07-13T02:49:13.5482875Z Jul 13 02:49:13at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
2022-07-13T02:49:13.5483764Z Jul 13 02:49:13at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
2022-07-13T02:49:13.5484642Z Jul 13 02:49:13at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
2022-07-13T02:49:13.5486123Z Jul 13 02:49:13at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
2022-07-13T02:49:13.5488185Z Jul 13 02:49:13at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:77)
2022-07-13T02:49:13.543Z Jul 13 02:49:13at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeTestClassConstructor(ClassBasedTestDescriptor.java:355)
2022-07-13T02:49:13.5490237Z Jul 13 02:49:13at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.instantiateTestClass(ClassBasedTestDescriptor.java:302)
2022-07-13T02:49:13.5491099Z Jul 13 02:49:13at 
org.junit.jupiter.engine.descriptor.ClassTestDescriptor.instantiateTestClass(ClassTestDescriptor.java:79)
2022-07-13T02:49:13.5491840Z Jul 13 02:49:13at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.instantiateAndPostProcessTestInstance
...
{code}

[jira] [Created] (FLINK-28545) FlinkKafkaProducerITCase.testRestoreToCheckpointAfterExceedingProducersPool failed with TimeoutException: Topic flink-kafka-producer-fail-before-notify not present in metadata after 6 ms

2022-07-13 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-28545:


 Summary: 
FlinkKafkaProducerITCase.testRestoreToCheckpointAfterExceedingProducersPool  
failed with TimeoutException: Topic flink-kafka-producer-fail-before-notify not 
present in metadata after 6 ms
 Key: FLINK-28545
 URL: https://issues.apache.org/jira/browse/FLINK-28545
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.15.1
Reporter: Huang Xingbo



{code:java}
2022-07-13T09:49:00.4699245Z Jul 13 09:49:00 [ERROR] 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase.testRestoreToCheckpointAfterExceedingProducersPool
  Time elapsed: 243.968 s  <<< ERROR!
2022-07-13T09:49:00.4700438Z Jul 13 09:49:00 
org.apache.kafka.common.errors.TimeoutException: Topic 
flink-kafka-producer-fail-before-notify not present in metadata after 6 ms.
2022-07-13T09:49:00.4702497Z Jul 13 09:49:00 
2022-07-13T09:49:00.8707199Z Jul 13 09:49:00 [INFO] 
2022-07-13T09:49:00.8708010Z Jul 13 09:49:00 [INFO] Results:
2022-07-13T09:49:00.8708577Z Jul 13 09:49:00 [INFO] 
2022-07-13T09:49:00.8709134Z Jul 13 09:49:00 [ERROR] Errors: 
2022-07-13T09:49:00.8711253Z Jul 13 09:49:00 [ERROR]   
FlinkKafkaProducerITCase.testRestoreToCheckpointAfterExceedingProducersPool » 
Timeout
2022-07-13T09:49:00.8712471Z Jul 13 09:49:00 [INFO] 
2022-07-13T09:49:00.8713163Z Jul 13 09:49:00 [ERROR] Tests run: 209, Failures: 
0, Errors: 1, Skipped: 1
{code}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=38129&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=15a22db7-8faa-5b34-3920-d33c9f0ca23c



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28546) Add the release logic for py39 and m1 wheel package

2022-07-13 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-28546:


 Summary: Add the release logic for py39 and m1 wheel package
 Key: FLINK-28546
 URL: https://issues.apache.org/jira/browse/FLINK-28546
 Project: Flink
  Issue Type: New Feature
  Components: API / Python, Release System
Affects Versions: 1.16.0
Reporter: Huang Xingbo
Assignee: Huang Xingbo
 Fix For: 1.16.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28547) Add IT cases for SpeculativeScheduler

2022-07-13 Thread Lijie Wang (Jira)
Lijie Wang created FLINK-28547:
--

 Summary: Add IT cases for SpeculativeScheduler
 Key: FLINK-28547
 URL: https://issues.apache.org/jira/browse/FLINK-28547
 Project: Flink
  Issue Type: Sub-task
Reporter: Lijie Wang


Add IT cases for SpeculativeScheduler.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-250: Support Customized Kubernetes Schedulers Proposal

2022-07-13 Thread bo zhaobo
Hi Martijn,

Thank you for your comments. I will answer the questions one by one.

""
* Regarding the motivation, it mentions that the development trend is that
Flink supports both batch and stream processing. I think the vision and
trend is that we have unified batch- and stream processing. What I'm
missing is the vision on what's the impact for customized Kubernetes
schedulers on stream processing. Could there be some elaboration on that?
""

>>

We very much agree with you, and with the dev trend that Flink supports both
batch and stream processing. Actually, using a customized K8S scheduler is
beneficial for streaming scenarios too, for example to avoid resource
deadlocks: if the remaining resources in the K8S cluster are only enough for
one job but we submit two, both jobs will hang while requesting resources at
the same time under the default K8S scheduler. A customized scheduler such as
Volcano, in contrast, will not schedule pods that overcommit the cluster if
the idle resources cannot fit all of the pods of a job. So the benefits
mentioned in the FLIP are not only for batch jobs. In fact, all 4 scheduling
capabilities mentioned in the FLIP are required for stream processing as
well. YARN has some of these scheduling features too, such as priority
scheduling and min/max resource constraints.

""
* While the FLIP talks about customized schedulers, it focuses on Volcano.
Why is the choice made to only focus on Volcano and not on other schedulers
like Apache YuniKorn? Can we not also provide an implementation for
YuniKorn at the same time? Spark did the same with SPARK-36057 [1]
""

>>

Let me make this more clear. The FLIP consists of two parts:
1. Introducing a customized scheduler plugin mechanism for Flink on K8S. This
part is a general consideration.
2. Introducing ONE reference implementation of that plugin. Volcano is just
one possible scheduler; if people are interested in other schedulers, their
integration can also be completed easily (see the sketch below).
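
To make the two parts concrete, a rough sketch of what such a plugin could
look like (the interface and method names here are hypothetical
illustrations, not the API proposed in the FLIP):

import java.util.HashMap;
import java.util.Map;

import io.fabric8.kubernetes.api.model.Pod;

/** Hypothetical plugin contract: decorate pods before they are created. */
interface KubernetesSchedulerPlugin {
    /** e.g. "volcano" or "yunikorn", selected via a Flink config option. */
    String name();

    Pod decoratePod(Pod pod);
}

/** Reference implementation for Volcano. */
final class VolcanoSchedulerPlugin implements KubernetesSchedulerPlugin {
    @Override
    public String name() {
        return "volcano";
    }

    @Override
    public Pod decoratePod(Pod pod) {
        // Route the pod to Volcano instead of the default scheduler.
        pod.getSpec().setSchedulerName("volcano");
        // Group all pods of one Flink cluster so Volcano can gang-schedule them.
        Map<String, String> annotations = pod.getMetadata().getAnnotations();
        if (annotations == null) {
            annotations = new HashMap<>();
            pod.getMetadata().setAnnotations(annotations);
        }
        annotations.put("scheduling.k8s.io/group-name", "flink-cluster-pod-group");
        return pod;
    }
}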

""
* We still have quite a lot of tech debt on testing for Kubernetes [2]. I
think that this FLIP would be a great improvement for Flink, but I am
worried that we will add more tech debt to those tests. Can we somehow
improve this situation?
""

>>

Yes, we will pay attention to the Flink K8S-related test problems, and we are
happy to improve the situation. ;-)

BR,

Bo Zhao

Martijn Visser  于2022年7月13日周三 15:19写道:

> Hi all,
>
> Thanks for the FLIP. I have a couple of remarks/questions:
>
> * Regarding the motivation, it mentions that the development trend is that
> Flink supports both batch and stream processing. I think the vision and
> trend is that we have unified batch- and stream processing. What I'm
> missing is the vision on what's the impact for customized Kubernetes
> schedulers on stream processing. Could there be some elaboration on that?
> * While the FLIP talks about customized schedulers, it focuses on Volcano.
> Why is the choice made to only focus on Volcano and not on other schedulers
> like Apache YuniKorn? Can we not also provide an implementation for
> YuniKorn at the same time? Spark did the same with SPARK-36057 [1]
> * We still have quite a lot of tech debt on testing for Kubernetes [2]. I
> think that this FLIP would be a great improvement for Flink, but I am
> worried that we will add more tech debt to those tests. Can we somehow
> improve this situation?
>
> Best regards,
>
> Martijn
>
> [1] https://issues.apache.org/jira/browse/SPARK-36057
> [2] https://issues.apache.org/jira/browse/FLINK-20392
>
> Op wo 13 jul. 2022 om 04:11 schreef 王正 :
>
> > +1
> >
> > On 2022/07/07 01:15:13 bo zhaobo wrote:
> > > Hi, all.
> > >
> > > I would like to raise a discussion in Flink dev ML about Support
> > Customized
> > > Kubernetes Schedulers.
> > > Currentlly, Kubernetes becomes more and more polular for Flink Cluster
> > > deployment, and its ability is better, especially, it supports
> > customized
> > > scheduling.
> > > Essentially, in high-performance workloads, we need to apply new
> > scheduling
> > > policies for meeting the new requirements. And now Flink native
> > Kubernetes
> > > solution is using Kubernetes default scheduler to work with all
> > scenarios,
> > > the default scheduling policy might be difficult to apply in some
> extreme
> > > cases, so
> > > we need to improve the Flink Kubernetes for coupling those Kubernetes
> > > customized schedulers with Flink native Kubernetes, provides a way for
> > Flink
> > > administrators or users to use/apply their Flink Clusters on Kubernetes
> > > more flexibility.
> > >
> > > The proposal will introduce the customized K8S schdulers plugin
> mechanism
> > > and a reference implementation 'Volcano' in Flink. More details see
> [1].
> > >
> > > Looking forward to your feedback.
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-250%3A+Support+Customized+Kubernetes+Schedulers+Proposal
>

Re: [DISCUSS] FLIP-249: Flink Web UI Enhancement for Speculative Execution

2022-07-13 Thread Gen Luo
Hi Jing,

I agree that in this version it's not really convenient to check whether
there are speculative attempts. I think what we need to improve this is not
only an enhancement of the current Web UI, but also a new page on the history
server summarizing the information of speculative execution, or both. More
useful information could be shown there, like the history timeline and the
progress of speculative executions. That could be a big project.

I suppose in this FLIP we'd like to ensure the basic usability of speculative
execution, and we can consider further improvements after that is done.

Best,
Gen

On Wed, Jul 13, 2022 at 7:05 PM Jing Zhang  wrote:

> Hi Gen,
>
> > The way the speculative executions are presented should be almost the
> same as the
> job was running. Users can still find the executions folded in the subtask
> list page.
>
> It's a more complicated operation to check all vertex and all subtasks list
> page.
> It's better to have an easier way to know whether the job contains
> speculative executions
> even after the job finished.
> Maybe the point could be took into consideration in the next version.
>
> Best,
> Jing Zhang
>
>
> Gen Luo  于2022年7月13日周三 14:47写道:
>
> > Hi Jing,
> >
> > Thanks for joining the discussion. It's a very good point to figure out
> the
> > possible influence on the history server.
> >
> > > 1. Does the improvement also cover history server or just Web UI?
> > As far as I know most Web UI components are shared between
> > runtime and history server, so the improvement is expected to cover both.
> >
> > We will make sure the changes proposed in this FLIP do not conflict with
> > the ongoing FLIP-241 which is working on the enhancement of completed
> > job information.
> >
> > > 2. How to know whether the job contains speculative execution
> > instances after the job finished? Do we have to check each subtasks
> > of all vertex one by one?
> >
> > When one attempt of a subtask finishes, all other concurrent attempts
> > will be canceled, but still treated as the current executions. The way
> the
> > speculative executions are presented should be almost the same as the
> > job was running. Users can still find the executions folded in the
> subtask
> > list page.
> >
> > As we mentioned in the FLIP, all changes are expected to be transparent
> > to users who don't use speculative execution. And to users who do use
> > speculative execution, the experience should be almost the same
> > when watching a running job or a completed job in the history server.
> >
> > Best,
> > Gen
> >
> > On Tue, Jul 12, 2022 at 8:41 PM Jing Zhang  wrote:
> >
> > > Thanks for driving this discussion. It's a very helpful improvement.
> > > I only have two minor questions:
> > > 1. Does the improvement also cover history server or just Web UI?
> > > 2. How to know whether the job contains speculative execution instances
> > > after the job finished?
> > > Do we have to check each subtasks of all vertex one by one?
> > >
> > > Best,
> > > Jing Zhang
> > >
> > > Gen Luo  于2022年7月11日周一 22:31写道:
> > >
> > > > Hi, everyone.
> > > >
> > > > Thanks for your feedback.
> > > > If there are no more concerns or comments, I will start the vote
> > > tomorrow.
> > > >
> > > > Gen Luo  于 2022年7月11日周一 11:12写道:
> > > >
> > > > > Hi Lijie and Zhu,
> > > > >
> > > > > Thanks for the suggestion. I agree that the name "Blocked Free
> Slots"
> > > is
> > > > > more clear to users.
> > > > > I'll take the suggestion and update the FLIP.
> > > > >
> > > > > On Fri, Jul 8, 2022 at 9:12 PM Zhu Zhu  wrote:
> > > > >
> > > > >> I agree that it can be more useful to show the number of slots
> that
> > > are
> > > > >> free but blocked. Currently users infer the slots in use by
> > > subtracting
> > > > >> available slots from the total slots. With blocked slots
> introduced,
> > > > this
> > > > >> can be achieved by subtracting available slots and blocked free
> > slots
> > > > >> from the total slots.
> > > > >>
> > > > >> Therefore, +1 to show "Blocked Free Slots" on the resource card.
> > > > >>
> > > > >> Thanks,
> > > > >> Zhu
> > > > >>
> > > > >> Lijie Wang  于2022年7月8日周五 17:39写道:
> > > > >> >
> > > > >> > Hi Gen & Zhu,
> > > > >> >
> > > > >> > -> 1. Can we also show "Blocked Slots" in the resource card, so
> > that
> > > > >> users
> > > > >> > can easily figure out how many slots are
> available/blocked/in-use?
> > > > >> >
> > > > >> > I think we should describe the "available" and "blocked" more
> > > clearly.
> > > > >> In
> > > > >> > my opinion, I think users should be interested in the number of
> > > slots
> > > > in
> > > > >> > the following 3 state:
> > > > >> > 1. free and unblocked, I think it's OK to call this state
> > > "available".
> > > > >> > 2. free and blocked, I think it's not appropriate to call
> > "blocked"
> > > > >> > directly, because "blocked" should include both the "free and
> > > blocked"
> > > > >> and
> > > > >> > "in-use and blocked".
> > > > >> > 3. in-use
> > > > >> >
> > > > >> 

[jira] [Created] (FLINK-28548) The commit partition base path is not created when no data is sent which may cause FileNotFoundException

2022-07-13 Thread Liu (Jira)
Liu created FLINK-28548:
---

 Summary: The commit partition base path is not created when no 
data is sent which may cause FileNotFoundException
 Key: FLINK-28548
 URL: https://issues.apache.org/jira/browse/FLINK-28548
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.15.1, 1.14.5, 1.16.0
Reporter: Liu


The commit partition base path is not created when no data is sent, which may 
cause a FileNotFoundException. The exception is as follows:
{code:java}
Caused by: java.io.FileNotFoundException: File 
/home/ljg/test_sql.db/flink_batch_test/.staging_1657697612169 does not exist.
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:771)
 ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:120)
 ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:828)
 ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:824)
 ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?]
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?]
at 
org.apache.hadoop.hdfs.perflog.FileSystemLinkResolverWithStatistics$1.doCall(FileSystemLinkResolverWithStatistics.java:37)
 ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?]
at org.apache.hadoop.hdfs.perflog.PerfProxy.call(PerfProxy.java:49) 
~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?]
at 
org.apache.hadoop.hdfs.perflog.FileSystemLinkResolverWithStatistics.resolve(FileSystemLinkResolverWithStatistics.java:39)
 ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?]
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:835)
 ~[hadoop-hdfs-2.6.0U203-cdh5.4.4.jar:?]
at 
org.apache.hadoop.fs.FilterFileSystem.listStatus(FilterFileSystem.java:238) 
~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?]
at 
org.apache.hadoop.fs.FilterFileSystem.listStatus(FilterFileSystem.java:238) 
~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?]
at 
org.apache.hadoop.fs.viewfs.ChRootedFileSystem.listStatus(ChRootedFileSystem.java:241)
 ~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?]
at 
org.apache.hadoop.fs.viewfs.ViewFileSystem.listStatus(ViewFileSystem.java:376) 
~[hadoop-common-2.6.0U203-cdh5.4.4.jar:?]
at 
org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170)
 ~[flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar:1.15.0]
at 
org.apache.flink.connector.file.table.PartitionTempFileManager.listTaskTemporaryPaths(PartitionTempFileManager.java:87)
 ~[flink-connector-files-1.15.0.jar:1.15.0]
at 
org.apache.flink.connector.file.table.FileSystemCommitter.commitPartitions(FileSystemCommitter.java:78)
 ~[flink-connector-files-1.15.0.jar:1.15.0]
at 
org.apache.flink.connector.file.table.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:89)
 ~[flink-connector-files-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:153)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.jobFinished(DefaultExecutionGraph.java:1190)
 ~[flink-dist-1.15.0.jar:1.15.0]
... 43 more {code}
We should check whether the base path exists before calling listStatus on it.
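
A minimal sketch of the proposed guard (the helper below is a simplified 
stand-in for PartitionTempFileManager.listTaskTemporaryPaths; the exact 
integration point is an assumption):

{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

final class StagingDirGuard {

    static List<Path> listTaskTemporaryPaths(FileSystem fs, Path basePath) throws Exception {
        List<Path> taskPaths = new ArrayList<>();
        // If no subtask ever wrote data, the staging directory was never
        // created; treat that as "no partitions to commit" instead of
        // letting listStatus throw a FileNotFoundException.
        if (!fs.exists(basePath)) {
            return taskPaths;
        }
        for (FileStatus status : fs.listStatus(basePath)) {
            taskPaths.add(status.getPath());
        }
        return taskPaths;
    }
}
{code}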



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-249: Flink Web UI Enhancement for Speculative Execution

2022-07-13 Thread Yun Gao
+1 (binding)

Thanks for driving this!

Best,
Yun Gao


--
From:Jing Zhang 
Send Time:2022 Jul. 13 (Wed.) 19:07
Subject:Re: [VOTE] FLIP-249: Flink Web UI Enhancement for Speculative Execution

+1 (binding)

Best,
Jing Zhang

Gen Luo  于2022年7月13日周三 14:49写道:

> Hi Jing,
>
> I have replied in the discussion thread about the questions. Hope that
> would be helpful.
>
> Best,
> Gen
>
> On Tue, Jul 12, 2022 at 8:43 PM Jing Zhang  wrote:
>
> > Hi, Gen Luo,
> >
> > I left  two minor questions in the DISCUSS thread.
> > Sorry for jumping into the discussion so late.
> >
> > Best,
> > Jing Zhang
> >
> > Lijie Wang  于2022年7月12日周二 19:29写道:
> >
> > > +1 (non-binding)
> > >
> > > Best,
> > > Lijie
> > >
> > > Zhu Zhu  于2022年7月12日周二 17:38写道:
> > >
> > > > +1 (binding)
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > Gen Luo  于2022年7月12日周二 13:46写道:
> > > > >
> > > > > Hi everyone,
> > > > >
> > > > >
> > > > > Thanks for all the feedback so far. Based on the discussion [1], we
> > > seem
> > > > to
> > > > > have consensus. So, I would like to start a vote on FLIP-249 [2].
> > > > >
> > > > >
> > > > > The vote will last for at least 72 hours unless there is an
> objection
> > > or
> > > > > insufficient votes.
> > > > >
> > > > >
> > > > > [1]
> https://lists.apache.org/thread/832tk3zvysg45vrqrv5tgbdqx974pm3m
> > > > > [2]
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-249%3A+Flink+Web+UI+Enhancement+for+Speculative+Execution
> > > >
> > >
> >
>


Re: [REQUEST] Edit Permissions for FLIP

2022-07-13 Thread Steve Yurong Su
Hi Alexander,

The reference [3] mentioned in my previous email is your discussion
thread on FLIP-238. We have noticed your idea of providing a new
rate-limiter, which is great and matches our starting point. I was
just about to chat with you on slack about this feature :)

I tend to propose a separate FLIP for the following reasons:
1. Source rate limiting is an independent feature and involves changes
to the Source API.
2. Source rate limiting is an important feature in its own right. Projects
such as Flink CDC can directly benefit from it, and a separate FLIP will
help its implementation and adoption.
3. The source rate limiter may go further: it can provide not only limits
on throughput but also limits on the number of requests and so on (perhaps
your implementation is enough for now), which can better serve different
types of sources.

What do you think? Is it necessary to raise a separate FLIP? If so, you
could also write the first version of the proposal for source rate
limiting, and then I will add some of my ideas :)

Thanks,
Steve

Alexander Fedulov  于2022年7月14日周四 00:47写道:
>
> Hi Steve,
>
> in FLIP-238 [1] I propose to also introduce a rate limiter with a
> simple API at the level of SourceReader:
>  * RateLimiter.java
>  * GuavaRateLimiter.java
>  * RateLimitedSourceReader.java
>  * DataGeneratorSourceV3.java#L140
>
> Did you plan to go further than this?
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-238%3A+Introduce+FLIP-27-based+Data+Generator+Source
>
> Best,
> Alexander Fedulov
>
> On Wed, Jul 13, 2022 at 6:38 PM Steve Yurong Su  wrote:
>
> > Hi Jark,
> >
> > Thanks a lot :)
> >
> > Steve
> >
> > Jark Wu  于2022年7月13日周三 22:48写道:
> > >
> > > Hi Steve,
> > >
> > > Welcome to the Flink community. I have granted the edit permission to
> > you.
> > > Looking forward to your FLIP.
> > >
> > > Best,
> > > Jark
> > >
> > > On Wed, 13 Jul 2022 at 19:56, Steve Yurong Su  wrote:
> > >
> > > > Hi folks!
> > > >
> > > > My name is Steve Su, and I am currently a third-year master’s student
> > > > at Tsinghua University.
> > > >
> > > > I am now participating in a Flink-related student project in OSPP
> > > > (Open Source Promotion Plan[1]). The project aims to design and
> > > > implement the generic rate limiter for the FLIP-27 source and apply
> > > > the rate-limiting feature to the Flink CDC project. @Leonard Xu is my
> > > > mentor in this project.
> > > >
> > > > We already had many discussions on source rate-limiting on the mailing
> > > > list, such as [2], [3], and so on. Leonard and I had an offline
> > > > discussion, and we thought that it would be great to have a generic
> > > > rate limiter, so I’d like to raise a new FLIP to illustrate the design
> > > > further.
> > > >
> > > > Please grant my account FLIP edit permissions. My apache confluence
> > > > account id is `steveyurongsu`. :D
> > > >
> > > > Thanks,
> > > > Steve Su
> > > > ---
> > > > [1] https://summer-ospp.ac.cn/homepagy
> > > > [2]https://lists.apache.org/thread/ff6mcos8g4otnhjrp030lrcrf3omgfmf
> > > > [3] https://lists.apache.org/thread/7gjxto1rmkpff4kl54j8nlg5db2rqhkt
> > > >
> >