Re: [DISCUSS] Support source/sink parallelism config in Flink sql

2020-09-20 Thread Benchao Li
Hi admin,

Thanks for bringing up this discussion.
IMHO, it's a valuable feature. We also added this feature for our internal
SQL engine.
And our way is very similar to your proposal.

Regarding the implementation, there is one shortcoming: we would need to modify
each connector
to support this property.
We can wait for others' opinions on whether this is a valid proposal. If yes,
then we can discuss
the implementation in detail.
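
For readers skimming the thread, a rough sketch of what the proposal could look like
from the user's side is below. The 'sink.parallelism' key is only an assumed name used
for illustration; the actual option name (and whether it belongs in the WITH clause at
all) is exactly what this discussion needs to settle, and the snippet will not run on
existing releases because the option does not exist there:

    // Sketch only -- uses org.apache.flink.table.api TableEnvironment / EnvironmentSettings.
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
    tEnv.executeSql(
        "CREATE TABLE kafka_sink (" +
        "  user_id BIGINT," +
        "  cnt BIGINT" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'output'," +
        "  'properties.bootstrap.servers' = 'localhost:9092'," +
        "  'format' = 'json'," +
        "  'sink.parallelism' = '4'" +  // hypothetical option, not an agreed name
        ")");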

admin <17626017...@163.com> wrote on Thu, Sep 10, 2020 at 1:19 AM:

> Hi devs:
> Currently, Flink SQL does not support source/sink parallelism configuration, so it
> can result in wasted or insufficient resources in some cases.
> I think it is necessary to introduce configuration of source/sink
> parallelism in SQL.
> From my side, I have a solution for this feature: add a parallelism config
> in the ‘with’ properties of the DDL.
>
> Before 1.11, we can get the parallelism and then set it in
> StreamTableSink#consumeDataStream or StreamTableSource#getDataStream.
> After 1.11, we can get the parallelism from the catalogTable and then set it on the
> transformation in CommonPhysicalTableSourceScan or CommonPhysicalSink.
>
> What do you think?
>
>
>
>
>

-- 

Best,
Benchao Li


Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-20 Thread Guowei Ma
Hi, Steven
I want to make a clarification first: the following reply only considers
the Iceberg sink, not other sinks. Before making a decision
we should consider all the sinks. I will try to summarize all the sink
requirements in the next mail.


>>  run global committer in jobmanager (e.g. like sink coordinator)

I think it could be.


>> You meant GlobalCommit -> GlobalCommT, right?

Yes. Thanks :)


>> Is this called when restored from checkpoint/savepoint?

Yes.


>>Iceberg sink needs to do a dup check here on which GlobalCommT were
committed and which weren't. Should it return the filtered/de-duped list of
GlobalCommT?


I think Iceberg sink needs to do the dedup in the `commit` call. The
`recoveredGlobalCommittables` is just for restoring the ids.
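
Just to make "dedup in the `commit` call" concrete, a very rough sketch is below. The
helper methods, the uniqueId() accessor, and the way committed ids are read back from
the table metadata are all assumptions for illustration, not part of any agreed API:

    // Sketch only: skip committables whose unique id is already recorded in the table.
    void commit(List<GlobalCommT> globalCommittables) {
        Set<String> alreadyCommitted = readCommittedIdsFromTableMetadata(); // assumed helper
        for (GlobalCommT committable : globalCommittables) {
            if (!alreadyCommitted.contains(committable.uniqueId())) {       // assumed accessor
                commitToTable(committable);                                 // assumed helper
            }
        }
    }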


>> Sink implementation can decide if it wants to commit immediately or just
leave

I think only the framework knows *when* to call the commit function.


>>should this be "commit(List)"?

It could be. thanks.


Best,
Guowei


On Sun, Sep 20, 2020 at 12:11 AM Steven Wu  wrote:

> > I prefer to let the developer produce id to dedupe. I think this gives
> the developer more opportunity to optimize.
>
> Thinking about it again, I totally agree with Guowei on this. We don't
> really need the framework to generate the unique id for Iceberg sink.
> De-dup logic is totally internal to Iceberg sink and should be isolated
> inside. My earlier question regarding "commitGlobally(List)
> can be concurrent or not" also becomes irrelevant, as long as the framework
> handles the GlobalCommT list properly (even with concurrent calls).
>
> Here are the things where framework can help
>
>1. run global committer in jobmanager (e.g. like sink coordinator)
>2. help with checkpointing, bookkeeping, commit failure handling,
>recovery
>
>
> @Guowei Ma  regarding the GlobalCommitter
> interface, I have some clarifying questions.
>
> > void recoveredGlobalCommittables(List<GlobalCommit> globalCommits)
>
>1. You meant GlobalCommit -> GlobalCommT, right?
>2. Is this called when restored from checkpoint/savepoint?
>3.  Iceberg sink needs to do a dup check here on which GlobalCommT
>were committed and which weren't. Should it return the filtered/de-duped
>list of GlobalCommT?
>4. Sink implementation can decide if it wants to commit immediately or
>just leave
>
> > void commit(GlobalCommit globalCommit);
>
> should this be "commit(List)"?
>
> Thanks,
> Steven
>
>
> On Sat, Sep 19, 2020 at 1:56 AM Guowei Ma  wrote:
>
>> Hi, all
>>
>> >>Just to add to what Aljoscha said regarding the unique id. Iceberg sink
>> >>checkpoints the unique id into state during snapshot. It also inserts
>> the
>> >>unique id into the Iceberg snapshot metadata during commit. When a job
>> >>restores the state after failure, it needs to know if the restored
>> >>transactions/commits were successful or not. It basically iterates
>> through
>> >>the list of table snapshots from Iceberg and matches the unique ids with
>> >>what is stored in Iceberg snapshot metadata.
>>
>> Thanks Steven for these detailed explanations. They help me understand Iceberg
>> better. However, I prefer to let the developer produce the id to dedupe. I
>> think this gives the developer more opportunity to optimize. You could see
>> the following for more details. Please correct me if I misunderstand you.
>>
>> >> 3. Whether the `Writer` supports async functionality or not. Currently
>> I
>> do
>> >> not know which sink could benefit from it. Maybe it is just my own
>> problem.
>>
>> >> Here, I don't really know. We can introduce an "isAvailable()" method
>> >> and mostly ignore it for now and sinks can just always return true. Or,
>> >> as an alternative, we don't add the method now but can add it later
>> with
>> >> a default implementation. Either way, we will probably not take
>> >> advantage of the "isAvailable()" now because that would require more
>> >> runtime changes.
>>
>> From @Piotr's explanation I could see the other benefit that might be
>> gained in the future. For example decoupling the task number and the
>> thread
>> number. But I have to admit that introducing `isAvailable` might introduce
>> some complications in the runtime. You could see my alternative API option
>> in the following. I believe that we could support such an async sink
>> writer
>> very easily in the future. What do you think?
>>
>> >> Yes, this is still tricky. What is the current state, would the
>> >> introduction of a "LocalCommit" and a "GlobalCommit" already solve both
>> >> the Iceberg and Hive cases? I believe Hive is the most tricky one here,
>> >> but if we introduce the "combine" method on GlobalCommit, that could
>> >> serve the same purpose as the "aggregation operation" on the individual
>> >> files, and we could even execute that "combine" in a distributed way.
>> >>We assume that GlobalCommit is a Agg/Combiner?
>>
>> I would like to share the possible problems that I am seeing currently and the
>> alternative options.

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-20 Thread Guowei Ma
I would like to summarize the file-type sinks in this thread and their
possible topologies. I also try to give the pros and cons of every topology
option. Correct me if I am wrong.

### FileSink

Topology Option: TmpFileWriter + Committer.

### Iceberg Sink

#### Topology Option 1: `DataFileWriter` + `GlobalCommitterV0`.
Pro:
1. Same group has some id.
Cons:
1. May limit users’ optimization space;
2. The topology does not meet the Hive’s requirements.

#### Topology Option 2: `DataFileWriter` + `GlobalCommitterV1`
Pro:
1. User has the opportunity to optimize the implementation of idempotence
Cons:
2. Makes the GlobalCommit more complicated.
3. The topology does not meet the Hive’s requirements.

### Topology Option3: DataFileWriter + AggWriter + Committer

Pros:
1. Use two basic `Writer` & `Committer` to meet Iceberg’s requirements.
2. Opportunity to optimize the implementation of idempotence
3. The topology meets the Hive’s requirements. (See the following.)
Con:
1. It introduces a relatively complex topology.

## HiveSink

### Topology Option1: `TmpFileWriter` + `Committer` + `GlobalCommitterV2`.
Pro:
1. Could skip the cleanup problem at first.
Con:
1. This style topology does not meet the CompactHiveSink requirements.

### Topology Option2: `TmpFileWriter` + `Committer` + `AggWriter` +
`Committer`
Pros
1. Could skip the clean up problem at first.
2. Decouple the GlobalCommitterV2 to `AggWriter` + `Committer`
Cons
1. This style topology does not meet the CompactHiveSink requirements.
2. There are two general `Committers` in the topology. For Hive’s case
there might be no problem. But there might be a problem in 1.12: for
example, where to execute the sub-topology following the `Committer` in
batch execution mode for the general case. Because the topology is built
from `Writer` and `Committer`, we might put the whole sub-topology in the
OperatorCoordinator. But if the topology is complicated, that approach might
become very complicated as well. See the following.

### Topology Option3 `FileWriter` + `AggWriter` + `Committer`
Pro
1. There is only one general committer.
Cons
1. It has to consider the cleanup problem. (In theory both the Option1 and
Option2 need to cleanup)
2. This style topology does not meet the CompactHiveSink requirements.
3. Have to figure out how to make the current version compatible.

### CompactHiveSink/MergeHiveSink

#### Topology Option 1: `TmpFileWriter` + `Committer` + `MergerCoordinator` +
`MergeWriter` + `GlobalCommitterV2`
Pro
1. Could skip the clean up problem at first.
Cons
2. Where to execute the sub-topology following the `Committer`.

#### Topology Option 2: `TmpFileWriter` + `Committer` + `MergerCoordinator` +
`MergeWriter` + AggWriter + Committer
Pros
1. Could skip the clean up problem at first
2. Decouple the GlobalCommitterV2 to `AggWriter` + `Committer`
Con
1. Where to execute the sub-topology following the `Committer`.

### Option3 FileWriter + MergeCoordinator + MergeFileWriter + Writer(Agg) +
Committer
Pro
1. There is only one committer. It is very easy to support in the batch
execution mode.
Con
2. It has to consider the cleanup problem. (In theory both the Option1 and
Option2 need to cleanup)


### Summary

From the above we could divide the sink topology into two parts:
1. Write topology.
2. One committer.

So we could provide a unified sink API that looks like the following:

public interface Sink {
    List<Writer> getWriters();
    Committer createCommitter();
}

In the long run maybe we could give the user a more powerful ability like
this (currently some transformations still belong to the runtime):
Sink {
    Transformation createWriteTopology();
    CommitFunction createCommitter();
}

Best,
Guowei


On Sun, Sep 20, 2020 at 6:09 PM Guowei Ma  wrote:

> Hi, Steven
> I want to make a clarification first: the following reply only considers
> the Iceberg sink, not other sinks. Before making a decision
> we should consider all the sinks. I will try to summarize all the sink
> requirements in the next mail.
>
>
> >>  run global committer in jobmanager (e.g. like sink coordinator)
>
> I think it could be.
>
>
> >> You meant GlobalCommit -> GlobalCommT, right?
>
> Yes. Thanks :)
>
>
> >> Is this called when restored from checkpoint/savepoint?
>
> Yes.
>
>
> >>Iceberg sink needs to do a dup check here on which GlobalCommT were
> committed and which weren't. Should it return the filtered/de-duped list of
> GlobalCommT?
>
>
> I think Iceberg sink needs to do the dedup in the `commit` call. The
> `recoveredGlobalCommittables` is just for restoring the ids.
>
>
> >> Sink implementation can decide if it wants to commit immediately or
> just leave
>
> I think only the framework knows *when* to call the commit function.
>
>
> >>should this be "commit(List)"?
>
> It could be. thanks.
>
>
> Best,
> Guowei
>
>
> On Sun, Sep 20, 2020 at 12:11 AM Steven Wu  wrote:
>
>> > I prefer to let the developer produce id to dedupe. I think this gives
>> the developer more opportunity to optimize.
>>
>> Thinking about it again, 

[jira] [Created] (FLINK-19295) YARNSessionFIFOITCase.checkForProhibitedLogContents found a log with prohibited string

2020-09-20 Thread Dian Fu (Jira)
Dian Fu created FLINK-19295:
---

 Summary: YARNSessionFIFOITCase.checkForProhibitedLogContents found 
a log with prohibited string
 Key: FLINK-19295
 URL: https://issues.apache.org/jira/browse/FLINK-19295
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.12.0
Reporter: Dian Fu


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6661&view=logs&j=245e1f2e-ba5b-5570-d689-25ae21e5302f&t=e7f339b2-a7c3-57d9-00af-3712d4b15354]

{code}
2020-09-19T22:08:13.5364974Z [ERROR]   
YARNSessionFIFOITCase.checkForProhibitedLogContents:83->YarnTestBase.ensureNoProhibitedStringInLogFiles:476
 Found a file 
/__w/2/s/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-1_0/application_1600553154281_0001/container_1600553154281_0001_01_02/taskmanager.log
 with a prohibited string (one of [Exception, Started 
SelectChannelConnector@0.0.0.0:8081]). Excerpts:
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Re: [DISCUSS] Support source/sink parallelism config in Flink sql

2020-09-20 Thread 刘大龙

+1

> -Original Message-
> From: "Benchao Li"
> Sent: 2020-09-20 16:28:20 (Sunday)
> To: dev
> Cc:
> Subject: Re: [DISCUSS] Support source/sink parallelism config in Flink sql
> 
> Hi admin,
> 
> Thanks for bringing up this discussion.
> IMHO, it's a valuable feature. We also added this feature for our internal
> SQL engine.
> And our way is very similar to your proposal.
> 
> Regarding the implementation, there is one shortcoming: we would need to modify
> each connector
> to support this property.
> We can wait for others' opinions on whether this is a valid proposal. If yes,
> then we can discuss
> the implementation in detail.
> 
> admin <17626017...@163.com> wrote on Thu, Sep 10, 2020 at 1:19 AM:
> 
> > Hi devs:
> > Currently, Flink SQL does not support source/sink parallelism configuration, so it
> > can result in wasted or insufficient resources in some cases.
> > I think it is necessary to introduce configuration of source/sink
> > parallelism in SQL.
> > From my side, I have a solution for this feature: add a parallelism config
> > in the ‘with’ properties of the DDL.
> >
> > Before 1.11, we can get the parallelism and then set it in
> > StreamTableSink#consumeDataStream or StreamTableSource#getDataStream.
> > After 1.11, we can get the parallelism from the catalogTable and then set it on the
> > transformation in CommonPhysicalTableSourceScan or CommonPhysicalSink.
> >
> > What do you think?
> >
> >
> >
> >
> >
> 
> -- 
> 
> Best,
> Benchao Li


[jira] [Created] (FLINK-19296) RetryingCallback is not aware of task cancellation

2020-09-20 Thread Igal Shilman (Jira)
Igal Shilman created FLINK-19296:


 Summary: RetryingCallback is not aware of task cancellation
 Key: FLINK-19296
 URL: https://issues.apache.org/jira/browse/FLINK-19296
 Project: Flink
  Issue Type: Bug
  Components: Stateful Functions
Reporter: Igal Shilman


Currently, RetryingCallback keeps retrying until the maximum timeout elapses, 
unaware of whether or not the task was canceled.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-20 Thread Steven Wu
> I think Iceberg sink needs to do the dedup in the `commit` call. The
`recoveredGlobalCommittables` is just for restoring the ids.


@Guowei Ma   It is undesirable to do the dedup check
in the `commit` call, because it happens for each checkpoint cycle. We only
need to do the de-dup check one time when restoring GlobalCommT list from
the checkpoint.


Can you clarify the purpose of `recoveredGlobalCommittables`? If it is to
let sink implementations know the recovered GlobalCommT list, it is
probably not a sufficient API. For the Iceberg sink, we can try to
implement the de-dup check inside the `recoveredGlobalCommittables` method
and commit any uncommitted GlobalCommT items. But how do we handle a failed
commit?


One alternative is to allow sink implementations to override "List<GlobalCommT>
recoverGlobalCommittables()". The framework handles the
checkpoint/state, and sink implementations can further customize the
restored list with de-dup check and filtering. Recovered uncommitted
GlobalCommT list will be committed in the next cycle. It is the same
rollover strategy for commit failure handling that we have been discussing.
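
To make that alternative concrete, a rough sketch is below. The interface shape, the
method names, and whether the restored list is passed in as a parameter are all
assumptions for illustration, not an agreed API:

    // Sketch only: the framework restores the raw list from state and hands it to the
    // sink, which may de-dup/filter it; whatever comes back is committed in the next cycle.
    public interface GlobalCommitter<GlobalCommT> {

        // Default behaviour: keep everything the framework restored.
        default List<GlobalCommT> recoverGlobalCommittables(List<GlobalCommT> restored) {
            return restored;
        }

        void commit(List<GlobalCommT> globalCommittables);
    }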


## topologies


Regarding the topology options, if we agree that there is no one-size-fits-all
solution, we can let sink implementations choose the best topology. Maybe
the framework can provide 2-3 pre-defined topology implementations to help
the sinks.




On Sun, Sep 20, 2020 at 3:27 AM Guowei Ma  wrote:

> I would like to summarize the file type sink in the thread and their
> possible topologies.  I also try to give pros and cons of every topology
> option. Correct me if I am wrong.
>
> ### FileSink
>
> Topology Option: TmpFileWriter + Committer.
>
> ### IceBerg Sink
>
>  Topology Option1: `DataFileWriter` + `GlobalCommitterV0`.
> Pro:
> 1. Same group has some id.
> Cons:
> 1. May limit users’ optimization space;
> 2. The topology does not meet the Hive’s requirements.
>
>  Topology Option 2: `DataFileWriter` + `GlobalCommitterV1`
> Pro:
> 1. User has the opportunity to optimize the implementation of idempotence
> Cons:
> 2. Make the GlobalCommit more complicated.
> 3. The topology does not meet the Hive’s requirements
>
> ### Topology Option3: DataFileWriter + AggWriter + Committer
>
> Pros:
> 1. Use two basic `Writer` & `Committer` to meet Iceberg’s requirements.
> 2. Opportunity to optimize the implementation of idempotence
> 3. The topology meets the Hive’s requirements. (See the following.)
> Con:
> 1. It introduces a relatively complex topology.
>
> ## HiveSink
>
> ### Topology Option1: `TmpFileWriter` + `Committer` + `GlobalCommitterV2`.
> Pro:
> 1. Could skip the cleanup problem at first.
> Con:
> 1. This style topology does not meet the CompactHiveSink requirements.
>
> ### Topology Option2: `TmpFileWriter` + `Committer` + `AggWriter` +
> `Committer`
> Pros
> 1. Could skip the clean up problem at first.
> 2. Decouple the GlobalCommitterV2 to `AggWriter` + `Committer`
> Cons
> 1. This style topology does not meet the CompactHiveSink requirements.
> 2. There are two general `Committers` in the topology. For Hive’s case
> there might be no problem. But there might be a problem in 1.12. For
> example where to execute the sub-topology following the `Committer` in
> batch execution mode for the general case. Because the topology is built
> from `Writer` and `Committer` we might put all the sub-topology in the
> OperatorCoordinator. But if the topology is too complicated it might be
> very complicated. See following.
>
> ### Topology Option3 `FileWriter` + `AggWriter` + `Committer`
> Pro
> 1. There is only one general committer.
> Cons
> 1. It has to consider the cleanup problem. (In theory both the Option1 and
> Option2 need to cleanup)
> 2. This style topology does not meet the CompactHiveSink requirements.
> 3. Have to figure out how to make the current version compatible.
>
> ### CompactHiveSink/MergeHiveSink
>
>  Topology Option1 `TmpFileWriter` + `Committer` + `MergerCoordinator`
> + `MergeWriter` + `GlobalCommitterV2`
> Pro
> 1. Could skip the clean up problem at first.
> Cons
> 2. Where to execute the sub-topology following the `Committer`.
>
>  Topology Option2 `TmpFileWriter` + `Committer` + `MergerCoordinator`
> + `MergeWriter` + AggWriter + Committer
> Pros
> 1. Could skip the clean up problem at first
> 2. Decouple the GlobalCommitterV2 to `AggWriter` + `Committer`
> Con
> 1. Where to execute the sub-topology following the `Committer`.
>
> ### Option3 FileWriter + MergeCoordinator + MergeFileWriter + Writer(Agg)
> + Committer
> Pro
> 1. There is only one committer. It is very easy to support in the batch
> execution mode.
> Con
> 2. It has to consider the cleanup problem. (In theory both the Option1 and
> Option2 need to cleanup)
>
>
> ### Summary
>
> From above we could divide the sink topology into two parts:
> 1. Write topology.
> 2. And One committer
>
> So we could provide a unified sink API looks like the following:
>
> public interface Sink {
> List> getWrit

Re: Re: [ANNOUNCE] New Apache Flink Committer - Godfrey He

2020-09-20 Thread godfrey he
Thanks everyone for the warm reception!

Best,
Godfrey

Rui Li wrote on Fri, Sep 18, 2020 at 6:21 PM:

> Congrats Godfrey! Well deserved!
>
> On Fri, Sep 18, 2020 at 5:12 PM Yun Gao 
> wrote:
>
>> Congratulations Godfrey!
>>
>> Best,
>> Yun
>>
>>
>>
>>  --Original Mail --
>> Sender:Dawid Wysakowicz 
>> Send Date:Thu Sep 17 14:45:55 2020
>> Recipients:Flink Dev , 贺小令 
>> Subject:Re: [ANNOUNCE] New Apache Flink Committer - Godfrey He
>> Congratulations!
>>
>> On 16/09/2020 06:19, Jark Wu wrote:
>> > Hi everyone,
>> >
>> > It's great seeing many new Flink committers recently, and on behalf of
>> the
>> > PMC,
>> > I'd like to announce one more new committer: Godfrey He.
>> >
>> > Godfrey is a very long time contributor in the Flink community since the
>> > end of 2016.
>> > He has been a very active contributor in the Flink SQL component with
>> 153
>> > PRs and more than 571,414 lines which is quite outstanding.
>> > Godfrey has paid essential effort with SQL optimization and helped a lot
>> > during the blink merging.
>> > Besides that, he is also quite active with community work especially in
>> > Chinese mailing list.
>> >
>> > Please join me in congratulating Godfrey for becoming a Flink committer!
>> >
>> > Cheers,
>> > Jark Wu
>> >
>>
>>
>
> --
> Best regards!
> Rui Li
>


Re: Re: [DISCUSS] Support source/sink parallelism config in Flink sql

2020-09-20 Thread Jark Wu
Since FLIP-95, the parallelism is decoupled from the runtime class
(DataStream/SourceFunction),
so we need to have an API to tell the planner what the parallelism of the
source/sink is.

This is indeed the purpose of a previous discussion: [DISCUSS] Introduce
SupportsParallelismReport and SupportsStatisticsReport
We can continue the discussion there.
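
For illustration only, the kind of interface being discussed there might look roughly
like the sketch below. The interface name follows the linked thread, but the method
name and the exact shape are assumptions; nothing here should be read as a decided API:

    // Sketch: a table source/sink implementing this could report its desired
    // parallelism, and the planner would set it on the generated transformation.
    public interface SupportsParallelismReport {

        /** Parallelism the connector wants for its runtime operator. */
        int reportParallelism();
    }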

Best,
Jark

[1]:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Introduce-SupportsParallelismReport-and-SupportsStatisticsReport-for-Hive-and-Filesystem-td43531.html

On Sun, 20 Sep 2020 at 23:14, 刘大龙  wrote:

>
> +1
>
> > -Original Message-
> > From: "Benchao Li"
> > Sent: 2020-09-20 16:28:20 (Sunday)
> > To: dev
> > Cc:
> > Subject: Re: [DISCUSS] Support source/sink parallelism config in Flink sql
> >
> > Hi admin,
> >
> > Thanks for bringing up this discussion.
> > IMHO, it's a valuable feature. We also added this feature for our
> internal
> > SQL engine.
> > And our way is very similar to your proposal.
> >
> > Regarding the implementation, there is one shortcoming: we would need to
> modify
> > each connector
> > to support this property.
> > We can wait for others' opinions on whether this is a valid proposal. If yes,
> > then we can discuss
> > the implementation in detail.
> >
> > admin <17626017...@163.com> wrote on Thu, Sep 10, 2020 at 1:19 AM:
> >
> > > Hi devs:
> > > Currently, Flink SQL does not support source/sink parallelism
> configuration, so it
> > > can result in wasted or insufficient resources in some cases.
> > > I think it is necessary to introduce configuration of source/sink
> > > parallelism in SQL.
> > > From my side, I have a solution for this feature: add a parallelism
> config
> > > in the ‘with’ properties of the DDL.
> > >
> > > Before 1.11, we can get the parallelism and then set it in
> > > StreamTableSink#consumeDataStream or StreamTableSource#getDataStream.
> > > After 1.11, we can get the parallelism from the catalogTable and then set it on the
> > > transformation in CommonPhysicalTableSourceScan or CommonPhysicalSink.
> > >
> > > What do you think?
> > >
> > >
> > >
> > >
> > >
> >
> > --
> >
> > Best,
> > Benchao Li
>


Re: Timed out patterns handling using MATCH_RECOGNIZE

2020-09-20 Thread Jark Wu
Hi Kosma,

Thanks for the proposal. I like it and we also have supported similar
syntax in our company.
The problem is that Flink SQL leverages Calcite as the query parser, so if
we want to support this syntax, we may have to push this syntax back to the
Calcite community.
Besides, the SQL standard doesn't define the timeout syntax for MATCH
RECOGNIZE. So we have to extend the standard and this is usually not
trivial.

So I think it would be better to have a joint discussion with the Calcite
and Flink community together. What do you think?

Best,
Jark





On Fri, 18 Sep 2020 at 22:48, Kosma Grochowski <
kosma.grochow...@getindata.com> wrote:

> Hello,
>
> I would like to propose an enrichment of existing Flink SQL
> MATCH_RECOGNIZE syntax to cover for the case of the absence of an event.
> Such an enrichment would help our company solve a business case containing
> timed-out patterns handling. An example of usage of such a clause from
> Flink training exercises could be a task of identification of taxi rides
> with a START event that is not followed by an END event within two hours.
> Currently, a solution to such a task could be achieved with the use of CEP
> and a timeout handler. However, as far as I know, it is impossible to take
> advantage of Flink SQL syntax for this task.
>
> I can think of two ways for such a feature to be incorporated into
> existing MATCH_RECOGNIZE syntax:
> - In analogy to CEP, a keyword could be added which would determine, if
> timed out matches should be dropped altogether or available either through
> side output or main output. SQL usage could be similar to the current
> WITHIN clause, f.e. "PATTERN (A B C) TIMEOUT INTERVAL '30' SECOND" would
> output partially matched patterns 30 seconds after A event appearance.
>
> - Add possibility to define absence of event inside pattern definition -
> for example "PATTERN (A B !C) WITHIN INTERVAL '30' SECOND" would output
> partially matched patterns with the occurrence of A and B event 30 seconds
> after A event appearance.
>
> In our company we did some basic testing of this concept - we modified
> existing MatchCodeGenerator to add processTimedOutMatch function based on a
> boolean trigger and tested it against the aforementioned business case
> containing timed-out patterns handling.
>
>
> I'm interested to hear your thoughts about how we could help Flink SQL be
> able to express these kinds of cases.
>
> With regards,
> Kosma Grochowski
>
>
>
>
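
For readers unfamiliar with the CEP-plus-timeout-handler workaround mentioned above, a
rough sketch is below. It assumes `rides` is a DataStream<TaxiRide> with illustrative
fields `rideId` and `isStart`; the two-hour window follows the Flink-training example
referenced in the proposal. Treat it as a sketch, not tested code:

    // Sketch: emit START events that are not followed by an END within two hours,
    // by routing CEP's timed-out partial matches to a side output.
    Pattern<TaxiRide, TaxiRide> ridePattern =
        Pattern.<TaxiRide>begin("start")
            .where(new SimpleCondition<TaxiRide>() {
                @Override
                public boolean filter(TaxiRide ride) {
                    return ride.isStart;
                }
            })
            .followedBy("end")
            .where(new SimpleCondition<TaxiRide>() {
                @Override
                public boolean filter(TaxiRide ride) {
                    return !ride.isStart;
                }
            })
            .within(Time.hours(2));

    OutputTag<TaxiRide> timedOutTag = new OutputTag<TaxiRide>("timed-out-rides") {};

    SingleOutputStreamOperator<TaxiRide> completedRides =
        CEP.pattern(rides.keyBy(ride -> ride.rideId), ridePattern)
            .select(
                timedOutTag,
                new PatternTimeoutFunction<TaxiRide, TaxiRide>() {
                    @Override
                    public TaxiRide timeout(Map<String, List<TaxiRide>> partialMatch, long timeoutTimestamp) {
                        return partialMatch.get("start").get(0); // a START without a matching END
                    }
                },
                new PatternSelectFunction<TaxiRide, TaxiRide>() {
                    @Override
                    public TaxiRide select(Map<String, List<TaxiRide>> match) {
                        return match.get("end").get(0);
                    }
                });

    DataStream<TaxiRide> ridesWithoutEnd = completedRides.getSideOutput(timedOutTag);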


Re: [DISCUSS] Forwarding Kafka's metrics groups

2020-09-20 Thread Becket Qin
I agree that we can remove the double registered metrics. Maybe we can make
this part of the effort to change the connectors to conform to FLIP-33.

What do you think?
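
As a side note for readers of Dawid's question quoted below: forwarding a Kafka metric
under its Kafka group as well as its name could look roughly like the sketch below.
This is an illustration, not the current connector code; `consumerMetricGroup` (a Flink
org.apache.flink.metrics.MetricGroup) and `kafkaMetric` (a Kafka client KafkaMetric)
are assumed to already be in scope:

    // Sketch: register the metric under <kafka group>/<kafka name> rather than the
    // name alone, so it is easier to trace back to the original Kafka metric.
    MetricName metricName = kafkaMetric.metricName();
    consumerMetricGroup
        .addGroup(metricName.group())
        .gauge(metricName.name(), (Gauge<Object>) kafkaMetric::metricValue);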

Thanks,

Jiangjie (Becket) Qin



On Wed, Sep 16, 2020 at 3:58 PM Dawid Wysakowicz 
wrote:

> Hi,
>
> When answering a user question in ML[1] I spotted that when we forward
> Kafka's metrics we use only the name of the metric[2]. I was wondering
> if we should use both the metric name and group? I think it would make
> it easier to track back a particular metric.
>
> Moreover we still register Kafka's metrics both in the dedicated group
> "KafkaConsumer", as well as in the subtasks group.[3] Can we remove it now?
>
> Best,
>
> Dawid
>
> [1]
>
> https://lists.apache.org/thread.html/ra1ec6c227b7c03df313cab523b6bdf511242751de8bce7f68efb2a52%40%3Cuser.flink.apache.org%3E
>
> [2]
>
> https://github.com/apache/flink/blob/b43075b710b27830830b8d2074454c69659392f5/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java#L185
>
> [3]
>
> https://github.com/apache/flink/blob/b43075b710b27830830b8d2074454c69659392f5/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java#L188
>
>
>


[VOTE] FLIP-33: Standardize connector metrics

2020-09-20 Thread Becket Qin
Hi all,

I would like to start the voting thread for FLIP-33 which proposes to
standardize the metrics of Flink connectors.

In short, we would like to provide a convention and guidance of Flink
connector metrics. It will help simplify the monitoring and alerting on
Flink jobs. The FLIP link is following:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics

The vote will be open for at least 72 hours.

Thanks,

Jiangjie (Becket) Qin


[jira] [Created] (FLINK-19297) Make ResultPartitionWriter record-oriented

2020-09-20 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19297:
---

 Summary: Make ResultPartitionWriter record-oriented
 Key: FLINK-19297
 URL: https://issues.apache.org/jira/browse/FLINK-19297
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.12.0


Currently, ResultPartitionWriter is buffer-oriented, that is, RecordWriter will 
add buffers of different channels to ResultPartitionWriter and the buffer 
boundary serves as a natural boundary of data belonging to different channels. 
However, this abstraction is not flexible enough to handle cases where 
records are appended to a joint structure shared by all channels and sorting is 
used to cluster data belonging to different channels.

In this ticket, we propose to make ResultPartitionWriter record-oriented, which 
offers more flexibility to ResultPartitionWriter implementations. Based on the 
new record-oriented interface, we will introduce a sort-merge based blocking 
shuffle to Flink in the future.
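
For illustration, a record-oriented contract could look roughly like the sketch below. This 
is a hypothetical shape, not the actual interface proposed by this ticket; the method names 
and signatures are assumptions.

{code}
// Hypothetical sketch: the writer receives serialized records directly, so an
// implementation is free to buffer, sort, or merge them per subpartition internally.
public interface RecordOrientedPartitionWriter {

    /** Appends a serialized record for the given target subpartition. */
    void emitRecord(ByteBuffer serializedRecord, int targetSubpartition) throws IOException;

    /** Appends a serialized record to all subpartitions. */
    void broadcastRecord(ByteBuffer serializedRecord) throws IOException;

    /** Flushes all subpartitions. */
    void flushAll();
}
{code}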



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19298) Maven enforce goal dependency-convergence failed on flink-json

2020-09-20 Thread Jark Wu (Jira)
Jark Wu created FLINK-19298:
---

 Summary: Maven enforce goal dependency-convergence failed on 
flink-json
 Key: FLINK-19298
 URL: https://issues.apache.org/jira/browse/FLINK-19298
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.12.0
Reporter: Jark Wu


See more 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=6669&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=9b1a0f88-517b-5893-fc93-76f4670982b4



{code}

+-org.codehaus.janino:janino:3.0.9
  +-org.codehaus.janino:commons-compiler:3.0.9
and
+-org.apache.flink:flink-json:1.12-SNAPSHOT
  +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
+-org.apache.calcite:calcite-core:1.22.0
  +-org.codehaus.janino:commons-compiler:3.0.11
and
+-org.apache.flink:flink-json:1.12-SNAPSHOT
  +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
+-org.codehaus.janino:commons-compiler:3.0.9

17:08:16.111 [WARNING] 
Dependency convergence error for org.codehaus.janino:janino:3.0.9 paths to 
dependency are:
+-org.apache.flink:flink-json:1.12-SNAPSHOT
  +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
+-org.codehaus.janino:janino:3.0.9
and
+-org.apache.flink:flink-json:1.12-SNAPSHOT
  +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
+-org.apache.flink:flink-table-runtime-blink_2.11:1.12-SNAPSHOT
  +-org.codehaus.janino:janino:3.0.9
and
+-org.apache.flink:flink-json:1.12-SNAPSHOT
  +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
+-org.apache.calcite:calcite-core:1.22.0
  +-org.codehaus.janino:janino:3.0.11
and
+-org.apache.flink:flink-json:1.12-SNAPSHOT
  +-org.apache.flink:flink-table-planner-blink_2.11:1.12-SNAPSHOT
+-org.codehaus.janino:janino:3.0.9

17:08:16.112 [WARNING] Rule 0: 
org.apache.maven.plugins.enforcer.DependencyConvergence failed with message:
Failed while enforcing releasability. See above detailed error message.

{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[RESULT][VOTE] FLIP-36 - Support Interactive Programming in Flink Table API

2020-09-20 Thread Xuannan Su
Hi all,

The voting time for FLIP-36 has passed. I'm closing the vote now.

There were 3 binding votes:
- Aljoscha (binding)
- Timo (binding)
- Becket (binding)

There were no disapproving votes.

Thus, FLIP-36 has been accepted.

Thanks everyone for joining the discussion and giving feedback!

Best,
Xuannan


[jira] [Created] (FLINK-19299) NettyShuffleEnvironmentBuilder#setBufferSize does not take effect

2020-09-20 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19299:
---

 Summary: NettyShuffleEnvironmentBuilder#setBufferSize does not 
take effect
 Key: FLINK-19299
 URL: https://issues.apache.org/jira/browse/FLINK-19299
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Yingjie Cao
 Fix For: 1.12.0


Currently, NettyShuffleEnvironmentBuilder#setBufferSize does not take effect 
because the set value is never used when building the NettyShuffleEnvironment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19300) Timer loss after restoring from savepoint

2020-09-20 Thread Xiang Gao (Jira)
Xiang Gao created FLINK-19300:
-

 Summary: Timer loss after restoring from savepoint
 Key: FLINK-19300
 URL: https://issues.apache.org/jira/browse/FLINK-19300
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Xiang Gao


While using heap-based timers, we are seeing occasional timer loss after 
restoring a program from a savepoint, especially when using remote savepoint 
storage (S3). 

After some investigation, the issue seems to be related to [this line in 
deserialization|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java#L65].
 When checking the VERSIONED_IDENTIFIER, the input stream is not guaranteed to 
fill the byte array, causing timers to be dropped for the affected key group.

We should consider reading until the expected number of bytes has been read or the end of 
the stream has been reached. 
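
A minimal sketch of the suggested fix is below (the field and variable names are 
placeholders, not the actual code in PostVersionedIOReadableWritable):

{code}
// Keep reading until the identifier buffer is full or the stream ends, instead of
// assuming that a single read() call fills the whole array.
byte[] buffer = new byte[VERSIONED_IDENTIFIER.length];
int totalRead = 0;
while (totalRead < buffer.length) {
    int bytesRead = inputStream.read(buffer, totalRead, buffer.length - totalRead);
    if (bytesRead == -1) {
        break; // end of stream reached before the identifier was fully read
    }
    totalRead += bytesRead;
}
boolean identifierFullyRead = totalRead == buffer.length;
{code}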



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19301) Improve the package structure of Python DataStream API

2020-09-20 Thread Dian Fu (Jira)
Dian Fu created FLINK-19301:
---

 Summary: Improve the package structure of Python DataStream API 
 Key: FLINK-19301
 URL: https://issues.apache.org/jira/browse/FLINK-19301
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.12.0
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.12.0


Currently, the classes added for the Python DataStream API are located in 
*org.apache.flink.datastream* and there is already a package named 
*org.apache.flink.streaming*, so it would be great to move these classes to 
package *org.apache.flink.streaming* to be consistent with the naming 
convention of flink-streaming-java. Besides, the class names could also be 
optimized, e.g. DataStreamPythonReduceFunctionOperator -> PythonReduceOperator.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS][Release 1.12] Stale blockers and build instabilities

2020-09-20 Thread Robert Metzger
Hi all,

An update on the release status:
1. We have 35 days = *5 weeks left until feature freeze*
2. There are currently 2 blockers for Flink, all
making progress
3. We have 72 test instabilities (down 7 from 2 weeks
ago). I have pinged people to help address frequent or critical issues.

Best,
Robert


On Mon, Sep 7, 2020 at 10:37 AM Robert Metzger  wrote:

> Hi all,
>
> another two weeks have passed. We now have 5 blockers (up
> 3 from 2 weeks ago), but they are all making progress.
>
> We currently have 79 test-instabilities;
> since the last report, a few have been resolved, and some others have been
> added.
> I have checked the tickets, closed some old ones and pinged people to help
> resolve new or frequent ones.
> Except for Kafka, there are no major clusters of test instabilities. Most
> failures are rarely failing tests across the entire system.
>
>
> On Tue, Aug 25, 2020 at 9:05 AM Rui Li  wrote:
>
>> Thanks Dian for the pointer. I'll take a look.
>>
>> On Tue, Aug 25, 2020 at 3:02 PM Dian Fu  wrote:
>>
>> > Thanks Rui for the info. This issue (hive related)
>> > https://issues.apache.org/jira/browse/FLINK-19025 is marked as a
>> blocker.
>> >
>> > Regards,
>> > Dian
>> >
>> > > On Aug 25, 2020, at 2:58 PM, Rui Li wrote:
>> > >
>> > > Hi Dian,
>> > >
>> > > FLINK-18682 has been fixed. Is there any other blocker in the hive
>> > > connector?
>> > >
>> > > On Tue, Aug 25, 2020 at 2:41 PM Dian Fu > > > dian0511...@gmail.com>> wrote:
>> > >
>> > >> Hi all,
>> > >>
>> > >> Two weeks have passed and it seems that none of the test stabilities
>> > >> issues have been addressed since then.
>> > >>
>> > >> Here is an updated status report of Blockers and Test instabilities:
>> > >>
>> > >> Blockers
>> > >> <https://issues.apache.org/jira/browse/FLINK-18682?filter=12349334>:
>> > >> Currently 2 blockers (1x Hive, 1x CI Infra)
>> > >>
>> > >> Test-Instabilities
>> > >> <https://issues.apache.org/jira/browse/FLINK-18869?filter=12348580>:
>> > >> (total 80)
>> > >>
>> > >> Besides the issues already posted in previous mail, here are the new
>> > >> instability issues which should be taken care of:
>> > >>
>> > >> - FLINK-19012 (https://issues.apache.org/jira/browse/FLINK-19012)
>> > >> E2E test fails with "Cannot register Closeable, this
>> > >> subtaskCheckpointCoordinator is already closed. Closing argument."
>> > >>
>> > >> -> This is a new issue occurred recently. It has occurred several
>> times
>> > >> and may indicate a bug somewhere and should be taken care of.
>> > >>
>> > >> - FLINK-9992 (https://issues.apache.org/jira/browse/FLINK-9992)
>> > >> FsStorageLocationReferenceTest#testEncodeAndDecode failed in CI
>> > >>
>> > >> -> There is already a PR for it and needs review.
>> > >>
>> > >> - FLINK-18842 (https://issues.apache.org/jira/browse/FLINK-18842)
>> > >> e2e test failed to download "localhost:/flink.tgz" in "Wordcount
>> on
>> > >> Docker test"
>> > >>
>> > >>
>> > >>> On Aug 11, 2020, at 2:08 PM, Robert Metzger wrote:
>> > >>>
>> > >>> Hi team,
>> > >>>
>> > >>> 2 weeks have passed since the last update. None of the test
>> stabilities
>> > >>> I've mentioned have been addressed since then.
>> > >>>
>> > >>> Here's an updated status report of Blockers and Test instabilities:
>> > >>>
>> > >>> Blockers <
>> > >> https://issues.apache.org/jira/browse/FLINK-18682?filter=12349334>:
>> > >>> Currently 3 blockers (2x Hive, 1x CI Infra)
>> > >>>
>> > >>> Test-Instabilities
>> > >>> 
>> > >> (total
>> > >>> 79) which failed recently or frequently:
>> > >>>
>> > >>>
>> > >>> - FLINK-18807 
>> > >>> FlinkKafkaProducerITCase.te

Re: [ANNOUNCE] Apache Flink 1.11.2 released

2020-09-20 Thread Yu Li
Thanks Zhu Zhu for being our release manager and everyone else who made the
release possible!

Best Regards,
Yu


On Thu, 17 Sep 2020 at 13:29, Zhu Zhu  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.11.2, which is the second bugfix release for the Apache Flink 1.11
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2020/09/17/release-1.11.2.html
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348575
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Thanks,
> Zhu
>


[jira] [Created] (FLINK-19302) Flushing of BoundedBlockingResultPartition should finish current BufferBuilder

2020-09-20 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19302:
---

 Summary: Flushing of BoundedBlockingResultPartition should finish 
current BufferBuilder
 Key: FLINK-19302
 URL: https://issues.apache.org/jira/browse/FLINK-19302
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.12.0


Currently, flushing of BoundedBlockingResultPartition flushes and closes the 
current BufferConsumer but does not finish the corresponding BufferBuilder. As 
a result, records coming later can be appended to an already recycled buffer.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Can you unify the language ?

2020-09-20 Thread Timo Walther

Hi,

you are right. Having two languages in the code base doesn't make our 
lives easier. But Flink is a big project with a long history, multiple 
design shifts, and many contributors. It is natural that the bigger a 
code base gets, the messier it looks. Improving legacy code must be a 
continuous effort.


We are aiming to have Scala only in dedicated API modules. We encourage 
people to choose Java whenever new code is developed or refactored. So 
eventually, Flink might be Scala free but this will take some time.


For example, we already have dedicated porting guidelines for the 
flink-table module [1][2]. I'm sure other component maintainers have 
similar thoughts.


Regards,
Timo

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-28%3A+Long-term+goal+of+making+flink-table+Scala-free
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions


On 19.09.20 16:21, 490548661 wrote:

The Flink code is too messy. It uses both Java and Scala. 
Can you unify the language and use Java only? Is it not tiring to maintain 
two sets of code?





[VOTE] Apache Flink Stateful Functions 2.2.0, release candidate #1

2020-09-20 Thread Tzu-Li (Gordon) Tai
Hi everyone,

Please review and vote on the release candidate #1 for the version 2.2.0 of
Apache Flink Stateful Functions,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

***Testing Guideline***

You can find here [1] a page in the project wiki on instructions for
testing.
To cast a vote, it is not necessary to perform all listed checks,
but please mention which checks you have performed when voting.

***Release Overview***

As an overview, the release consists of the following:
a) Stateful Functions canonical source distribution, to be deployed to the
release repository at dist.apache.org
b) Stateful Functions Python SDK distributions to be deployed to PyPI
c) Maven artifacts to be deployed to the Maven Central Repository

***Staging Areas to Review***

The staging areas containing the above mentioned artifacts are as follows,
for your review:
* All artifacts for a) and b) can be found in the corresponding dev
repository at dist.apache.org [2]
* All artifacts for c) can be found at the Apache Nexus Repository [3]

All artifacts are signed with the key
1C1E2394D3194E1944613488F320986D35C33D6A [4]

Other links for your review:
* JIRA release notes [5]
* source code tag "release-2.2.0-rc1" [6]

***Vote Duration***

The voting time will run for at least 72 hours, lasting until 24 Sep., 7AM
UTC.
It is adopted by majority approval, with at least 3 PMC affirmative votes.

Thanks,
Gordon

[1] https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+Stateful+Functions+Release
[2] https://dist.apache.org/repos/dist/dev/flink/flink-statefun-2.2.0-rc1/
[3] https://repository.apache.org/content/repositories/orgapacheflink-1398/
[4] https://dist.apache.org/repos/dist/release/flink/KEYS
[5]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348350
[6]
https://gitbox.apache.org/repos/asf?p=flink-statefun.git;a=commit;h=1f27f38ab15e536fbd961e5ae1978ef44900c0f6