Re: Sideoutput in Flink Web Dashboard

2021-07-12 Thread Chesnay Schepler
This is currently not possible. It would indeed be a useful thing to 
have, but there are considerable technical hurdles to realize that.


On 11/07/2021 06:29, Dipanjan Mazumder wrote:

Hi,
    I am using side output to actually create named substreams from main source 
streams. While it works fine , but it seems in the Web dashboard only operator 
are visible and streams are not. Is there any way to show streams or sideouput 
in WEB dashboard. That will make the graph more useful in terms of how the 
streams are divided and how data flows.
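For reference, the side outputs are created roughly like this (a minimal
sketch; the names and types are only illustrative):

final OutputTag<String> errorsTag = new OutputTag<String>("errors") {};

SingleOutputStreamOperator<String> mainStream = source
    .process(new ProcessFunction<String, String>() {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            if (value.startsWith("ERROR")) {
                ctx.output(errorsTag, value); // routed to the named side output
            } else {
                out.collect(value);           // stays on the main stream
            }
        }
    });

DataStream<String> errors = mainStream.getSideOutput(errorsTag);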
Regards
Dipanjan





[jira] [Created] (FLINK-23353) UDTAGG can't execute in Batch mode

2021-07-12 Thread hayden zhou (Jira)
hayden zhou created FLINK-23353:
---

 Summary: UDTAGG can't execute in Batch mode
 Key: FLINK-23353
 URL: https://issues.apache.org/jira/browse/FLINK-23353
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.13.1
Reporter: hayden zhou



{code:java}

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.row;

public class Top2Test {
    public static void main(String[] args) {

        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        Table sourceTable = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("price", DataTypes.INT())
                ),
                row(1, "hayden", 18),
                row(3, "hayden", 19),
                row(4, "hayden", 20),
                row(2, "jaylin", 20)
        );

        tEnv.createTemporaryView("source", sourceTable);

        Table rT = tEnv.from("source")
                .groupBy($("name"))
                .flatAggregate(call(Top2.class, $("price")).as("price", "rank"))
                .select($("name"), $("price"), $("rank"));
        rT.execute().print();
    }

    public static class Top2Accumulator {
        public Integer first;
        public Integer second;
    }

    public static class Top2
            extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accumulator> {

        @Override
        public Top2Accumulator createAccumulator() {
            Top2Accumulator acc = new Top2Accumulator();
            acc.first = Integer.MIN_VALUE;
            acc.second = Integer.MIN_VALUE;
            return acc;
        }

        public void accumulate(Top2Accumulator acc, Integer value) {
            if (value > acc.first) {
                acc.second = acc.first;
                acc.first = value;
            } else if (value > acc.second) {
                acc.second = value;
            }
        }

        public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
            for (Top2Accumulator otherAcc : it) {
                accumulate(acc, otherAcc.first);
                accumulate(acc, otherAcc.second);
            }
        }

        public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>> out) {
            if (acc.first != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.first, 1));
            }
            if (acc.second != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.second, 2));
            }
        }
    }
}

{code}

The following error is thrown:
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
generate a valid execution plan for the given query: 

LogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1],
 fields=[name, price, rank])
+- LogicalProject(name=[AS($0, _UTF-16LE'name')], price=[AS($1, 
_UTF-16LE'price')], rank=[AS($2, _UTF-16LE'rank')])
   +- LogicalTableAggregate(group=[{1}], 
tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]])
  +- LogicalUnion(all=[true])
 :- LogicalProject(id=[CAST(1):INTEGER], 
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(18):INTEGER])
 :  +- LogicalValues(tuples=[[{ 0 }]])
 :- LogicalProject(id=[CAST(3):INTEGER], 
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(19):INTEGER])
 :  +- LogicalValues(tuples=[[{ 0 }]])
 :- LogicalProject(id=[CAST(4):INTEGER], 
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(20):INTEGER])
 :  +- LogicalValues(tuples=[[{ 0 }]])
 +- LogicalProject(id=[CAST(2):INTEGER], 
name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(20):INTEGER])
+- LogicalValues(tuples=[[{ 0 }]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.col

[jira] [Created] (FLINK-23354) Limit the size of blob cache on TaskExecutor

2021-07-12 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-23354:


 Summary: Limit the size of blob cache on TaskExecutor
 Key: FLINK-23354
 URL: https://issues.apache.org/jira/browse/FLINK-23354
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Zhilong Hong
 Fix For: 1.14.0


Currently a TaskExecutor uses a BlobCache to cache the blobs transported from the 
JobManager. The cached blobs are local files stored on the TaskExecutor, and they 
are not cleaned up until one hour after the related job has finished. At present, 
JobInformation and TaskInformation are transported via blob, so if a lot of jobs 
are submitted, the blob cache can occupy a large amount of disk space. In 
FLINK-23218, we are going to distribute the cached ShuffleDescriptors via blob as 
well. When a large number of failovers happen, a lot of cache entries will be 
stored on the local disk; in extreme cases, the blobs could blow up the disk space.

So we need to add a size limit for the blob cache on the TaskExecutor, as described 
in the comments of FLINK-23218. The main idea is to add a size limit and to 
delete blobs in LRU order once the limit is exceeded. Before a blob item is 
cached, the TaskExecutor first checks the overall size of the cache. If the 
overall size exceeds the limit, blobs are deleted in LRU order until the cache 
fits under the limit again. If a deleted blob is used afterwards, it will simply 
be downloaded from the blob server again.

The default value of the size limit of the blob cache on the TaskExecutor will be 
10GiB.
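
A minimal sketch of the intended eviction behavior (all names below are 
illustrative, not the actual implementation):

{code:java}
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a cache that tracks the total byte size of its blobs
// and evicts entries in least-recently-used order once a size limit is hit.
public class SizeLimitedBlobCache {
    private final long sizeLimitBytes;
    private long currentSizeBytes;
    // accessOrder=true => iteration order is least-recently-used first
    private final LinkedHashMap<String, byte[]> blobs =
            new LinkedHashMap<>(16, 0.75f, true);

    public SizeLimitedBlobCache(long sizeLimitBytes) {
        this.sizeLimitBytes = sizeLimitBytes;
    }

    public synchronized void put(String blobKey, byte[] blob) {
        // evict LRU entries until the new blob fits under the limit
        Iterator<Map.Entry<String, byte[]>> it = blobs.entrySet().iterator();
        while (currentSizeBytes + blob.length > sizeLimitBytes && it.hasNext()) {
            currentSizeBytes -= it.next().getValue().length;
            it.remove();
        }
        blobs.put(blobKey, blob);
        currentSizeBytes += blob.length;
    }

    public synchronized byte[] get(String blobKey) {
        // a hit refreshes the entry's LRU position; a miss means the blob
        // has to be downloaded from the blob server again
        return blobs.get(blobKey);
    }
}
{code}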



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23355) Use Flink SQL ,How to implement insert into seq to multi sinks?

2021-07-12 Thread Bo Wang (Jira)
Bo Wang created FLINK-23355:
---

 Summary: Use Flink SQL ,How to implement insert into seq to multi 
sinks?
 Key: FLINK-23355
 URL: https://issues.apache.org/jira/browse/FLINK-23355
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / API
Affects Versions: 1.12.0
Reporter: Bo Wang


Now, I want to use Flink SQL to insert into multiple sinks in sequence. For 
example, I have one Kafka table source_table_one and two sink tables: a DB table 
sink_db_one and a Kafka table sink_kafka_two.
I want to achieve the following: when an element from source_table_one arrives, 
first run "insert into sink_db_one select * from source_table_one"; once the 
element has been inserted, run "insert into sink_kafka_two select * from 
source_table_one" as a second stage.

The web system will listen to the sink_kafka_two topic and then query the data 
from the sink_db_one table.

Does anyone have an idea how to implement this?
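
One building block that might help is the StatementSet API, which at least 
submits both INSERTs as a single job. A minimal sketch, assuming a 
TableEnvironment tEnv with the three tables registered (note this alone does 
not enforce the per-element ordering described above):

{code:java}
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO sink_db_one SELECT * FROM source_table_one");
stmtSet.addInsertSql("INSERT INTO sink_kafka_two SELECT * FROM source_table_one");
stmtSet.execute(); // both inserts run in one job, without an ordering guarantee
{code}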



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23356) HBase delegation token expired after 7 days

2021-07-12 Thread Gabor Somogyi (Jira)
Gabor Somogyi created FLINK-23356:
-

 Summary: HBase delegation token expired after 7 days
 Key: FLINK-23356
 URL: https://issues.apache.org/jira/browse/FLINK-23356
 Project: Flink
  Issue Type: Bug
  Components: Connectors / HBase
Affects Versions: 1.13.0
Reporter: Gabor Somogyi


FLINK-6376 solved this problem for HDFS, but HBase still has the issue.
The root cause is that the HBase delegation token expires after 7 days 
and Flink does not re-obtain any kind of token at the moment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-181: Custom netty HTTP request inbound/outbound handlers

2021-07-12 Thread Konstantin Knauf
+1 (binding)

Assuming that we continue to vote in this thread for now.

Thank you for your patience!

On Mon, Jul 12, 2021 at 8:56 AM Chesnay Schepler  wrote:

> The vote has not reached the required number of votes to be considered
> successful.
>
> As outlined in the bylaws
> <
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120731026#FlinkBylaws-Actions>
>
> FLIP votes require 3 binding +1 votes (i.e., from committers).
>
> On 10/07/2021 16:13, Márton Balassi wrote:
> > Hi team,
> >
> > Thank you for your input, I am closing this vote as successful.
> > Austin: thank you, I have added the experimental annotation explicitly to
> > the FLIP.
> >
> > On Tue, Jul 6, 2021 at 5:17 PM Gabor Somogyi 
> > wrote:
> >
> >> +1 (non-binding)
> >> The @Experimental annotation is really missing, Marton could you add it
> >> please?
> >>
> >>
> >> On Tue, Jul 6, 2021 at 5:04 PM Austin Cawley-Edwards <
> >> austin.caw...@gmail.com> wrote:
> >>
> >>> Hi Márton,
> >>>
> >>> The FLIP looks generally good to me, though could we add the
> >>> `@Experimental` annotation to the proposed interfaces so it is in sync
> >> with
> >>> what was agreed in the discussion thread?
> >>>
> >>> Thanks,
> >>> Austin
> >>>
> >>> On Tue, Jul 6, 2021 at 9:40 AM Gyula Fóra  wrote:
> >>>
>  +1 from my side
> 
>  This is a good addition that will open many possibilities in the
> future
> >>> and
>  solve some immediate issues with the current Kerberos integration.
> 
>  Gyula
> 
>  On Tue, Jul 6, 2021 at 2:50 PM Márton Balassi <
> >> balassi.mar...@gmail.com>
>  wrote:
> 
> > Hi everyone, I would like to start a vote on FLIP-181 [1] which was
> > discussed in this thread [2]. The vote will be open for at least 72
> >>> hours
> > until July 9th unless there is an objection or not enough votes.
> >
> > [1] https://cwiki.apache.org/confluence/x/CAUBCw
> > [2]
> >
> >
> >>
> https://lists.apache.org/thread.html/r53b6b8931b6248a849855dad27b1a431e55cdd48ca055910e8f015a8%40%3Cdev.flink.apache.org%3E
>
>
>

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: [VOTE] FLIP-181: Custom netty HTTP request inbound/outbound handlers

2021-07-12 Thread Till Rohrmann
Thanks for starting the vote Marton.

I have two comments:

* I would suggest that the interfaces return Optional or at
least have a @Nullable annotation in order to make the contract explicit.
* The test plan should contain tests for the general infrastructure which
should live in Flink. We should test that factories are loaded and that the
handlers are set up in the correct order.

I would consider these two changes to the original FLIP small. I give my +1
(binding) conditionally under the assumption that the comments will be
addressed.

Cheers,
Till

On Mon, Jul 12, 2021 at 10:15 AM Konstantin Knauf  wrote:

> +1 (binding)
>
> Assuming that we continue to vote in this thread for now.
>
> Thank you for your patience!
>
> On Mon, Jul 12, 2021 at 8:56 AM Chesnay Schepler 
> wrote:
>
> > The vote has not reached the required number of votes to be considered
> > successful.
> >
> > As outlined in the bylaws
> > <
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120731026#FlinkBylaws-Actions
> >
> >
> > FLIP votes require 3 binding +1 votes (i.e., from committers).
> >
> > On 10/07/2021 16:13, Márton Balassi wrote:
> > > Hi team,
> > >
> > > Thank you for your input, I am closing this vote as successful.
> > > Austin: thank you, I have added the experimental annotation explicitly
> to
> > > the FLIP.
> > >
> > > On Tue, Jul 6, 2021 at 5:17 PM Gabor Somogyi <
> gabor.g.somo...@gmail.com>
> > > wrote:
> > >
> > >> +1 (non-binding)
> > >> The @Experimental annotation is really missing, Marton could you add
> it
> > >> please?
> > >>
> > >>
> > >> On Tue, Jul 6, 2021 at 5:04 PM Austin Cawley-Edwards <
> > >> austin.caw...@gmail.com> wrote:
> > >>
> > >>> Hi Márton,
> > >>>
> > >>> The FLIP looks generally good to me, though could we add the
> > >>> `@Experimental` annotation to the proposed interfaces so it is in
> sync
> > >> with
> > >>> what was agreed in the discussion thread?
> > >>>
> > >>> Thanks,
> > >>> Austin
> > >>>
> > >>> On Tue, Jul 6, 2021 at 9:40 AM Gyula Fóra  wrote:
> > >>>
> >  +1 from my side
> > 
> >  This is a good addition that will open many possibilities in the
> > future
> > >>> and
> >  solve some immediate issues with the current Kerberos integration.
> > 
> >  Gyula
> > 
> >  On Tue, Jul 6, 2021 at 2:50 PM Márton Balassi <
> > >> balassi.mar...@gmail.com>
> >  wrote:
> > 
> > > Hi everyone, I would like to start a vote on FLIP-181 [1] which was
> > > discussed in this thread [2]. The vote will be open for at least 72
> > >>> hours
> > > until July 9th unless there is an objection or not enough votes.
> > >
> > > [1] https://cwiki.apache.org/confluence/x/CAUBCw
> > > [2]
> > >
> > >
> > >>
> >
> https://lists.apache.org/thread.html/r53b6b8931b6248a849855dad27b1a431e55cdd48ca055910e8f015a8%40%3Cdev.flink.apache.org%3E
> >
> >
> >
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>


[jira] [Created] (FLINK-23358) Refactor CoreOptions parent-first patterns to List options

2021-07-12 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-23358:


 Summary: Refactor CoreOptions parent-first patterns to List options
 Key: FLINK-23358
 URL: https://issues.apache.org/jira/browse/FLINK-23358
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Reporter: Chesnay Schepler
 Fix For: 1.14.0


The following CoreOptions are all implemented as String options, but actually 
accept a List, with some custom parsing thrown in:
{code}
ALWAYS_PARENT_FIRST_LOADER_PATTERNS
ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL
PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS
PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL
{code}
We should refactor them to proper List options to reduce complexity.
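
For illustration, a minimal sketch of what the refactored variant could look 
like (the key is the existing one; the default values are shortened here):

{code:java}
public static final ConfigOption<List<String>> ALWAYS_PARENT_FIRST_LOADER_PATTERNS =
        ConfigOptions.key("classloader.parent-first-patterns.default")
                .stringType()
                .asList()
                .defaultValues("java.", "scala.", "org.apache.flink.");
{code}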



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23357) jobmanager metaspace oom

2021-07-12 Thread Borland Won (Jira)
Borland Won created FLINK-23357:
---

 Summary: jobmanager metaspace oom
 Key: FLINK-23357
 URL: https://issues.apache.org/jira/browse/FLINK-23357
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.12.2
Reporter: Borland Won
 Attachments: image-2021-07-12-16-57-55-256.png, 
image-2021-07-12-17-06-17-218.png

*Flink Version: 1.12.2*

Hi. I created a standalone HA cluster (with 3 TaskManagers and 2 JobManagers), 
and repeatedly submitted new jobs to the cluster and cancelled old jobs via the 
REST API. The metaspace of the master JobManager then kept increasing.

  !image-2021-07-12-16-57-55-256.png!

Soon it will OOM and get the exception below:

2021-06-21 15:44:06,637 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Unhandled 
exception.2021-06-21 15:44:06,637 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Unhandled 
exception.java.util.concurrent.CompletionException: 
org.apache.flink.client.program.ProgramInvocationException: The program's entry 
point class 'xxx.xxx.xxx.XXXBootstrap' caused an exception during 
initialization: Metaspace. The metaspace out-of-memory error has occurred. This 
can mean two things: either Flink Master requires a larger size of JVM 
metaspace to load classes or there is a class loading leak. In the first case 
'jobmanager.memory.jvm-metaspace.size' configuration option should be 
increased. If the error persists (usually in cluster after several job 
(re-)submissions) then there is probably a class loading leak in user code or 
some of its dependencies which has to be investigated and fixed. The Flink 
Master has to be shutdown... at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:184)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:141)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:95)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:53)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
 ~[flink-dist_2.11-1.12.2.jar:1.12.2] at 
java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_292] at 
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) 
[flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
 [flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
 [flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
 [flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 [flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 [flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 [flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
 [flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
 [flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
 [flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
 [flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 [flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 [flink-dist_2.11-1.12.2.jar:1.12.2] at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead
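
As a first mitigation, the metaspace can be enlarged as the error message 
suggests, e.g. in flink-conf.yaml (the value below is only an example):

{code}
jobmanager.memory.jvm-metaspace.size: 512m
{code}

If the error persists after several job (re-)submissions, this only delays the 
OOM, and the class loading leak in the user code or its dependencies has to be 
investigated.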

[jira] [Created] (FLINK-23359) Fix the number of available slots in testResourceCanBeAllocatedForDifferentJobAfterFree

2021-07-12 Thread Yangze Guo (Jira)
Yangze Guo created FLINK-23359:
--

 Summary: Fix the number of available slots in 
testResourceCanBeAllocatedForDifferentJobAfterFree
 Key: FLINK-23359
 URL: https://issues.apache.org/jira/browse/FLINK-23359
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.13.0
Reporter: Yangze Guo
 Fix For: 1.14.0, 1.13.2


In 
AbstractFineGrainedSlotManagerITCase#testResourceCanBeAllocatedForDifferentJobAfterFree,
 we need only 1 default slot to exist in the cluster. However, we currently 
register a TaskManager with 2 default slots. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-182: Watermark alignment

2021-07-12 Thread 刘建刚
+1 for the source watermark alignment.
In previous Flink versions, the source connectors differed in implementation,
which made a feature like this hard to build. When the consumed data is not
aligned, or when consuming historical data, unaligned watermarks arise very
easily. Source alignment can resolve many of these instability problems.

Seth Wiesman  于2021年7月9日周五 下午11:25写道:

> +1
>
> In my opinion, this limitation is perfectly fine for the MVP. Watermark
> alignment is a long-standing issue and this already moves the ball so far
> forward.
>
> I don't expect this will cause many issues in practice, as I understand it
> the FileSource always processes one split at a time, and in my experience,
> 90% of Kafka users have a small number of partitions scale their pipelines
> to have one reader per partition. Obviously, there are larger-scale Kafka
> topics and more sources that will be ported over in the future but I think
> there is an implicit understanding that aligning sources adds latency to
> pipelines, and we can frame the follow-up "per-split" alignment as an
> optimization.
>
> On Fri, Jul 9, 2021 at 6:40 AM Piotr Nowojski 
> wrote:
>
> > Hey!
> >
> > A couple of weeks ago me and Arvid Heise played around with an idea to
> > address a long standing issue of Flink: lack of watermark/event time
> > alignment between different parallel instances of sources, that can lead
> to
> > ever growing state size for downstream operators like WindowOperator.
> >
> > We had an impression that this is relatively low hanging fruit that can
> be
> > quite easily implemented - at least partially (the first part mentioned
> in
> > the FLIP document). I have written down our proposal [1] and you can also
> > check out our PoC that we have implemented [2].
> >
> > We think that this is a quite easy proposal, that has been in large part
> > already implemented. There is one obvious limitation of our PoC. Namely
> we
> > can only easily block individual SourceOperators. This works perfectly
> fine
> > as long as there is at most one split per SourceOperator. However it
> > doesn't work with multiple splits. In that case, if a single
> > `SourceOperator` is responsible for processing both the least and the
> most
> > advanced splits, we won't be able to block this most advanced split for
> > generating new records. I'm proposing to solve this problem in the future
> > in another follow up FLIP, as a solution that works with a single split
> per
> > operator is easier and already valuable for some of the users.
> >
> > What do you think about this proposal?
> > Best, Piotrek
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> > [2] https://github.com/pnowojski/flink/commits/aligned-sources
> >
>


Re: Job Recovery Time on TM Lost

2021-07-12 Thread 刘建刚
Yes, time is the main factor when detecting the TM's liveness. The count-based
method simply checks at certain intervals.

Gen Luo  于2021年7月9日周五 上午10:37写道:

> @刘建刚
> Welcome to join the discuss and thanks for sharing your experience.
>
> I have a minor question. In my experience, network failures in a certain
> cluster usually takes a time to recovery, which can be measured as p99 to
> guide configuring. So I suppose it would be better to use time than attempt
> count as the configuration for confirming TM liveness. How do you think
> about this? Or is the premise right according to your experience?
>
> @Lu Niu 
> > Does that mean the akka timeout situation we talked above doesn't apply
> to flink 1.11?
>
> I suppose it's true. According to the reply from Till in FLINK-23216
> , it should be
> confirmed that the problem is introduced by declarative resource
> management, which is introduced to Flink in 1.12.
>
> In previous versions, although JM still uses heartbeat to check TMs
> status, RM will tell JM about TM lost once it is noticed by Yarn. This is
> much faster than JM's heartbeat mechanism, if one uses default heartbeat
> configurations. However, after 1.12 with declarative resource management,
> RM will no longer tell this to JM, since it doesn't have a related
> AllocationID.  So the heartbeat mechanism becomes the only way JM can know
> about TM lost.
>
> On Fri, Jul 9, 2021 at 6:34 AM Lu Niu  wrote:
>
>> Thanks everyone! This is a great discussion!
>>
>> 1. Restarting takes 30s when throwing exceptions from application code
>> because the restart delay is 30s in config. Before lots of related config
>> are 30s which lead to the confusion. I redo the test with config:
>>
>> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647,
>> backoffTimeMS=1000)
>> heartbeat.timeout: 50
>> akka.ask.timeout 30 s
>> akka.lookup.timeout 30 s
>> akka.tcp.timeout 30 s
>> akka.watch.heartbeat.interval 30 s
>> akka.watch.heartbeat.pause 120 s
>>
>>Now Phase 1 drops down to 2s now and phase 2 takes 13s. The whole
>> restart takes 14s. Does that mean the akka timeout situation we talked
>> above doesn't apply to flink 1.11?
>>
>> 2. About flaky connection between TMs, we did notice sometimes exception
>> as follows:
>> ```
>> TaskFoo switched from RUNNING to FAILED on
>> container_e02_1599158147594_156068_01_38 @
>> xenon-prod-001-20200316-data-slave-prod-0a0201a4.ec2.pin220.com
>> (dataPort=40957).
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connection unexpectedly closed by remote task manager '
>> xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539'.
>> This might indicate that the remote task manager was lost.
>> at
>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:144)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>> at
>> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:97)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:816)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concu

[jira] [Created] (FLINK-23360) Facing broker disconnected issue while establishing Kafka with Azure Event hubs in Spring boot application #543

2021-07-12 Thread Rajiya Mulani (Jira)
Rajiya Mulani created FLINK-23360:
-

 Summary: Facing broker disconnected issue while establishing Kafka 
with Azure Event hubs in Spring boot application #543
 Key: FLINK-23360
 URL: https://issues.apache.org/jira/browse/FLINK-23360
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Runtime / Configuration
Reporter: Rajiya Mulani


# Article link I followed to configure Event hub.
[https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quickstart-kafka-enabled-event-hubs]
 (TLS/SSL)

 # code
Configurations in .properties file

###
## Properties For Azure Event Hub with Kafka
###

kafkaConnectionUrl=**.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://**.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=*";
POM file (recoverable coordinates and dependencies):

modelVersion: 4.0.0
parent: *-processor : 0.0.1-SNAPSHOT (relativePath ${basedir}/..)
artifact: *-common (packaging jar)

Dependencies:
- com.*.iot : *-processor-data
- org.apache.kafka : kafka-clients : 0.11.0.0
- org.apache.commons : commons-lang3 : 3.9
- org.apache.commons : commons-io : 1.3.2
- commons-configuration : commons-configuration : 1.10

Kafka producer configuration:

@Bean
public Producer IndexMessageKafkaProducer() {
    logger.info("BootStrap Server ");
    logger.info(new CommonUtility().getPropertyValue("kafkaConnectionUrl"));
    Map configProps = new HashMap();

    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, new CommonUtility().getPropertyValue("kafkaConnectionUrl"));
    configProps.put(ProducerConfig.ACKS_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerAcks"));
    configProps.put(ProducerConfig.RETRIES_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerRetries"));
    configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerBatchSize"));
    configProps.put(ProducerConfig.LINGER_MS_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerLingerMs"));
    configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, new CommonUtility().getPropertyValue("kafkaProducerBufferMemory"));
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("kafkaTopicKeySerializer"));
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("IndexMessageSerializer"));
    return new KafkaProducer(configProps); // assumption: the return was truncated in the original paste
}

Kafka consumer configuration:

@Bean
public Consumer IndexMessageKafkaConsumer() {
    Map configProps = new HashMap();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, new CommonUtility().getPropertyValue("kafkaConnectionUrl"));
    configProps.put(ConsumerConfig.GROUP_ID_CONFIG, new CommonUtility().getPropertyValue("indexConsumerGroupid"));
    configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, new CommonUtility().getPropertyValue("offset"));
    configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, new CommonUtility().getPropertyValue("autoCommit"));
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("kafkaTopicKeyDeserializer"));
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new CommonUtility().getPropertyValue("IndexMessageDeserializer"));

    System.out.println("Inside Consumer Configuration");
    return new KafkaConsumer(configProps);
}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [ANNOUNCE] New Apache Flink Committer - Yuan Mei

2021-07-12 Thread Yuan Mei
Thanks, everyone! ☺


Best Regards,
Yuan

On Fri, Jul 9, 2021 at 7:30 PM Benchao Li  wrote:

> Congratulations Yuan~
>
> Jun Qin  于2021年7月9日周五 下午4:27写道:
>
> > Congratulations, Yuan!
> >
> > > On Jul 8, 2021, at 11:49 AM, Jingsong Li 
> wrote:
> > >
> > > Congratulations Yuan!
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Thu, Jul 8, 2021 at 5:43 PM Arvid Heise  wrote:
> > >
> > >> Yay!
> > >>
> > >> On Thu, Jul 8, 2021 at 10:02 AM Jiayi Liao 
> > >> wrote:
> > >>
> > >>> Congratulations Yuan!
> > >>>
> > >>> Best,
> > >>> Jiayi Liao
> > >>>
> > >>> On Thu, Jul 8, 2021 at 3:55 PM Roman Khachatryan 
> > >> wrote:
> > >>>
> >  Congratulations Yuan!
> > 
> >  Regards,
> >  Roman
> > 
> >  On Thu, Jul 8, 2021 at 6:02 AM Yang Wang 
> > >> wrote:
> > >
> > > Congratulations Yuan!
> > >
> > > Best,
> > > Yang
> > >
> > > XING JIN  于2021年7月8日周四 上午11:46写道:
> > >
> > >> Congratulations Yuan~!
> > >>
> > >> Roc Marshal  于2021年7月8日周四 上午11:28写道:
> > >>
> > >>> Congratulations, Yuan!
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> At 2021-07-08 01:21:40, "Yu Li"  wrote:
> >  Hi all,
> > 
> >  On behalf of the PMC, I’m very happy to announce Yuan Mei as a
> > >> new
> >  Flink
> >  committer.
> > 
> >  Yuan has been an active contributor for more than two years,
> > >> with
> >  code
> >  contributions on multiple components including kafka connectors,
> >  checkpointing, state backends, etc. Besides, she has been
> > >> actively
> > >>> involved
> >  in community activities such as helping manage releases,
> > >>> discussing
> >  questions on dev@list, supporting users and giving talks at
> > >> conferences.
> > 
> >  Please join me in congratulating Yuan for becoming a Flink
> >  committer!
> > 
> >  Cheers,
> >  Yu
> > >>>
> > >>
> > 
> > >>>
> > >>
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> >
> >
>
> --
>
> Best,
> Benchao Li
>


[jira] [Created] (FLINK-23361) After ZooKeeper sets authority, HA does not work

2021-07-12 Thread shouzuo meng (Jira)
shouzuo meng created FLINK-23361:


 Summary: After ZooKeeper sets authority, HA does not work
 Key: FLINK-23361
 URL: https://issues.apache.org/jira/browse/FLINK-23361
 Project: Flink
  Issue Type: Bug
  Components: flink-contrib
Affects Versions: 1.13.1
 Environment: Flink 1.13.1
Reporter: shouzuo meng
 Fix For: 1.13.1


When you I "high-availability.zookeeper.path.root",

but execute HA, it doesn't work,It is possible that there is a problem in 
org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices#isRootPath
{code:java}
private static boolean isRootPath(String remainingPath) {
     return ZKPaths.PATH_SEPARATOR.equals(remainingPath);
 }{code}
when i use "high-availability.zookeeper.path.root" , remainingPath should 
equals the root path that I specified
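
For illustration only, a hypothetical variant that also accepts the configured 
root path:
{code:java}
// sketch only, not an actual fix
private static boolean isRootPath(String remainingPath, String configuredRootPath) {
    return ZKPaths.PATH_SEPARATOR.equals(remainingPath)
            || remainingPath.equals(configuredRootPath);
}{code}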



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[DISCUSS] Release flink-shaded 14

2021-07-12 Thread Chesnay Schepler

Hello,

I recently bumped various dependencies in flink-shaded (jackson, guava, 
netty, zookeeper) and so would like to create a new flink-shaded 
release, ideally asap so we can observe the impact for longer.


Are there any concerns, or ideas for what else we should change in 
flink-shaded before the Flink 1.14 release?


Otherwise I would start the release tomorrow or so, unless someone else 
is interested in preparing the release.




[DISCUSS] FLIP-183: Dynamic buffer size adjustment

2021-07-12 Thread Anton,Kalashnikov

Hey!

There is a wish to decrease the amount of in-flight data, which can improve 
aligned checkpoint times (less in-flight data to process before the 
checkpoint can complete) and improve the behaviour and performance of 
unaligned checkpoints (less in-flight data that needs to be persisted 
in every unaligned checkpoint). The main idea is not to keep as much 
in-flight data as we have memory for, but to keep only the amount of data 
that can predictably be handled within a configured amount of time (e.g. we 
keep the data that can be processed in 1 second). This can be achieved by 
calculating the effective throughput and then adjusting the buffer 
size based on this throughput. You can find more details about the proposal 
here [1].
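
A rough sketch of the core calculation (all names are hypothetical):

// all names hypothetical; a sketch of the idea only
long desiredInFlightBytes = (long) (observedThroughputBytesPerSecond * targetTimeSeconds);
long newBufferSize = Math.max(minBufferSize,
        Math.min(maxBufferSize, desiredInFlightBytes / numberOfBuffers));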


What are your thoughts about it?


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment



--
Best regards,
Anton Kalashnikov



Re: [DISCUSS] FLIP-182: Watermark alignment

2021-07-12 Thread Eron Wright
The notion of per-split watermarks seems quite interesting.  I think the
idleness feature could benefit from a per-split approach too, because
idleness is typically related to whether any splits are assigned to a given
operator instance.


On Mon, Jul 12, 2021 at 3:06 AM 刘建刚  wrote:

> +1 for the source watermark alignment.
> In the previous flink version, the source connectors are different in
> implementation and it is hard to make this feature. When the consumed data
> is not aligned or consuming history data, it is very easy to cause the
> unalignment. Source alignment can resolve many unstable problems.
>
> Seth Wiesman  于2021年7月9日周五 下午11:25写道:
>
> > +1
> >
> > In my opinion, this limitation is perfectly fine for the MVP. Watermark
> > alignment is a long-standing issue and this already moves the ball so far
> > forward.
> >
> > I don't expect this will cause many issues in practice, as I understand
> it
> > the FileSource always processes one split at a time, and in my
> experience,
> > 90% of Kafka users have a small number of partitions scale their
> pipelines
> > to have one reader per partition. Obviously, there are larger-scale Kafka
> > topics and more sources that will be ported over in the future but I
> think
> > there is an implicit understanding that aligning sources adds latency to
> > pipelines, and we can frame the follow-up "per-split" alignment as an
> > optimization.
> >
> > On Fri, Jul 9, 2021 at 6:40 AM Piotr Nowojski 
> > wrote:
> >
> > > Hey!
> > >
> > > A couple of weeks ago me and Arvid Heise played around with an idea to
> > > address a long standing issue of Flink: lack of watermark/event time
> > > alignment between different parallel instances of sources, that can
> lead
> > to
> > > ever growing state size for downstream operators like WindowOperator.
> > >
> > > We had an impression that this is relatively low hanging fruit that can
> > be
> > > quite easily implemented - at least partially (the first part mentioned
> > in
> > > the FLIP document). I have written down our proposal [1] and you can
> also
> > > check out our PoC that we have implemented [2].
> > >
> > > We think that this is a quite easy proposal, that has been in large
> part
> > > already implemented. There is one obvious limitation of our PoC. Namely
> > we
> > > can only easily block individual SourceOperators. This works perfectly
> > fine
> > > as long as there is at most one split per SourceOperator. However it
> > > doesn't work with multiple splits. In that case, if a single
> > > `SourceOperator` is responsible for processing both the least and the
> > most
> > > advanced splits, we won't be able to block this most advanced split for
> > > generating new records. I'm proposing to solve this problem in the
> future
> > > in another follow up FLIP, as a solution that works with a single split
> > per
> > > operator is easier and already valuable for some of the users.
> > >
> > > What do you think about this proposal?
> > > Best, Piotrek
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> > > [2] https://github.com/pnowojski/flink/commits/aligned-sources
> > >
> >
>
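
A rough sketch of the per-source check the FLIP describes (all names are
hypothetical):

// all names hypothetical; a sketch of the alignment check only
long maxAllowedWatermark = globalMinWatermark + maxAllowedWatermarkDrift;
if (localWatermark > maxAllowedWatermark) {
    sourceOperator.pauseEmitting();   // block until the global minimum catches up
} else {
    sourceOperator.resumeEmitting();
}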


Re: [DISCUSS]FLIP-170 Adding Checkpoint Rejection Mechanism

2021-07-12 Thread Stephan Ewen
Hi!

Thanks for writing this FLIP, and interesting idea.

I would like to understand a bit better why exactly we need this, and what
our alternative options are. My main concerns are the following:

*(1) Can we achieve this without changing the checkpointing mechanism?*

The checkpoint mechanism is already complex and it is super sensitive (a
bug there threatens every user with data loss). Such components must stay
as minimal as possible. When there is a way to solve this outside
checkpointing, then that should be the default approach.

To check that, I would really like to understand this specific CDC use case
a bit more. If I understand it correctly, the issue is that checkpoints are
not possible while the initial database snapshot is ingested, before the
actual Change-Data processing starts.

(a) What is the reason that no checkpoint is possible during this time? Why
is there no way to store in the checkpoint the position of the snapshot
ingestion? Is it because the snapshots are streamed in from a JDBC query
and that is not deterministic, meaning retrying the query yields different
rows (or order of rows)?

(b) If there can be no checkpoint progress during the snapshot ingestion
and all checkpoints are rejected, what prevents us from just storing an
empty state in the checkpoint? Meaning the system may have taken a bunch of
checkpoints, but any recovery would start the source from the beginning.
Is there some concern about not emitting data during that database snapshot
reading phase?

(c) There is quite some work in the direction of blending batch and
streaming execution, meaning having an initial batch execution step for
some data (like the backlog in Kafka, or a DB snapshot) and then switching
to streaming execution for the real-time stream (new records in Kafka, or
CDC records). If we focus on advancing that, we get the behavior that all
the initial data is processed in one batch (no intermediate checkpoint),
and we also get the performance speedup provided by not having to work with
RocksDB for the batch step. I think this is where the future is.

Regarding the case to not checkpoint in-between transaction markers: I
don't know how often these occur, but I would expect potentially very
often. If we reject whenever some operator is in-between transaction
markers, we will reject very many (possibly almost all) checkpoints once we
get to a certain scale (1000s of parallel operators).
I think we need a different approach there, for example first grouping
events into transaction sessions, or so.


*(2) Inability to Checkpoint shouldn't become a first-class concept.*

I think this is really a question of Flink design principles and direction.
I believe we need to push Flink to a direction that it can always
checkpoint, and more frequently and more reliably than at the moment.
Various ongoing efforts move the system in that direction, giving it the
ability to checkpoint always (async sources, async sinks, non-blocking
mailbox), more predictably (unaligned checkpoints) and more frequently
(log-based checkpoints).

That is something that makes it much easier to operate the system. When
checkpoints become infrequent or unpredictable in the interval (when they
are not reliable), it becomes a big operational problem.

If we start assuming that operators can arbitrarily reject checkpoints, we
will easily get into situations where many checkpoints are rejected and it
takes quite a while until a checkpoint can happen. There are many jobs with
1000s of operators, and when one of them rejects a checkpoint, the
checkpoint as a whole fails.
Letting operators decide when to take checkpoints is a cool property
that makes many things easier, but I am skeptical whether this is
compatible with global checkpoints.

Not having frequent checkpoints (but very infrequent ones) is really more
of a batch situation, and that brings me back to the point above: I think a
really well-working solution for this would be the hybrid batch/streaming
execution.

For that reason, I am very skeptical that we should add a first-class API
that suggests that a checkpoint happens only sometimes, when everyone
happens to agree to it.
I think that sends the wrong signal, both to developer direction, and to
users in how to implement their applications. Rejecting checkpoints should
be a rare thing, so not something an API suggests can happen all the time.

Rejecting checkpoints a few times at the beginning of a job might also be
feasible, when later checkpoints generally succeed, but then again, the
proposed API does not suggest that; it suggests any checkpoint is always
rejectable by all operators.

If we really need checkpoint rejection, I think Piotr's idea goes into a
pretty good direction: Having a "RejectionException" that leads to a
checkpoint abort in the regular way, but doesn't increment the
failure counter.

*Conclusion*

Because of the sensitivity of the checkpoint mechanism, and because the API
proposed here suggests a behavior of 

[jira] [Created] (FLINK-23362) ClientTest.testSimpleRequests fails due to timeout on azure

2021-07-12 Thread Xintong Song (Jira)
Xintong Song created FLINK-23362:


 Summary: ClientTest.testSimpleRequests fails due to timeout on 
azure
 Key: FLINK-23362
 URL: https://issues.apache.org/jira/browse/FLINK-23362
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Queryable State
Affects Versions: 1.12.4
Reporter: Xintong Song
 Fix For: 1.12.5


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20347&view=logs&j=d44f43ce-542c-597d-bf94-b0718c71e5e8&t=03dca39c-73e8-5aaf-601d-328ae5c35f20&l=14440

{code}
[ERROR] Tests run: 6, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 22.994 
s <<< FAILURE! - in org.apache.flink.queryablestate.network.ClientTest
[ERROR] testSimpleRequests(org.apache.flink.queryablestate.network.ClientTest)  
Time elapsed: 20.055 s  <<< FAILURE!
java.lang.AssertionError: Receive timed out
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at org.junit.Assert.assertNotNull(Assert.java:712)
at 
org.apache.flink.queryablestate.network.ClientTest.testSimpleRequests(ClientTest.java:177)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23363) JavaCodeSplitter can not split the code from ProjectionCodeGenerator

2021-07-12 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-23363:


 Summary: JavaCodeSplitter can not split the code from 
ProjectionCodeGenerator
 Key: FLINK-23363
 URL: https://issues.apache.org/jira/browse/FLINK-23363
 Project: Flink
  Issue Type: Sub-task
Reporter: Jingsong Lee
 Fix For: 1.14.0


- JavaCodeSplitter cannot split a method that has a return value. We should 
add comments to JavaCodeSplitter documenting this.

- ProjectionCodeGenerator needs to have a method without a return value.
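
For illustration, a hypothetical pair of generated methods; the first form 
cannot be split, the second can:

{code:java}
// hypothetical example: not splittable, the method returns a value
BinaryRowData project(RowData in) {
    // ...thousands of generated lines...
    return outRow;
}

// splittable: the same logic as void methods, with the result kept in a
// member field instead of being returned
void project(RowData in) {
    projectSplit1(in);
    projectSplit2(in);
    // result is now available in the member field outRow
}
{code}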



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23364) Check all GeneratedClass to make sure they can be split by JavaCodeSplitter

2021-07-12 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-23364:


 Summary: Check all GeneratedClass to make sure they can be split 
by JavaCodeSplitter
 Key: FLINK-23364
 URL: https://issues.apache.org/jira/browse/FLINK-23364
 Project: Flink
  Issue Type: Sub-task
Reporter: Jingsong Lee






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23365) flink-azure-fs-hadoop compile error because of json-smart

2021-07-12 Thread kevinsun (Jira)
kevinsun created FLINK-23365:


 Summary: flink-azure-fs-hadoop  compile error because of  
json-smart 
 Key: FLINK-23365
 URL: https://issues.apache.org/jira/browse/FLINK-23365
 Project: Flink
  Issue Type: Bug
  Components: FileSystems
 Environment: windows
Reporter: kevinsun


The Maven compilation of the flink-azure-fs-hadoop module fails because of the 
json-smart jar.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23366) AkkaRpcActorTest. testOnStopFutureCompletionDirectlyTerminatesAkkaRpcActor fails on azure

2021-07-12 Thread Yun Gao (Jira)
Yun Gao created FLINK-23366:
---

 Summary: AkkaRpcActorTest. 
testOnStopFutureCompletionDirectlyTerminatesAkkaRpcActor fails on azure
 Key: FLINK-23366
 URL: https://issues.apache.org/jira/browse/FLINK-23366
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.14.0
Reporter: Yun Gao


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=20343&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=05b74a19-4ee4-5036-c46f-ada307df6cf0&l=7388
{code:java}

Jul 12 18:40:57 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
Jul 12 18:40:57 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
Jul 12 18:40:57 at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
Jul 12 18:40:57 at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
Jul 12 18:40:57 at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
Jul 12 18:40:57 at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
Jul 12 18:40:57 at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
Jul 12 18:40:57 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
Jul 12 18:40:57 at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
Jul 12 18:40:57 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Jul 12 18:40:57 at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413)
Jul 12 18:40:57 at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
Jul 12 18:40:57 at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
Jul 12 18:40:57 at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
Jul 12 18:40:57 at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
Jul 12 18:40:57 at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
Jul 12 18:40:57 at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
Jul 12 18:40:57 at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
Jul 12 18:40:57 at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
Jul 12 18:40:57 
Jul 12 18:40:57 [INFO] Running 
org.apache.flink.runtime.rpc.akka.TimeoutCallStackTest
Jul 12 18:40:58 [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time 
elapsed: 0.076 s - in org.apache.flink.runtime.rpc.akka.TimeoutCallStackTest
Jul 12 18:40:58 [INFO] Running 
org.apache.flink.runtime.rpc.akka.AkkaRpcActorOversizedResponseMessageTest
Jul 12 18:40:58 [INFO] Tests run: 13, Failures: 0, Errors: 0, Skipped: 0, Time 
elapsed: 0.587 s - in org.apache.flink.runtime.rpc.akka.AkkaRpcServiceTest
Jul 12 18:40:58 [INFO] Tests run: 6, Failures: 0, Errors: 0, Skipped: 0, Time 
elapsed: 0.159 s - in 
org.apache.flink.runtime.rpc.akka.AkkaRpcActorOversizedResponseMessageTest
Jul 12 18:40:58 [INFO] 
Jul 12 18:40:58 [INFO] Results:
Jul 12 18:40:58 [INFO] 
Jul 12 18:40:58 [ERROR] Failures: 
Jul 12 18:40:58 [ERROR]   
AkkaRpcActorTest.testOnStopFutureCompletionDirectlyTerminatesAkkaRpcActor:375 
Jul 12 18:40:58 Expected: is 
Jul 12 18:40:58  but: was 
Jul 12 18:40:58 [INFO] 
Jul 12 18:40:58 [ERROR] Tests run: 85, Failures: 1, Errors: 0, Skipped: 0
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS]FLIP-170 Adding Checkpoint Rejection Mechanism

2021-07-12 Thread Senhong Liu
Hi Stephan,

Thank you so much for the reply and the suggestions! Following your questions,
I will give some explanation and new thoughts below.

(1) More detailed info about CDC use case.

In the previous design of Flink CDC, a full-table scan is started at the
beginning while holding a read-write lock. Taking a checkpoint in the middle
would be meaningless, since there is no guarantee that the data would not be
changed during recovery.

(2) Could hybrid batch/streaming execution solve the problem?

It is acceptable to me, and the current Flink CDC is actually implementing a
new feature like this [1][2]. But a large batch could still fail the
checkpoint frequently after all, and it could still be confusing for users to
understand what is going on.

(3) Making the checkpoint more controllable and reliable.

Overall, I am also looking for a way of making the checkpoint more
controllable and reliable, since hybrid batch/streaming execution is also a
tricky way of controlling the checkpoint, and it may not be configurable or
extensible enough for users to implement their specific use cases.

However, combining this with your opinion, I am thinking that designing a set
of REST APIs to control the checkpoint scheduler might be more acceptable.


[1] https://github.com/debezium/debezium-design-documents/blob/main/DDD-3.md
[2]
https://github.com/ververica/flink-cdc-connectors/commit/c6ca6c187471b538a9774258d2572194e1bb855b

Stephan Ewen  于2021年7月13日周二 上午1:25写道:

> Hi!
>
> Thanks for writing this FLIP, and interesting idea.
>
> I would like to understand a bit better why exactly we need this, and what
> our alternative options are. My main concerns are the following:
>
> *(1) Can we achieve this without changing the checkpointing mechanism?*
>
> The checkpoint mechanism is already complex and it is super sensitive (a
> bug there threatens every user with data loss). Such components must stay
> as minimal as possible. When there is a way to solve this outside
> checkpointing, then that should be the default approach.
>
> To check that, I would really like to understand this specific CDC use case
> a bit more. If I understand it correctly, the issue is that checkpoints are
> not possible while the initial database snapshot is ingested, before the
> actual Change-Data processing starts.
>
> (a) What is the reason that no checkpoint is possible during this time? Why
> is there no way to store in the checkpoint the position of the snapshot
> ingestion? Is it because the snapshots are streamed in from a JDBC query
> and that is not deterministic, meaning retrying the query yields different
> rows (or order of rows)?
>
> (b) If there can be no checkpoint progress during the snapshot ingestion
> and all checkpoints are rejected, what prevents us from just storing an
> empty state in the checkpoint? Meaning the system may have taken a bunch of
> checkpoints, but any recovery would start the source from the beginning.
> Is there some concern about not emitting data during that database snapshot
> reading phase?
>
> (c) There is quite some work in the direction of blending batch and
> streaming execution, meaning having an initial batch execution step for
> some data (like the backlog in Kafka, or a DB snapshot) and then switching
> to streaming execution for the real-time stream (new records in Kafka, or
> CDC records). If we focus on advancing that, we get the behavior that all
> the initial data is processed in one batch (no intermediate checkpoint),
> and we also get the performance speedup provided by not having to work with
> RocksDB for the batch step. I think this is where the future is.
>
> Regarding the case to not checkpoint in-between transaction markers: I
> don't know how often these occur, but I would expect potentially very
> often. If we reject whenever some operator is in-between transaction
> markers, we will reject very many (possibly almost all) checkpoints once we
> get to a certain scale (1000s of parallel operators).
> I think we need a different approach there, for example first grouping
> events into transaction sessions, or so.
>
>
> *(2) Inability to Checkpoint shouldn't become a first-class concept.*
>
> I think this is really a question of Flink design principles and direction.
> I believe we need to push Flink to a direction that it can always
> checkpoint, and more frequently and more reliably than at the moment.
> Various ongoing efforts move the system in that direction, giving it the
> ability to checkpoint always (async sources, async sinks, non-blocking
> mailbox), more predictably (unaligned checkpoints) and more frequently
> (log-based checkpoints).
>
> That is something that makes it much easier to operate the system. When
> checkpoints become infrequent or unpredictable in the interval (when they
> are not reliable), it becomes a big operational problem.
>
> If we start assuming that operators can arbitrarily reject checkpoints, we
> will easily get into situations where many checkpoints are 

[jira] [Created] (FLINK-23367) testKeyGroupedInternalPriorityQueue does not dispose rocksdb properly, and fails the test

2021-07-12 Thread Yuan Mei (Jira)
Yuan Mei created FLINK-23367:


 Summary: testKeyGroupedInternalPriorityQueue does not dispose 
rocksdb properly, and fails the test
 Key: FLINK-23367
 URL: https://issues.apache.org/jira/browse/FLINK-23367
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Yuan Mei


The `testKeyGroupedInternalPriorityQueue` set of tests for 
`ChangelogDelegateEmbeddedRocksDBStateBackendTest` does not dispose of RocksDB 
properly. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)