Re: [ANNOUNCE] Apache Flink 1.10.2 released

2020-08-27 Thread Zhijiang
Congrats, and thanks to Zhu Zhu for the release manager work and to everyone involved!

Best,
Zhijiang
--
From:liupengcheng 
Send Time:2020 Aug. 26 (Wed.) 19:37
To:dev; Xingbo Huang
Cc:Guowei Ma; user-zh; Yangze Guo; Dian Fu; Zhu Zhu; user
Subject:Re: [ANNOUNCE] Apache Flink 1.10.2 released

Thanks ZhuZhu for managing this release and everyone who contributed to this.

Best,
Pengcheng

On 2020/8/26 at 7:06 PM, "Congxian Qiu" wrote:

Thanks ZhuZhu for managing this release and everyone else who contributed
to this release!

Best,
Congxian


On Wed, Aug 26, 2020 at 1:53 PM Xingbo Huang wrote:

> Thanks Zhu for the great work and everyone who contributed to this release!
>
> Best,
> Xingbo
>
> On Wed, Aug 26, 2020 at 12:43 PM Guowei Ma wrote:
>
>> Hi,
>>
>> Thanks a lot for being the release manager Zhu Zhu!
>> Thanks to everyone who contributed to this!
>>
>> Best,
>> Guowei
>>
>>
>> On Wed, Aug 26, 2020 at 11:18 AM Yun Tang  wrote:
>>
>>> Thanks for Zhu's work to manage this release and everyone who
>>> contributed to this!
>>>
>>> Best,
>>> Yun Tang
>>> 
>>> From: Yangze Guo 
>>> Sent: Tuesday, August 25, 2020 14:47
>>> To: Dian Fu 
>>> Cc: Zhu Zhu; dev; user <user@flink.apache.org>; user-zh
>>> Subject: Re: [ANNOUNCE] Apache Flink 1.10.2 released
>>>
>>> Thanks a lot for being the release manager Zhu Zhu!
>>> Congrats to all others who have contributed to the release!
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Tue, Aug 25, 2020 at 2:42 PM Dian Fu  wrote:
>>> >
>>> > Thanks ZhuZhu for managing this release and everyone else who
>>> > contributed to this release!
>>> >
>>> > Regards,
>>> > Dian
>>> >
>>> > On Aug 25, 2020, at 2:22 PM, Till Rohrmann wrote:
>>> >
>>> > Great news. Thanks a lot for being our release manager Zhu Zhu and to
>>> > all others who have contributed to the release!
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Tue, Aug 25, 2020 at 5:37 AM Zhu Zhu  wrote:
>>> >>
>>> >> The Apache Flink community is very happy to announce the release of
>>> >> Apache Flink 1.10.2, which is the first bugfix release for the Apache
>>> >> Flink 1.10 series.
>>> >>
>>> >> Apache Flink(r) is an open-source stream processing framework for
>>> >> distributed, high-performing, always-available, and accurate data
>>> >> streaming applications.
>>> >>
>>> >> The release is available for download at:
>>> >> https://flink.apache.org/downloads.html
>>> >>
>>> >> Please check out the release blog post for an overview of the
>>> >> improvements for this bugfix release:
>>> >> https://flink.apache.org/news/2020/08/25/release-1.10.2.html
>>> >>
>>> >> The full release notes are available in Jira:
>>> >> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12347791
>>> >>
>>> >> We would like to thank all contributors of the Apache Flink community
>>> >> who made this release possible!
>>> >>
>>> >> Thanks,
>>> >> Zhu
>>> >
>>> >
>>>
>>



Re: Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread Zhijiang
Congrats, Dian!


--
From:Yun Gao 
Send Time:2020 Aug. 27 (Thu.) 17:44
To:dev; Dian Fu; user; user-zh
Subject:Re: Re: [ANNOUNCE] New PMC member: Dian Fu

Congratulations Dian !

 Best
 Yun


--
Sender:Marta Paes Moreira
Date:2020/08/27 17:42:34
Recipient:Yuan Mei
Cc:Xingbo Huang; jincheng sun; dev; Dian Fu; user; user-zh
Subject:Re: [ANNOUNCE] New PMC member: Dian Fu

Congrats, Dian!
On Thu, Aug 27, 2020 at 11:39 AM Yuan Mei  wrote:

Congrats!
On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:

Congratulations Dian!

Best,
Xingbo
On Thu, Aug 27, 2020 at 5:24 PM jincheng sun wrote:

Hi all,

On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part of 
the Apache Flink Project Management Committee (PMC).

Dian Fu has been very active on the PyFlink component, working on various 
important features such as Python UDFs and Pandas integration. Dian also 
regularly checks and votes on our releases, has successfully produced two 
releases (1.9.3 and 1.11.1) as release manager (RM), and is currently working 
as RM to push forward the release of Flink 1.12.

Please join me in congratulating Dian Fu for becoming a Flink PMC Member!

Best,
Jincheng(on behalf of the Flink PMC)



Re: Adaptive load balancing

2020-09-23 Thread Zhijiang
Hi Krishnan,

Thanks for discussing this interesting scenario! 

It reminds me of a pending improvement: adaptive load balancing for the 
rebalance partitioner. 
Since rebalance mode can emit data to any downstream node, with no constraint 
on which record goes where, records could be emitted adaptively based on the 
current backlog of each partition, which reflects the load of its consumer to 
some extent.

For your keyBy case, I guess the requirement is not only to balance the 
processing load, but also to keep the preloaded cache consistent.
Do you think it is possible to implement a custom partitioner that controls 
the keyBy distribution based on a pre-defined distribution of the cache across 
nodes? 
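
A minimal sketch of what the routing logic of such a partitioner could look like. It is plain Java so that it runs on its own; in a Flink job the same logic would sit inside an implementation of org.apache.flink.api.common.functions.Partitioner and be applied with DataStream#partitionCustom. The class name, the notion of "cache groups" and the group count are assumptions made for illustration, not something proposed in this thread.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical cache-aware routing: every key belongs to a cache group, and
 * each cache group is pinned to exactly one parallel subtask. A subtask then
 * only has to preload the cache entries of the groups it owns.
 */
final class CacheGroupPartitioner {
    private final int numCacheGroups;

    CacheGroupPartitioner(int numCacheGroups) {
        if (numCacheGroups <= 0) {
            throw new IllegalArgumentException("numCacheGroups must be positive");
        }
        this.numCacheGroups = numCacheGroups;
    }

    /** Maps a key to its cache group; must match how the cache itself was split. */
    int cacheGroupOf(String key) {
        return Math.floorMod(key.hashCode(), numCacheGroups);
    }

    /** Same shape as Partitioner#partition(key, numPartitions). */
    int partition(String key, int numPartitions) {
        return cacheGroupOf(key) % numPartitions;
    }

    public static void main(String[] args) {
        CacheGroupPartitioner partitioner = new CacheGroupPartitioner(8);
        int parallelism = 4;
        Map<Integer, Integer> recordsPerSubtask = new TreeMap<>();
        for (int i = 0; i < 1000; i++) {
            int target = partitioner.partition("key-" + i, parallelism);
            recordsPerSubtask.merge(target, 1, Integer::sum);
        }
        System.out.println("records per subtask: " + recordsPerSubtask);
    }
}
```

One caveat with this approach: partitionCustom returns a plain DataStream rather than a KeyedStream, so keyed state and keyed windows are not available downstream. The routing is also static; it does not react to load the way the adaptive rebalance idea above would.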

Best,
Zhijiang
--
From:Navneeth Krishnan 
Send Time:2020 Sep. 23 (Wed.) 02:21
To:user 
Subject:Adaptive load balancing

Hi All,

We are currently using flink in production and use keyBy for performing a CPU 
intensive computation. There is a cache lookup for a set of keys and since 
keyBy cannot guarantee the data is sent to a single node we are basically 
replicating the cache on all nodes. This is causing more memory problems for us 
and we would like to explore some options to mitigate the current limitations.

Is there a way to group a set of keys and send to a set of nodes so that we 
don't have to replicate the cache data on all nodes?

Has someone tried implementing hashing with adaptive load balancing, so that if 
a node is busy processing, the data can be routed effectively to other 
nodes which are free?

Any suggestions are greatly appreciated.

Thanks



Re: Backpressure and 99th percentile latency

2020-03-05 Thread Zhijiang
Hi Felipe,

Let me try to answer your questions below.

> I understand that I am tracking latency every 10 seconds for each physical 
> instance operator. Is that right?

Generally right. The latency marker is emitted from the source and flows 
through all the intermediate operators until it reaches the sink. This interval 
controls how often the source emits markers.

> The backpressure goes away but the 99th percentile latency is still the same. 
> Why? Does it have no relation with each other?

Latency can be influenced by the buffer flush timeout, network transport, load, 
and so on. Under backpressure, a huge amount of in-flight data accumulates in 
the network stack, so the latency marker has to queue behind it and wait for 
network transport, which can add an obvious delay. The latency marker might not 
even be emitted from the source in time when no buffers are temporarily 
available. 

The backpressure going away does not mean that no buffers are accumulated on 
the network wire anymore, only that the accumulation no longer reaches the 
level of backpressure. So the latency marker still needs to queue behind the 
buffers accumulated on the wire, and it might take some time to fully digest 
the previously accumulated buffers before the latency relaxes. I guess this 
might be your case. You can monitor the "inputQueueLength" and 
"outputQueueLength" metrics to confirm the status. In short, yes, the latency 
is related to backpressure, but the change might only become visible after some 
delay.
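
A back-of-the-envelope sketch of why latency can stay high long after backpressure disappears: a backlog only shrinks at the difference between the drain rate and the arrival rate. This is a toy model under stated assumptions (constant rates, a single queue); the class name and all numbers are hypothetical and were not measured on the job discussed here.

```java
/** Toy model: how long a queue of accumulated records takes to drain. */
final class BacklogDrain {

    /**
     * @param backlogRecords records still queued when backpressure ends
     * @param inRate         records per second still arriving
     * @param outRate        records per second the consumer can process
     * @return seconds until the backlog is gone, or infinity if it never drains
     */
    static double secondsToDrain(long backlogRecords, double inRate, double outRate) {
        if (outRate <= inRate) {
            return Double.POSITIVE_INFINITY; // backlog does not shrink at all
        }
        return backlogRecords / (outRate - inRate);
    }

    public static void main(String[] args) {
        // Hypothetical: the consumer is only 1% faster than the incoming rate.
        double seconds = secondsToDrain(5_400, 99.0, 100.0);
        System.out.println("time to drain: " + seconds / 60.0 + " minutes");
    }
}
```

With a consumer that is only slightly faster than the incoming rate, even a modest backlog takes a long time to disappear, and latency markers keep queueing behind it in the meantime.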

>In the end I left the experiment for more than 2 hours running and only after 
>about 1,5 hour the 99th percentile latency got down to milliseconds. Is that 
>normal?

I guess it is normal, as mentioned above. Once backpressure is gone and no 
buffers are accumulated in the network stack anymore, the latency should go 
down to milliseconds.

Best,
Zhijiang
--
From:Felipe Gutierrez 
Send Time:2020 Mar. 6 (Fri.) 05:04
To:user 
Subject:Backpressure and 99th percentile latency

Hi,

I am a bit confused about the topic of tracking latency in Flink [1]. It says 
if I use the latency track I am measuring the Flink’s network stack but 
application code latencies also can influence it. For instance, if I am using 
the metrics.latency.granularity: operator (default) and 
setLatencyTrackingInterval(1). I understand that I am tracking latency 
every 10 seconds for each physical instance operator. Is that right?

In my application, I am tracking the latency of all aggregators. When I have a 
high workload and I can see backpressure from the flink UI the 99th percentile 
latency is 13, 25, 21, and 25 seconds. Then I set my aggregator to have a 
larger window. The backpressure goes away but the 99th percentile latency is 
still the same. Why? Does it have no relation with each other?

In the end I left the experiment for more than 2 hours running and only after 
about 1,5 hour the 99th percentile latency got down to milliseconds. Is that 
normal? Please see the figure attached.

[1] 
https://flink.apache.org/2019/07/23/flink-network-stack-2.html#latency-tracking

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com



Re: Backpressure and 99th percentile latency

2020-03-07 Thread Zhijiang
Thanks for the feedback Felipe! 
Regarding your concern below:

> Although I think it is better to use outPoolUsage and inPoolUsage according 
> to [1]. However, in your opinion is it better (faster to see) to use 
> inputQueueLength and
> outputQueueLength or outPoolUsage and inPoolUsage to monitor a consequence of 
> backpressure? I mean, is there a faster way to show that the latency 
> increased due to
> backpressure? Maybe if I create my own metric on my own operator or udf?

The blog [1] already gives a great general explanation of the network stack for 
users, and I agree with it on this issue.
In particular, I can provide some further notes for your understanding.

1. It is not easy for users to get the precise total number of input and output 
buffers, so from the inputQueueLength and outputQueueLength metrics alone we 
cannot tell whether the buffers are exhausted and backpressure has occurred. In 
contrast, we know that inPoolUsage and outPoolUsage should both reach 100% once 
backpressure happens.

2. The semantics of inPoolUsage changed in release-1.9. Before 1.9, this metric 
only measured the usage of floating buffers. From 1.9 on, it also covers the 
usage of exclusive buffers. That means that from 1.9 on you might see 
inPoolUsage far below 100% when backpressure happens, especially with data 
skew, while inputFloatingBuffersUsage should be at 100%.

3. The latency marker provided by the Flink framework is emitted to a random 
channel each time (it is not broadcast) for performance reasons. So over a 
short period it is hard to say whether it is measuring a heavily loaded channel 
or a lightly loaded one, especially with data skew.

4. In theory, latency should increase along with inputQueueLength, 
outputQueueLength, inPoolUsage and outPoolUsage. In most cases all of them 
should follow the same trend. 

Best,
Zhijiang




--
From:Felipe Gutierrez 
Send Time:2020 Mar. 7 (Sat.) 18:49
To:Arvid Heise 
Cc:Zhijiang ; user 
Subject:Re: Backpressure and 99th percentile latency

Hi,
I implemented my own histogram metric on my operator to measure the
latency. The latency is following the throughput at the same pace now.
The figures are attached.

Best,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Fri, Mar 6, 2020 at 9:38 AM Felipe Gutierrez
 wrote:
>
> Thanks for the clarified answer @Zhijiang, I am gonna monitor
> inputQueueLength and outputQueueLength to check some relation with
> backpressure. Although I think it is better to use outPoolUsage and
> inPoolUsage according to [1].
> However, in your opinion is it better (faster to see) to use
> inputQueueLength and outputQueueLength or outPoolUsage and inPoolUsage
> to monitor a consequence of backpressure? I mean, is there a faster
> way to show that the latency increased due to backpressure? Maybe if I
> create my own metric on my own operator or udf?
>
> Thanks @Arvid. In the end I want to be able to hold SLAs. For me, the
> SLA would be the minimum latency. If I understood correctly, in the
> time that I started to have backpressure the latency track metrics are
> not a very precise indication of how much backpressure my application
> is suffering. It just indicates that there is backpressure.
> What would you say is a more or less precise metric to tune the
> throughput in order to not have backpressure? Something like, if I
> have 50,000 milliseconds of latency and the normal latency is 150
> milliseconds, the throughput has to decrease by a factor of 50,000/150.
>
> Just a note. I am not changing the throughput of the sources yet. I am
> changing the size of the window without restarting the job. But I guess
> they have the same meaning for this question.
>
> [1] https://flink.apache.org/2019/07/23/flink-network-stack-2.html
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
>
>
> On Fri, Mar 6, 2020 at 8:17 AM Arvid Heise  wrote:
> >
> > Hi Felipe,
> >
> > latency under backpressure has to be carefully interpreted. Latency's 
> > semantics actually require that the data source is read in a timely manner; 
> > that is, there is no bottleneck in your pipeline where data is piling up.
> >
> > Thus, to measure latency in experiments you must ensure that the current 
> > throughput is below the maximum throughput, for example by gradually 
> > increasing the throughput with a generating source or through some 
> > throttles on the external source. Until you re

Re: Help me understand this Exception

2020-03-18 Thread Zhijiang
Agree with Gordon's below explanation!

Besides that, you can also check the job master's log, which will probably show 
the specific exception that caused this failure.

I was wondering whether we should improve ExceptionInChainedOperatorException 
to also include the message of the wrapped exception, so that users can get the 
root cause directly rather than only the current message "Could not forward 
element to next operator".

Best,
Zhijiang


--
From:Tzu-Li (Gordon) Tai 
Send Time:2020 Mar. 18 (Wed.) 00:01
To:aj 
Cc:user 
Subject:Re: Help me understand this Exception

Hi,

The exception stack you posted simply means that the next operator in the chain 
failed to process the output watermark.
There should be another exception, which would explain why some operator was 
closed / failed and eventually leading to the above exception.
That would provide more insight to exactly why your job is failing.

Cheers,
Gordon
On Tue, Mar 17, 2020 at 11:27 PM aj  wrote:

Hi,
I am running a streaming job that generates watermarks like this:

public static class SessionAssigner
        implements AssignerWithPunctuatedWatermarks<GenericRecord> {
    @Override
    public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
        // Flink expects event timestamps in epoch milliseconds
        long timestamp = (long) record.get("event_ts");
        // "{}" is needed, otherwise the logger silently drops the argument
        LOGGER.info("timestamp {}", timestamp);
        return timestamp;
    }

    @Override
    public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
        // simply emit a watermark with every event
        LOGGER.info("extractedTimestamp {}", extractedTimestamp);
        return new Watermark(extractedTimestamp);
    }
}
Please help me understand what this exception means:

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:137)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:116)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceW

Re: How do I get the outPoolUsage value inside my own stream operator?

2020-03-18 Thread Zhijiang
Hi Felipe,

I checked the code path, and the outPoolUsage metric sits under the following 
layers: TaskMetricGroup -> TaskIOMetricGroup -> "buffers" group -> 
"outPoolUsage".
It seems that the `TaskIOMetricGroup` is missing from your sample below. You 
can get it from the TaskMetricGroup.

Hope it solves your problem.

Best,
Zhijiang


--
From:Felipe Gutierrez 
Send Time:2020 Mar. 17 (Tue.) 17:50
To:user 
Subject:Re: How do I get the outPoolUsage value inside my own stream operator?

Hi,

just for the record. I am collecting the values of outPoolUsage using
the piece of code below inside my stream operator

OperatorMetricGroup operatorMetricGroup = (OperatorMetricGroup) this.getMetricGroup();
TaskMetricGroup taskMetricGroup = operatorMetricGroup.parent();
MetricGroup metricGroup = taskMetricGroup.getGroup("buffers");
Gauge<Float> gauge = (Gauge<Float>) metricGroup.getMetric("outPoolUsage");
if (gauge != null && gauge.getValue() != null) {
   float outPoolUsage = gauge.getValue().floatValue();
   // store the usage as a percentage in the operator's own histogram
   this.outPoolUsageHistogram.update((long) (outPoolUsage * 100));
}


--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Mon, Mar 16, 2020 at 5:17 PM Felipe Gutierrez
 wrote:
>
> Hi community,
>
> I have built my own operator (not a UDF) and I want to collect the
> metrics of "outPoolUsage" inside it. How do I do it assuming that I
> have to do some modifications in the source code?
>
> I know that the Gauge comes from
> flink-runtime/org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge.java.
> Inside of my operator MyAbstractUdfStreamOperator I can get the
> "MetricGroup metricGroup = this.getMetricGroup()".
> Then I implemented the "Gauge gauge = (Gauge)
> metricGroup.getMetric("outPoolUsage");" but it returns null all the
> time. Even when I click on the Backpressure UI Interface.
>
> Thanks,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com



Re: FlinkCEP - Detect absence of a certain event

2020-03-18 Thread Zhijiang
Hi Humberto,

I guess Fuji is familiar with Flink CEP and can answer your question. I have 
already cc'ed him.

Best,
Zhijiang


--
From:Humberto Rodriguez Avila 
Send Time:2020 Mar. 18 (Wed.) 17:31
To:user 
Subject:FlinkCEP - Detect absence of a certain event

In the documentation of FlinkCEP, I found that I can enforce that a particular 
event doesn't occur between two other events using notFollowedBy or notNext.
However, I was wondering if I could detect the absence of a certain event after 
a time X. For example, if an event A is not followed by another event A within 
10 seconds, fire an alert or do something.
Would it be possible to define a FlinkCEP pattern to capture that situation?
Thanks in advance, Humberto
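
The condition described above ("no further event A within 10 seconds") can be sketched as a small deadline-based detector. This is plain Java that only illustrates the logic; it is not the FlinkCEP API, and the class and method names are hypothetical. In a Flink job the same idea is typically expressed with per-key state plus a timer, or by handling timed-out partial matches of a CEP pattern; whether that fits this use case would need to be checked against the FlinkCEP documentation.

```java
/** Fires once when no event A arrives within the timeout after the previous A. */
final class AbsenceDetector {
    private final long timeoutMillis;
    private long deadline = Long.MAX_VALUE; // not armed until the first event A

    AbsenceDetector(long timeoutMillis) {
        if (timeoutMillis <= 0) {
            throw new IllegalArgumentException("timeoutMillis must be positive");
        }
        this.timeoutMillis = timeoutMillis;
    }

    /** Call for every event A; (re)arms the deadline. */
    void onEventA(long timestampMillis) {
        deadline = timestampMillis + timeoutMillis;
    }

    /** Call whenever time advances; returns true once per missed deadline. */
    boolean onTime(long nowMillis) {
        if (nowMillis >= deadline) {
            deadline = Long.MAX_VALUE; // disarm until the next event A
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        AbsenceDetector detector = new AbsenceDetector(10_000);
        detector.onEventA(0);
        System.out.println("alert at 5s:  " + detector.onTime(5_000));
        detector.onEventA(6_000);
        System.out.println("alert at 10s: " + detector.onTime(10_000));
        System.out.println("alert at 16s: " + detector.onTime(16_000));
    }
}
```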



Re: End to End Latency Tracking in flink

2020-03-29 Thread Zhijiang
Hi Lu,

Besides Congxian's replies, you can also find some further explanations at 
https://flink.apache.org/2019/07/23/flink-network-stack-2.html#latency-tracking

Best,
Zhijiang


--
From:Congxian Qiu 
Send Time:2020 Mar. 28 (Sat.) 11:49
To:Lu Niu 
Cc:user 
Subject:Re: End to End Latency Tracking in flink

Hi
As far as I know, the latency-tracking feature is meant for debugging: you can 
use it to debug, and disable it when running the job in production.
From my side, using $current_processing - $event_time is OK, but keep one thing 
in mind: the event time may not be the time at which the record was ingested 
into Flink.
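
A minimal sketch of the "$current_processing - $event_time" approach, assuming both times are epoch milliseconds taken from comparable clocks. It keeps a fixed-size window of samples and reports nearest-rank percentiles. The class is hypothetical plain Java; in a Flink operator the samples would more likely be fed into a histogram registered on the operator's metric group.

```java
import java.util.Arrays;

/** Sliding window of latency samples with nearest-rank percentiles. */
final class LatencyWindow {
    private final long[] samples;
    private int count; // number of valid samples, at most samples.length
    private int next;  // index that the next sample overwrites

    LatencyWindow(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.samples = new long[capacity];
    }

    /** Records processingTime - eventTime as one latency sample. */
    void record(long eventTimeMillis, long processingTimeMillis) {
        samples[next] = processingTimeMillis - eventTimeMillis;
        next = (next + 1) % samples.length;
        if (count < samples.length) {
            count++;
        }
    }

    /** Nearest-rank percentile (0 < p <= 100) over the samples in the window. */
    long percentile(double p) {
        if (count == 0) {
            throw new IllegalStateException("no samples recorded");
        }
        long[] sorted = Arrays.copyOf(samples, count);
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * count / 100.0);
        return sorted[Math.max(rank, 1) - 1];
    }

    public static void main(String[] args) {
        LatencyWindow window = new LatencyWindow(1_000);
        long now = System.currentTimeMillis();
        for (int i = 1; i <= 100; i++) {
            window.record(now - i, now); // simulated latencies of 1..100 ms
        }
        System.out.println("p99 latency (ms): " + window.percentile(99));
    }
}
```

As noted above, this measures the delay relative to the event time, so time an event spent outside Flink before ingestion is included in the number.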

Best,
Congxian

On Sat, Mar 28, 2020 at 6:25 AM Lu Niu wrote:

Hi,

I am looking for end-to-end latency monitoring of a Flink job. Based on my 
study, I have two options: 

1. Flink provides a latency tracking feature. However, the documentation says 
it cannot show the actual latency of business logic, as the markers bypass all 
operators. 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#latency-tracking
Also, the feature can significantly impact performance, so I assume it is not 
meant for production use. What do users use latency tracking for? It sounds 
like only back pressure could affect the latency.  

2. I found another Stack Overflow question on this: 
https://stackoverflow.com/questions/56578919/latency-monitoring-in-flink-application
The answer suggests exposing (current processing time - event time) after the 
source and before the sink for end-to-end latency monitoring. Is this a good 
solution? If not, what is the official solution for end-to-end latency 
tracking? 
 

Thank you! 

Best
Lu




Re: [ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

2020-03-30 Thread Zhijiang

Thanks for your continuous efforts to engage with the Flink ecosystem, Jeff!
Glad to see the progress. I hope more users try it out in practice.

Best,
Zhijiang



--
From:Dian Fu 
Send Time:2020 Mar. 31 (Tue.) 10:15
To:Jeff Zhang 
Cc:user ; dev 
Subject:Re: [ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

Hi Jeff,

Thanks for the great work and sharing it with the community! Very impressive 
and will try it out.

Regards,
Dian

On Mar 30, 2020, at 9:16 PM, Till Rohrmann wrote:
This is great news Jeff! Thanks a lot for sharing it with the community. 
Looking forward to trying Flink on Zeppelin out :-)

Cheers,
Till
On Mon, Mar 30, 2020 at 2:47 PM Jeff Zhang  wrote:
Hi Folks,

I am very excited to announce that the integration of Flink with the Apache 
Zeppelin notebook is complete. You can now run Flink jobs via the DataStream 
API, Table API, SQL, and PyFlink in an Apache Zeppelin notebook. Download it 
here: http://zeppelin.apache.org/download.html

Here are some highlights of this work:

1. Support for 3 kinds of execution mode: local, remote, yarn
2. Support for multiple languages in one Flink session: Scala, Python, SQL
3. Support for the Hive connector (reading from Hive and writing to Hive)
4. Dependency management
5. UDF support (Scala, PyFlink)
6. Support for both batch SQL and streaming SQL

For more details and usage instructions, you can refer to the following 4 blogs:

1) Get started: https://link.medium.com/oppqD6dIg5
2) Batch: https://link.medium.com/3qumbwRIg5
3) Streaming: https://link.medium.com/RBHa2lTIg5
4) Advanced usage: https://link.medium.com/CAekyoXIg5

You are welcome to try Flink on Zeppelin and share feedback and comments. 

-- 
Best Regards

Jeff Zhang



Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

2020-04-09 Thread Zhijiang
Great work! Thanks Gordon for the continuous efforts to enhance stateful 
functions and for the efficient release!
I hope stateful functions become more and more popular with users.

Best,
Zhijiang


--
From:Yun Tang 
Send Time:2020 Apr. 9 (Thu.) 00:17
To:Till Rohrmann ; dev 
Cc:Oytun Tez ; user 
Subject:Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

Excited to see the stateful functions release!
Thanks to release manager Gordon and everyone who contributed to this for the 
great work.

Best
Yun Tang

From: Till Rohrmann 
Sent: Wednesday, April 8, 2020 14:30
To: dev 
Cc: Oytun Tez ; user 
Subject: Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

Great news! Thanks a lot for being our release manager Gordon and to everyone 
who helped with the release.

Cheers,
Till

On Wed, Apr 8, 2020 at 3:57 AM Congxian Qiu wrote:
Thanks a lot for the release and your great job, Gordon!
Also thanks to everyone who made this release possible!

Best,
Congxian


On Wed, Apr 8, 2020 at 2:55 AM Oytun Tez wrote:

> I should also add, I couldn't agree more with this sentence in the release
> article: "state access/updates and messaging need to be integrated."
>
> This is something we strictly enforce in our Flink case, where we do not
> refer to anything external for storage, use Flink as our DB.
>
>
>
>  --
>
> [image: MotaWord]
> Oytun Tez
> M O T A W O R D | CTO & Co-Founder
> oy...@motaword.com<mailto:oy...@motaword.com>
>
>   <https://www.motaword.com/blog>
>
>
> On Tue, Apr 7, 2020 at 12:26 PM Oytun Tez wrote:
>
>> Great news! Thank you all.
>>
>> On Tue, Apr 7, 2020 at 12:23 PM Marta Paes Moreira wrote:
>>
>>> Thank you for managing the release, Gordon — you did a tremendous job!
>>> And to everyone else who worked on pushing it through.
>>>
>>> Really excited about the new use cases that StateFun 2.0 unlocks for
>>> Flink users and beyond!
>>>
>>>
>>> Marta
>>>
>>> On Tue, Apr 7, 2020 at 4:47 PM Hequn Cheng wrote:
>>>
>>>> Thanks a lot for the release and your great job, Gordon!
>>>> Also thanks to everyone who made this release possible!
>>>>
>>>> Best,
>>>> Hequn
>>>>
>>>> On Tue, Apr 7, 2020 at 8:58 PM Tzu-Li (Gordon) Tai wrote:
>>>>
>>>>> The Apache Flink community is very happy to announce the release of
>>>>> Apache Flink Stateful Functions 2.0.0.
>>>>>
>>>>> Stateful Functions is an API that simplifies building distributed
>>>>> stateful applications.
>>>>> It's based on functions with persistent state that can interact
>>>>> dynamically with strong consistency guarantees.
>>>>>
>>>>> Please check out the release blog post for an overview of the release:
>>>>> https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html
>>>>>
>>>>> The release is available for download at:
>>>>> https://flink.apache.org/downloads.html
>>>>>
>>>>> Maven artifacts for Stateful Functions can be found at:
>>>>> https://search.maven.org/search?q=g:org.apache.flink%20statefun
>>>>>
>>>>> Python SDK for Stateful Functions published to the PyPI index can be
>>>>> found at:
>>>>> https://pypi.org/project/apache-flink-statefun/
>>>>>
>>>>> Official Docker image for building Stateful Functions applications is
>>>>> currently being published to Docker Hub.
>>>>> Dockerfiles for this release can be found at:
>>>>> https://github.com/apache/flink-statefun-docker/tree/master/2.0.0
>>>>> Progress for creating the Docker Hub repository can be tracked at:
>>>>> https://github.com/docker-library/official-images/pull/7749
>>>>>
>>>>> The full release notes are available in Jira:
>>>>>
>>>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346878
>>>>>
>>>>> We would like to thank all contributors of the Apache Flink community
>>>>> who made this release possible!
>>>>>
>>>>> Cheers,
>>>>> Gordon
>>>>>
>>>> --
>>
>



Re: [ANNOUNCE] Apache Flink 1.9.3 released

2020-04-27 Thread Zhijiang
Thanks Dian for the release work and thanks everyone involved. 

Best,
Zhijiang


--
From:Till Rohrmann 
Send Time:2020 Apr. 27 (Mon.) 15:13
To:Jingsong Li 
Cc:dev; Leonard Xu; Benchao Li; Konstantin Knauf; jincheng sun; Hequn Cheng; Dian Fu; user; user-zh; Apache Announce List
Subject:Re: [ANNOUNCE] Apache Flink 1.9.3 released

Thanks Dian for being our release manager and thanks to everyone who helped
making this release possible.

Cheers,
Till

On Mon, Apr 27, 2020 at 3:26 AM Jingsong Li  wrote:

> Thanks Dian for managing this release!
>
> Best,
> Jingsong Lee
>
> On Sun, Apr 26, 2020 at 7:17 PM Jark Wu  wrote:
>
>> Thanks Dian for being the release manager and thanks all who make this
>> possible.
>>
>> Best,
>> Jark
>>
>> On Sun, 26 Apr 2020 at 18:06, Leonard Xu  wrote:
>>
>> > Thanks Dian for the release and being the release manager !
>> >
>> > Best,
>> > Leonard Xu
>> >
>> >
>> > On Apr 26, 2020, at 17:58, Benchao Li wrote:
>> >
>> > Thanks Dian for the effort, and all who make this release possible.
>> > Great work!
>> >
>> > On Sun, Apr 26, 2020 at 5:21 PM Konstantin Knauf wrote:
>> >
>> >> Thanks for managing this release!
>> >>
>> >> On Sun, Apr 26, 2020 at 3:58 AM jincheng sun wrote:
>> >>
>> >>> Thanks for your great job, Dian!
>> >>>
>> >>> Best,
>> >>> Jincheng
>> >>>
>> >>>
>> >>> On Sat, Apr 25, 2020 at 8:30 PM Hequn Cheng wrote:
>> >>>
>> >>>> @Dian, thanks a lot for the release and for being the release manager.
>> >>>> Also thanks to everyone who made this release possible!
>> >>>>
>> >>>> Best,
>> >>>> Hequn
>> >>>>
>> >>>> On Sat, Apr 25, 2020 at 7:57 PM Dian Fu  wrote:
>> >>>>
>> >>>>> Hi everyone,
>> >>>>>
>> >>>>> The Apache Flink community is very happy to announce the release of
>> >>>>> Apache Flink 1.9.3, which is the third bugfix release for the Apache
>> >>>>> Flink 1.9 series.
>> >>>>>
>> >>>>> Apache Flink(r) is an open-source stream processing framework for
>> >>>>> distributed, high-performing, always-available, and accurate data
>> >>>>> streaming applications.
>> >>>>>
>> >>>>> The release is available for download at:
>> >>>>> https://flink.apache.org/downloads.html
>> >>>>>
>> >>>>> Please check out the release blog post for an overview of the
>> >>>>> improvements for this bugfix release:
>> >>>>> https://flink.apache.org/news/2020/04/24/release-1.9.3.html
>> >>>>>
>> >>>>> The full release notes are available in Jira:
>> >>>>> https://issues.apache.org/jira/projects/FLINK/versions/12346867
>> >>>>>
>> >>>>> We would like to thank all contributors of the Apache Flink community
>> >>>>> who made this release possible!
>> >>>>> Also great thanks to @Jincheng for helping finalize this release.
>> >>>>>
>> >>>>> Regards,
>> >>>>> Dian
>> >>>>>
>> >>>>
>> >>
>> >> --
>> >> Konstantin Knauf | Head of Product
>> >> +49 160 91394525
>> >>
>> >> Follow us @VervericaData Ververica <https://www.ververica.com/>
>> >>
>> >> --
>> >> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> >> Conference
>> >> Stream Processing | Event Driven | Real Time
>> >> --
>> >> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> >> --
>> >> Ververica GmbH
>> >> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> >> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> >> (Tony) Cheng
>> >>
>> >
>> >
>> > --
>> >
>> > Benchao Li
>> > School of Electronics Engineering and Computer Science, Peking University
>> > Tel:+86-15650713730
>> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
>> >
>> >
>> >
>>
>
>
> --
> Best, Jingsong Lee
>



Re: Flink Streaming Job Tuning help

2020-05-12 Thread Zhijiang
Hi Kumar,

I can give some general ideas for further analysis. 

> We are finding that flink lags seriously behind when we introduce the keyBy 
> (presumably because of shuffle across the network)
The `keyBy` breaks the operator chain, so it can have a noticeable 
performance impact in practice. I guess your previous job without keyBy 
could make use of the chaining mechanism, so the follow-up operator could 
consume the records emitted by the preceding operator directly, with no need 
for the buffer serialization -> network shuffle -> buffer deserialization 
steps. This matters especially because your record size of 10K is a bit large.

If the keyBy is necessary in your case, then you can further check the current 
bottleneck, e.g. whether there is back pressure, which you can monitor from the 
web UI. If so, find which task is the bottleneck causing the back pressure; 
you can trace it via the network-related metrics. 

Also check whether there is data skew in your case, meaning some tasks process 
more records than others. If so, maybe we can increase the parallelism to 
balance the load.
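
To make the data-skew check a bit more concrete, here is a minimal sketch in plain Java (no Flink dependency). It is only an illustration under assumptions: the class and method names are hypothetical, and it uses `hashCode() % parallelism` as a stand-in for Flink's real key-group assignment, which works differently in detail. It counts how many records each parallel subtask would receive for a sample of keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: estimates load per subtask for a key sample.
class KeySkewSketch {
    /** Counts records per subtask under a simplified hash partitioning (an assumption). */
    static long[] recordsPerSubtask(List<String> keys, int parallelism) {
        long[] counts = new long[parallelism];
        for (String key : keys) {
            counts[Math.floorMod(key.hashCode(), parallelism)]++;
        }
        return counts;
    }

    /** Busiest subtask divided by the mean load; 1.0 means perfectly balanced. */
    static double skewRatio(long[] counts) {
        long max = 0;
        long total = 0;
        for (long c : counts) {
            max = Math.max(max, c);
            total += c;
        }
        return total == 0 ? 1.0 : max / ((double) total / counts.length);
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 90; i++) keys.add("hot-key");      // one dominant key
        for (int i = 0; i < 10; i++) keys.add("key-" + i);     // a few rare keys
        long[] counts = recordsPerSubtask(keys, 4);
        System.out.println(Arrays.toString(counts) + " skew=" + skewRatio(counts));
    }
}
```

One caveat the sketch makes visible: all records with the same key land on the same subtask, so if a single hot key dominates, a higher parallelism alone will not spread that key's load.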

Best,
Zhijiang
--
From:Senthil Kumar 
Send Time:2020-05-13 (Wednesday) 00:49
To:user@flink.apache.org 
Subject:Re: Flink Streaming Job Tuning help

I forgot to mention, we are consuming said records from AWS kinesis and writing 
out to S3.

From: Senthil Kumar 
Date: Tuesday, May 12, 2020 at 10:47 AM
To: "user@flink.apache.org" 
Subject: Flink Streaming Job Tuning help

Hello Flink Community!

We have a fairly intensive flink streaming application, processing 8-9 million 
records a minute, with each record being 10k.
One of our steps is a keyBy operation. We are finding that flink lags seriously 
behind when we introduce the keyBy (presumably because of shuffle across the 
network).

We are trying to tune it ourselves (size of nodes, memory, network buffers 
etc), but before we spend way too much time on
this; would it be better to hire some “flink tuning expert” to get us through?

If so what resources are recommended on this list?

Cheers
Kumar



Re: [ANNOUNCE] Apache Flink 1.10.1 released

2020-05-18 Thread Zhijiang
Thanks Yu for being the release manager, and thanks to everyone involved.

Best,
Zhijiang
--
From:Arvid Heise 
Send Time:2020-05-18 (Monday) 23:17
To:Yangze Guo 
Cc:dev; Apache Announce List; user; Yu Li; user-zh
Subject:Re: [ANNOUNCE] Apache Flink 1.10.1 released

Thank you very much!

On Mon, May 18, 2020 at 8:28 AM Yangze Guo  wrote:
Thanks Yu for the great job. Congrats everyone who made this release possible.
 Best,
 Yangze Guo

 On Mon, May 18, 2020 at 10:57 AM Leonard Xu  wrote:
 >
 >
 > Thanks Yu for being the release manager, and everyone else who made this 
 > possible.
 >
 > Best,
 > Leonard Xu
 >
 > On May 18, 2020, at 10:43, Zhu Zhu wrote:
 >
 > Thanks Yu for being the release manager. Thanks everyone who made this 
 > release possible!
 >
 > Thanks,
 > Zhu Zhu
 >
 > On Fri, May 15, 2020 at 7:51 PM Benchao Li wrote:
 >>
 >> Thanks Yu for the great work, and everyone else who made this possible.
 >>
 >> On Fri, May 15, 2020 at 6:55 PM Dian Fu wrote:
 >>>
 >>> Thanks Yu for managing this release and everyone else who made this 
 >>> release possible. Good work!
 >>>
 >>> Regards,
 >>> Dian
 >>>
 >>> On May 15, 2020, at 6:26 PM, Till Rohrmann wrote:
 >>>
 >>> Thanks Yu for being our release manager and everyone else who made the 
 >>> release possible!
 >>>
 >>> Cheers,
 >>> Till
 >>>
 >>> On Fri, May 15, 2020 at 9:15 AM Congxian Qiu  
 >>> wrote:
 >>>>
 >>>> Thanks a lot for the release and your great job, Yu!
 >>>> Also thanks to everyone who made this release possible!
 >>>>
 >>>> Best,
 >>>> Congxian
 >>>>
 >>>>
 >>>> On Thu, May 14, 2020 at 1:59 AM Yu Li wrote:
 >>>>>
 >>>>> The Apache Flink community is very happy to announce the release of 
 >>>>> Apache Flink 1.10.1, which is the first bugfix release for the Apache 
 >>>>> Flink 1.10 series.
 >>>>>
 >>>>> Apache Flink(r) is an open-source stream processing framework for 
 >>>>> distributed, high-performing, always-available, and accurate data 
 >>>>> streaming applications.
 >>>>>
 >>>>> The release is available for download at:
 >>>>> https://flink.apache.org/downloads.html
 >>>>>
 >>>>> Please check out the release blog post for an overview of the 
 >>>>> improvements for this bugfix release:
 >>>>> https://flink.apache.org/news/2020/05/12/release-1.10.1.html
 >>>>>
 >>>>> The full release notes are available in Jira:
 >>>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346891
 >>>>>
 >>>>> We would like to thank all contributors of the Apache Flink community 
 >>>>> who made this release possible!
 >>>>>
 >>>>> Regards,
 >>>>> Yu
 >>>
 >>>
 >>
 >>
 >> --
 >>
 >> Benchao Li
 >> School of Electronics Engineering and Computer Science, Peking University
 >> Tel:+86-15650713730
 >> Email: libenc...@gmail.com; libenc...@pku.edu.cn
 >
 >


-- 
Arvid Heise | Senior Java Developer

Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) 
Cheng



Re: Single task backpressure problem with Credit-based Flow Control

2020-05-24 Thread Zhijiang
Hi Weihua,

From the info below, this is the expected behavior of credit-based flow control. 

I guess one of the parallel Sink subtasks causes the backpressure, so you see 
that there are no available credits on the Sink side and the outPoolUsage of 
Map is almost 100%. That really reflects the credit-based state in the case of 
backpressure.

If you want to analyze the root cause of the backpressure, you can trace the 
task stack of the respective Sink subtask to find which operation is expensive, 
then try increasing the parallelism or improving the UDF (if it is the 
bottleneck). In addition, I am not sure why you chose rescale to shuffle 
data among operators. The default forward mode can give really good performance 
if you set the same parallelism for them.
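
As a rough illustration of why "zero credits plus a full output pool" is the normal picture under backpressure, here is a toy model in plain Java. It is a sketch under assumptions, not Flink's network stack: the class name, the single channel, and the string "buffers" are all made up for illustration. The sender may only ship a buffer while it holds a credit, and the receiver hands back a credit each time it finishes processing a buffer.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of one credit-based channel; names are hypothetical, not Flink classes.
class CreditFlowSketch {
    private final Deque<String> senderBacklog = new ArrayDeque<>();  // upstream output queue
    private final Deque<String> receiverQueue = new ArrayDeque<>();  // downstream buffers in use
    private int credits;                                             // free buffers announced by the receiver

    CreditFlowSketch(int initialCredits) {
        this.credits = initialCredits;
    }

    /** The upstream task produces a buffer; it waits in the backlog until credit is available. */
    void emit(String buffer) {
        senderBacklog.add(buffer);
    }

    /** Ships buffers while credit remains; returns how many were sent. */
    int trySend() {
        int sent = 0;
        while (credits > 0 && !senderBacklog.isEmpty()) {
            receiverQueue.add(senderBacklog.poll());
            credits--;
            sent++;
        }
        return sent;
    }

    /** The downstream task finishes one buffer and announces one new credit. */
    boolean consumeOne() {
        if (receiverQueue.poll() == null) {
            return false;
        }
        credits++;
        return true;
    }

    int backlog() { return senderBacklog.size(); }
    int credits() { return credits; }

    public static void main(String[] args) {
        CreditFlowSketch channel = new CreditFlowSketch(2);
        for (int i = 0; i < 5; i++) channel.emit("buffer-" + i);
        System.out.println("sent=" + channel.trySend()
                + " backlog=" + channel.backlog() + " credits=" + channel.credits());
        channel.consumeOne(); // a slow sink frees one buffer
        System.out.println("sent=" + channel.trySend() + " backlog=" + channel.backlog());
    }
}
```

With a slow consumer the credits stay at zero almost all the time and the sender's backlog stays full, which matches the metrics described above.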

Best,
Zhijiang
--
From:Weihua Hu 
Send Time:2020-05-24 (Sunday) 18:32
To:user 
Subject:Single task backpressure problem with Credit-based Flow Control

Hi, all

I ran into a weird single Task BackPressure problem.

JobInfo:
DAG: Source (1000)-> Map (2000)-> Sink (1000), which is linked via rescale. 
Flink version: 1.9.0
There is no related info in the jobmanager/taskmanager logs.

Through Metrics, I see that Map (242)'s outPoolUsage is full, but its 
downstream Sink (121)'s inPoolUsage is 0.

After dumping the memory and analyzing it, I found:
Sink (121)'s RemoteInputChannel.unannouncedCredit = 0,
Map (242)'s CreditBasedSequenceNumberingViewReader.numCreditsAvailable = 0.
This is not consistent with my understanding of the Flink network transmission 
mechanism.

Can someone help me? Thanks a lot.


Best
Weihua Hu



Re: Flink 1.10 memory and backpressure

2020-06-10 Thread Zhijiang
Regarding monitoring backpressure, you can refer to the document [1].

As for debugging the backpressure, one option is to take jstack dumps of the 
respective window task thread that causes the backpressure (usually the one 
with the most buffers queued on its input).
After taking jstack dumps repeatedly, you might find which execution (e.g. 
state access) is expensive, and then you can probably find the bottleneck.
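
The same "sample the stack repeatedly and see where the time goes" idea can be sketched inside a JVM with the standard `java.lang.management` API. This is only an in-process analogue of running jstack several times, shown under assumptions: the class name and the thread-name filter are hypothetical, and in a real job you would more likely run jstack against the TaskManager process from outside.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sampler: counts how often each top stack frame is seen for matching threads.
class StackSamplerSketch {
    static Map<String, Integer> sample(String nameFragment, int samples, long intervalMillis) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        Map<String, Integer> hits = new TreeMap<>();
        for (int i = 0; i < samples; i++) {
            for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
                if (info == null || !info.getThreadName().contains(nameFragment)) {
                    continue;
                }
                StackTraceElement[] stack = info.getStackTrace();
                if (stack.length == 0) {
                    continue;
                }
                String top = stack[0].getClassName() + "." + stack[0].getMethodName();
                hits.merge(top, 1, Integer::sum);
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop sampling if interrupted
                break;
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Samples the current thread as a stand-in for a window task thread.
        System.out.println(sample(Thread.currentThread().getName(), 5, 10));
    }
}
```

Frames that show up in most samples are the likely hot spots, for example a state-access call.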

Besides that, release-1.11 introduces unaligned checkpoints, mainly to resolve 
checkpoint issues under backpressure. Maybe you can look into this feature and 
give it a try for your case.

Best,
Zhijiang


--
From:Steven Nelson 
Send Time:2020-06-11 (Thursday) 04:35
To:user 
Subject:Flink 1.10 memory and backpressure

We are working with a process and having some problems with backpressure.

The backpressure seems to be caused by a simple Window operation, which causes 
our checkpoints to fail.

What would be the recommendations for debugging the backpressure?



Re: Flink 1.10 memory and backpressure

2020-06-10 Thread Zhijiang
Sorry for missing the document link [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/back_pressure.html






Re: Blocked requesting MemorySegment when Segments are available.

2020-06-10 Thread Zhijiang
Hi David,

I want to clarify two things first, based on the info you provided below.

1. If all the tasks are running on the same TaskManager, there is no 
credit-based flow control. The downstream operator consumes the upstream's data 
in memory directly, with no need for a network shuffle.
2. If the TaskManager has available buffers, that does not mean an individual 
task has available buffers on its input or output side. E.g. the output side 
of the "enrich-events" operator has at most 10 buffers. After these buffers 
are exhausted, the operator is blocked regardless of the buffers available at 
the TaskManager level.

Considering your case, could you double check whether there are buffers 
accumulated in the output ("outputQueueLength" metric) of the "enrich-events" 
operator and whether the "numRecordsIn/numBytesIn" metric of the "Test 
Function" operator is greater than 0? I want to rule out a buffer leak on the 
upstream side and a missing partition request on the downstream side. Then we 
can further investigate whether the input availability notification on the 
downstream side has a bug that makes it stuck forever.
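
Point 2 can be shown with a small plain-Java model. It is a sketch under assumptions, not the real LocalBufferPool: the class names are hypothetical, and the numbers (6,553 segments in total, at most 10 per local pool) are taken from this thread. A task's local pool refuses a request once its own cap is reached, even though the TaskManager-wide pool still has thousands of free segments.

```java
// Toy model of a capped local pool on top of a global pool; not Flink's implementation.
class BufferPoolSketch {
    static final class GlobalPool {
        private int available;

        GlobalPool(int total) {
            this.available = total;
        }

        int available() {
            return available;
        }
    }

    static final class LocalPool {
        private final GlobalPool global;
        private final int maxBuffers;
        private int inUse;

        LocalPool(GlobalPool global, int maxBuffers) {
            this.global = global;
            this.maxBuffers = maxBuffers;
        }

        /** Grants a segment only if the local cap AND the global pool both allow it. */
        boolean tryRequest() {
            if (inUse >= maxBuffers || global.available == 0) {
                return false;
            }
            global.available--;
            inUse++;
            return true;
        }

        /** Returns one segment to the global pool, e.g. after the consumer has read it. */
        void recycle() {
            if (inUse > 0) {
                inUse--;
                global.available++;
            }
        }
    }

    public static void main(String[] args) {
        GlobalPool taskManagerPool = new GlobalPool(6553);
        LocalPool outputPool = new LocalPool(taskManagerPool, 10);
        int granted = 0;
        while (outputPool.tryRequest()) granted++;
        System.out.println("granted=" + granted + ", global still free=" + taskManagerPool.available());
    }
}
```

In this model the writer stays blocked until something calls `recycle()`, which is why the follow-up questions focus on whether the downstream side ever consumes and releases the buffers.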

Best,
Zhijiang


--
From:David Maddison 
Send Time:2020-06-09 (Tuesday) 19:28
To:user 
Subject:Blocked requesting MemorySegment when Segments are available.

Hi,

I keep seeing the following situation where a task is blocked getting a 
MemorySegment from the pool but the TaskManager is reporting that it has lots 
of MemorySegments available.  

I'm completely stumped as to how to debug or what to look at next so any 
hints/help/advice would be greatly appreciated!

/David/

The situation is as follows (Flink 1.10.0):

I have two operations, the first one "enrich-events" is stuck forever 
attempting to get a memory segment to send to downstream operator "Test 
function":

"read-events -> enriched-events (1/1)" #61 prio=5 os_prio=0 
tid=0x7f6424091800 nid=0x13b waiting on condition [0x7f644acf]
   java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  <0xd2206000> (a java.util.concurrent.CompletableFuture$Signaller)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
 at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
 at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
 at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
 at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
 at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
 at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
 at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
 at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
 at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)


All the operator tasks are running on the same TaskManager and the TaskManager 
reports that it has 6,517 memory segments available, so it's confusing why the 
task would be blocked getting a memory segment.

Memory Segments
Type  Count
Available  6,517
Total  6,553

Even more confusing is that the downstream task appears to be waiting for data 
and therefore I would assume that the credit based flow control isn't causing 
the back pressure.

"Test Function (1/1)" #62 prio=5 os_prio=0 tid=0x7f6424094000 nid=0x13c 
waiting on condition [0x7f644abf]
   java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  <0xc91de1d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
 at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor

Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread Zhijiang
Congratulations Yu! Well deserved!

Best,
Zhijiang


--
From:Dian Fu 
Send Time:2020-06-17 (Wednesday) 10:48
To:dev 
Cc:Haibo Sun; user; user-zh
Subject:Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

Congrats Yu!

Regards,
Dian

> On Jun 17, 2020, at 10:35 AM, Jark Wu wrote:
> 
> Congratulations Yu! Well deserved!
> 
> Best,
> Jark
> 
> On Wed, 17 Jun 2020 at 10:18, Haibo Sun  wrote:
> 
>> Congratulations Yu!
>> 
>> Best,
>> Haibo
>> 
>> 
>> At 2020-06-17 09:15:02, "jincheng sun"  wrote:
>>> Hi all,
>>> 
>>> On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>>> part of the Apache Flink Project Management Committee (PMC).
>>> 
>>> Yu Li has been very active on Flink's Statebackend component, working on
>>> various improvements, for example the RocksDB memory management for 1.10.
>>> and keeps checking and voting for our releases, and also has successfully
>>> produced two releases(1.10.0&1.10.1) as RM.
>>> 
>>> Congratulations & Welcome Yu Li!
>>> 
>>> Best,
>>> Jincheng (on behalf of the Flink PMC)
>> 
>> 



Re: Unaligned Checkpoint and Exactly Once

2020-06-21 Thread Zhijiang
Hi Weizheng,

In Flink 1.11, unaligned checkpoints (UC) are supported only in exactly-once 
mode, and not for savepoints. Savepoints are typically used in job rescaling 
scenarios, and we plan to support them in a future release. Of course UC 
satisfies exactly-once semantics as promised. 

Regarding the config issue, I am not sure I get your point here. The first 
config describes whether the current mode (actually only exactly-once) enables 
UC or not, and the second config sets the mode (exactly-once or at-least-once). 
I guess you mean merging them by using the first config form. But they are two 
different dimensions of checkpoint configuration. One is the semantics of the 
data processing guarantee. The other is which of two different mechanisms we 
use to realize one (exactly-once) of those semantics. 


Best,
Zhijiang


--
From:Lu Weizheng 
Send Time:2020-06-22 (Monday) 07:20
To:user@flink.apache.org 
Subject:Unaligned Checkpoint and Exactly Once

 Hi there,

 The new feature in Flink 1.11 will provide us the Unaligned Checkpoint, which 
means an operator subtask does not need to wait for all the Checkpoint barriers 
and will not block some channels. As the Checkpoint barrier is the key mechanism 
for the Exactly Once guarantee, I am not sure whether Unaligned Checkpoint can 
still achieve the Exactly Once guarantee or only At Least Once?

FLIP-76 :
Unaligned checkpoints will initially be an optional feature. After collecting 
experience and implementing all necessary extensions, unaligned checkpoint will 
probably be enabled by default for exactly once.

 What's more, in the following two configs,

 Config 1
env.getCheckpointConfig().enableUnalignedCheckpoints();

 Config 2

checkpointCfg.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

 Does Config 2 use an even simpler way for Checkpoint than Unaligned Checkpoint?

 Hope for replies!

 Weizheng



Re: Unaligned Checkpoint and Exactly Once

2020-06-21 Thread Zhijiang
From an implementation or logic complexity perspective, AT_LEAST_ONCE is 
somewhat simpler than EXACTLY_ONCE without unaligned checkpoints, since 
it can always process data without blocking any channels. 
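
A toy model may help show what "without blocking any channels" means for the guarantee. The following plain-Java sketch is an illustration under assumptions, not the real CheckpointBarrierAligner or CheckpointBarrierTracker: the class name, the round-robin reading order, and the string events are made up. It reads two input channels and returns the records that were processed before the snapshot is taken, once with blocking on barriers (aligned) and once without (tracking only).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of barrier handling on a task with several input channels; not Flink code.
class BarrierHandlingSketch {
    static final String BARRIER = "|barrier|";

    /** Returns the records processed before the snapshot, reading channels round-robin. */
    static List<String> processedBeforeSnapshot(List<List<String>> channels, boolean blockOnBarrier) {
        int n = channels.size();
        int[] position = new int[n];
        boolean[] barrierSeen = new boolean[n];
        int barriers = 0;
        List<String> processed = new ArrayList<>();
        while (barriers < n) {
            boolean progressed = false;
            for (int c = 0; c < n && barriers < n; c++) {
                if (position[c] >= channels.get(c).size()) continue;  // channel drained
                if (blockOnBarrier && barrierSeen[c]) continue;       // aligned: channel is blocked
                String event = channels.get(c).get(position[c]++);
                progressed = true;
                if (event.equals(BARRIER)) {
                    barrierSeen[c] = true;
                    barriers++;
                } else {
                    processed.add(event);
                }
            }
            if (!progressed) throw new IllegalStateException("a channel never delivered its barrier");
        }
        return processed; // the snapshot is taken once all barriers have arrived
    }

    public static void main(String[] args) {
        List<List<String>> channels = List.of(
                List.of("a1", BARRIER, "a2", "a3"),
                List.of("b1", "b2", "b3", BARRIER));
        System.out.println("aligned : " + processedBeforeSnapshot(channels, true));
        System.out.println("tracking: " + processedBeforeSnapshot(channels, false));
    }
}
```

In the tracking run, `a2` and `a3` arrive after channel 0's barrier but are still reflected in the state before the snapshot. After a restore they would be replayed and counted again, which is the at-least-once behavior; the aligned run avoids this at the cost of blocking channel 0.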


--
From:Lu Weizheng 
Send Time:2020-06-22 (Monday) 10:53
To:Zhijiang; user@flink.apache.org
Subject:Re: Unaligned Checkpoint and Exactly Once

 Thank you Zhijiang! The second question about config is just because I found a 
method in InputProcessorUtil. I guess AT_LEAST_ONCE mode is a simpler way to 
handle checkpoint barriers?

private static CheckpointBarrierHandler createCheckpointBarrierHandler(
        StreamConfig config,
        InputGate[] inputGates,
        SubtaskCheckpointCoordinator checkpointCoordinator,
        String taskName,
        AbstractInvokable toNotifyOnCheckpoint) {
    switch (config.getCheckpointMode()) {
        case EXACTLY_ONCE:
            if (config.isUnalignedCheckpointsEnabled()) {
                return new AlternatingCheckpointBarrierHandler(
                    new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates),
                    new CheckpointBarrierUnaligner(checkpointCoordinator, taskName, toNotifyOnCheckpoint, inputGates),
                    toNotifyOnCheckpoint);
            }
            return new CheckpointBarrierAligner(taskName, toNotifyOnCheckpoint, inputGates);
        case AT_LEAST_ONCE:
            int numInputChannels = Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels).sum();
            return new CheckpointBarrierTracker(numInputChannels, toNotifyOnCheckpoint);
        default:
            throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + config.getCheckpointMode());
    }
}





[ANNOUNCE] Apache Flink 1.11.0 released

2020-07-07 Thread Zhijiang
The Apache Flink community is very happy to announce the release of Apache 
Flink 1.11.0, which is the latest major release.

Apache Flink(r) is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for 
this new major release:
https://flink.apache.org/news/2020/07/06/release-1.11.0.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346364

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Cheers,
Piotr & Zhijiang

Re: [ANNOUNCE] Apache Flink 1.11.1 released

2020-07-22 Thread Zhijiang
Thanks for being the release manager and the efficient work, Dian!

Best,
Zhijiang


--
From:Konstantin Knauf 
Send Time:2020-07-22 (Wednesday) 19:55
To:Till Rohrmann 
Cc:dev; Yangze Guo; Dian Fu; user; user-zh
Subject:Re: [ANNOUNCE] Apache Flink 1.11.1 released

Thank you for managing the quick follow up release. I think this was very 
important for Table & SQL users.
On Wed, Jul 22, 2020 at 1:45 PM Till Rohrmann  wrote:

Thanks for being the release manager for the 1.11.1 release, Dian. Thanks a lot 
to everyone who contributed to this release.

Cheers,
Till
On Wed, Jul 22, 2020 at 11:38 AM Hequn Cheng  wrote:
Thanks Dian for the great work and thanks to everyone who makes this
 release possible!

 Best, Hequn

 On Wed, Jul 22, 2020 at 4:40 PM Jark Wu  wrote:

 > Congratulations! Thanks Dian for the great work and to be the release
 > manager!
 >
 > Best,
 > Jark
 >
 > On Wed, 22 Jul 2020 at 15:45, Yangze Guo  wrote:
 >
 > > Congrats!
 > >
 > > Thanks Dian Fu for being release manager, and everyone involved!
 > >
 > > Best,
 > > Yangze Guo
 > >
 > > On Wed, Jul 22, 2020 at 3:14 PM Wei Zhong wrote:
 > > >
 > > > Congratulations! Thanks Dian for the great work!
 > > >
 > > > Best,
 > > > Wei
 > > >
 > > > > On Jul 22, 2020, at 15:09, Leonard Xu wrote:
 > > > >
 > > > > Congratulations!
 > > > >
 > > > > Thanks Dian Fu for the great work as release manager, and thanks
 > > > > everyone involved!
 > > > >
 > > > > Best
 > > > > Leonard Xu
 > > > >
 > > > >> On Jul 22, 2020, at 14:52, Dian Fu wrote:
 > > > >>
 > > > >> The Apache Flink community is very happy to announce the release of
 > > > >> Apache Flink 1.11.1, which is the first bugfix release for the Apache
 > > > >> Flink 1.11 series.
 > > > >>
 > > > >> Apache Flink(r) is an open-source stream processing framework for
 > > > >> distributed, high-performing, always-available, and accurate data
 > > > >> streaming applications.
 > > > >>
 > > > >> The release is available for download at:
 > > > >> https://flink.apache.org/downloads.html
 > > > >>
 > > > >> Please check out the release blog post for an overview of the
 > > > >> improvements for this bugfix release:
 > > > >> https://flink.apache.org/news/2020/07/21/release-1.11.1.html
 > > > >>
 > > > >> The full release notes are available in Jira:
 > > > >> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348323
 > > > >>
 > > > >> We would like to thank all contributors of the Apache Flink community
 > > > >> who made this release possible!
 > > > >>
 > > > >> Regards,
 > > > >> Dian
 > > > >
 > > >
 > >
 >


-- 
Konstantin Knauf 
https://twitter.com/snntrable
https://github.com/knaufk 



Re: Will broadcast stream affect performance because of the absence of operator chaining?

2019-08-06 Thread zhijiang
Hi paul,

In theory, a broadcast operator cannot be chained because it uses all-to-all 
mode; chaining is only feasible for one-to-one mode like forward. 
If chained, the next operator can process the raw record emitted by the head 
operator directly. If not, the emitted record must be serialized into a 
buffer, which is then consumed by the downstream operator, via the network or 
not. So chaining has the best performance in theory compared to non-chaining.
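
The difference between the two hand-off paths can be sketched in plain Java. This is a simplified illustration under assumptions, not Flink's runtime: the class, the `Event` type, and the hand-written serializer are hypothetical, and the "chained" path here simply passes the reference, whereas Flink may still copy records between chained operators depending on its object-reuse setting.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative only: direct hand-off versus serialize -> buffer -> deserialize.
class ChainingSketch {
    static final class Event {
        final String id;
        final byte[] payload;

        Event(String id, byte[] payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    /** Chained path (simplified): the next operator sees the emitted object directly. */
    static Event chained(Event event) {
        return event;
    }

    /** Non-chained path, step 1: the record is written into a byte buffer. */
    static byte[] serialize(Event event) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(event.id);
            out.writeInt(event.payload.length);
            out.write(event.payload);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Non-chained path, step 2: the downstream operator rebuilds the record from the buffer. */
    static Event deserialize(byte[] buffer) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer));
            String id = in.readUTF();
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return new Event(id, payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Event event = new Event("k", new byte[10_000]);
        System.out.println("chained same object: " + (chained(event) == event));
        byte[] buffer = serialize(event);
        System.out.println("buffer bytes: " + buffer.length
                + ", copy is new object: " + (deserialize(buffer) != event));
    }
}
```

For large records the extra cost of the second path is roughly proportional to the record size, since every byte is written once and read once more.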

In your case, if you cannot avoid the broadcast requirement, then you have to 
accept the non-chained way and test whether the real performance is acceptable 
for you. If the performance does not meet your requirements, we can further 
consider other improvements.

Best,
Zhijiang
--
From:Piotr Nowojski 
Send Time:2019-08-06 (Tuesday) 14:55
To:黄兆鹏 
Cc:user 
Subject:Re: Will broadcast stream affect performance because of the absence of 
operator chaining?

Hi,

No, I think you are right, I forgot about the broadcasting requirement.

Piotrek

On 6 Aug 2019, at 13:11, 黄兆鹏  wrote:
Hi, Piotrek,
I previously considered your first advice(use union record type), but I found 
that the schema would be only sent to one subtask of the operator(for example, 
operatorA), and other subtasks of the operator are not aware of it. 
In this case is there anything I have missed? 

Thank you!





-- Original --
From:  "Piotr Nowojski";
Date:  Tue, Aug 6, 2019 06:57 PM
To:  "黄兆鹏"; 
Cc:  "user"; 
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?
Hi,

Have you measured the performance impact of breaking the operator chain?

This is a current limitation of Flink chaining: if an operator has two 
inputs, it cannot be chained to something else (only one-input operators are 
chained together). There are plans for the future to address this issue.

As a workaround, besides what you have mentioned:
- maybe your record type can be a union: type of Record or Schema (not Record 
AND Schema), and upstream operators (operatorA) could just ignore/forward the 
Schema. You wouldn’t need to send schema with every record.
- another (ugly) solution, is to implement BroadcastStream input outside of 
Flink, but then you might have issues with checkpointing/watermarking and it 
just makes many things more complicated.
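
The union-type workaround from the first bullet could look roughly like this in plain Java. It is a sketch under assumptions: `Element`, `SchemaAwareOperator`, and the string-based schema are hypothetical names for illustration, not Flink types. Each element is either a record or a schema; an operator updates its local schema when it sees one and forwards it unchanged.

```java
// Hypothetical union element: carries EITHER a record OR a schema, never both.
class UnionElementSketch {
    static final class Element {
        final String record;
        final String schema;

        private Element(String record, String schema) {
            this.record = record;
            this.schema = schema;
        }

        static Element ofRecord(String record) {
            return new Element(record, null);
        }

        static Element ofSchema(String schema) {
            return new Element(null, schema);
        }

        boolean isSchema() {
            return schema != null;
        }
    }

    static final class SchemaAwareOperator {
        private String currentSchema = "v0";

        /** Records are processed under the current schema; schema elements are stored and forwarded. */
        Element process(Element in) {
            if (in.isSchema()) {
                currentSchema = in.schema;
                return in; // forward so downstream operators can update too
            }
            return Element.ofRecord(in.record + "@" + currentSchema);
        }
    }

    public static void main(String[] args) {
        SchemaAwareOperator operator = new SchemaAwareOperator();
        System.out.println(operator.process(Element.ofRecord("a")).record);
        operator.process(Element.ofSchema("v1"));
        System.out.println(operator.process(Element.ofRecord("b")).record);
    }
}
```

As pointed out elsewhere in this thread, a schema element travelling inside the data stream reaches only one subtask of each operator, so this sketch does not by itself solve the broadcasting requirement.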

Piotrek

On 6 Aug 2019, at 10:50, 黄兆鹏  wrote:
Hi Piotrek,
Thanks for your reply. My broadcast stream just listens to changes of the 
schema, which are very infrequent and very lightweight.

In fact there are two ways to solve my problem,

the first one is a broadcast stream that listen to the change of the schema, 
and broadcast to every operator that will handle the data, just as I posted 
originally.
DataStream: OperatorA  ->  OperatorB  ->  OperatorC
               ^              ^              ^
               |              |              |
               +--------------+--------------+
                       BroadcastStream
the second approach is that I have an operator that will join my data and 
schema together and send to the downstream operators:
 DataStream: MergeSchemaOperator -> OperatorA  ->  OperatorB  -> OperatorC
                    ^
                    |
             BroadcastStream


The benefit of the first approach is that the Flink job does not have to 
transfer the schema with the real data records among operators, because the 
schema is broadcast to each operator.
The disadvantage of the first approach is that it breaks the operator 
chain, so operators may not be executed in the same slot and performance may 
be worse.

The second approach does not have that problem, but each 
message carries its schema info among operators, which costs about 2x for 
serialization and deserialization between operators.

Is there a better workaround that all the operators could notice the schema 
change and at the same time not breaking the operator chaining? 

Thanks!
-- Original --
From:  "Piotr Nowojski";
Date:  Tue, Aug 6, 2019 04:23 PM
To:  "黄兆鹏"; 
Cc:  "user"; 
Subject:  Re: Will broadcast stream affect performance because of the absence 
of operator chaining?
Hi,

Broadcasting will break an operator chain. However, my best guess is that the 
Kafka source will still be a performance bottleneck in your job. Also, network 
exchanges add measurable overhead only if your records are very 
lightweight and easy to process (for example, if you are using RocksDB then you 
can just ignore network costs).

Either way, you can just try this out. Pre-populate your Kafka topic with some 
significant number of messages, run both jobs, compare the throughput and 
decide based on those results whether this is ok for you or not.

P

Re: [ANNOUNCE] Andrey Zagrebin becomes a Flink committer

2019-08-14 Thread zhijiang
Congratulations Andrey, great work and well deserved!

Best,
Zhijiang
--
From:Till Rohrmann 
Send Time:2019-08-14 (Wednesday) 15:26
To:dev ; user 
Subject:[ANNOUNCE] Andrey Zagrebin becomes a Flink committer

Hi everyone,

I'm very happy to announce that Andrey Zagrebin accepted the offer of the Flink 
PMC to become a committer of the Flink project.

Andrey has been an active community member for more than 15 months. He has 
helped shaping numerous features such as State TTL, FRocksDB release, Shuffle 
service abstraction, FLIP-1, result partition management and various 
fixes/improvements. He's also frequently helping out on the user@f.a.o mailing 
lists.

Congratulations Andrey!

Best, Till 
(on behalf of the Flink PMC)



Re: [ANNOUNCE] Zili Chen becomes a Flink committer

2019-09-11 Thread zhijiang
Congratulations Zili!
--
From:Becket Qin 
Send Time:2019-09-12 (Thursday) 03:43
To:Paul Lam 
Cc:Rong Rong; dev; user
Subject:Re: [ANNOUNCE] Zili Chen becomes a Flink committer

Congrats, Zili!
On Thu, Sep 12, 2019 at 9:39 AM Paul Lam  wrote:
Congratulations Zili!

Best,
Paul Lam

On Sep 12, 2019, at 09:34, Rong Rong wrote:
Congratulations Zili!

--
Rong
On Wed, Sep 11, 2019 at 6:26 PM Hequn Cheng  wrote:
Congratulations!

Best, Hequn
On Thu, Sep 12, 2019 at 9:24 AM Jark Wu  wrote:
Congratulations Zili!

 Best,
 Jark

 On Wed, 11 Sep 2019 at 23:06,  wrote:

 > Congratulations, Zili.
 >
 >
 >
 > Best,
 >
 > Xingcan
 >
 >
 >
 > *From:* SHI Xiaogang 
 > *Sent:* Wednesday, September 11, 2019 7:43 AM
 > *To:* Guowei Ma 
 > *Cc:* Fabian Hueske ; Biao Liu ;
 > Oytun Tez ; bupt_ljy ; dev <
 > d...@flink.apache.org>; user ; Till Rohrmann <
 > trohrm...@apache.org>
 > *Subject:* Re: [ANNOUNCE] Zili Chen becomes a Flink committer
 >
 >
 >
 > Congratulations!
 >
 >
 >
 > Regards,
 >
 > Xiaogang
 >
 >
 >
 > On Wed, Sep 11, 2019 at 7:07 PM Guowei Ma wrote:
 >
 > Congratulations Zili !
 >
 >
 > Best,
 >
 > Guowei
 >
 >
 >
 >
 >
 > On Wed, Sep 11, 2019 at 7:02 PM Fabian Hueske wrote:
 >
 > Congrats Zili Chen :-)
 >
 >
 >
 > Cheers, Fabian
 >
 >
 >
 > On Wed, Sep 11, 2019 at 12:48, Biao Liu  wrote:
 >
 > Congrats Zili!
 >
 >
 >
 > Thanks,
 >
 > Biao /'bɪ.aʊ/
 >
 >
 >
 >
 >
 >
 >
 > On Wed, 11 Sep 2019 at 18:43, Oytun Tez  wrote:
 >
 > Congratulations!
 >
 >
 >
 > ---
 >
 > Oytun Tez
 >
 >
 >
 > *M O T A W O R D*
 >
 > *The World's Fastest Human Translation Platform.*
 >
 > oy...@motaword.com — www.motaword.com
 >
 >
 >
 >
 >
 > On Wed, Sep 11, 2019 at 6:36 AM bupt_ljy  wrote:
 >
 > Congratulations!
 >
 >
 >
 > Best,
 >
 > Jiayi Liao
 >
 >
 >
 >  Original Message
 >
 > *Sender:* Till Rohrmann
 >
 > *Recipient:* dev; user
 >
 > *Date:* Wednesday, Sep 11, 2019 17:22
 >
 > *Subject:* [ANNOUNCE] Zili Chen becomes a Flink committer
 >
 >
 >
 > Hi everyone,
 >
 >
 >
 > I'm very happy to announce that Zili Chen (some of you might also know
 > him as Tison Kun) accepted the offer of the Flink PMC to become a committer
 > of the Flink project.
 >
 >
 >
 > Zili Chen has been an active community member for almost 16 months now.
 > He helped push the Flip-6 effort over the finish line, ported a lot of
 > legacy code tests, removed a good part of the legacy code, contributed
 > numerous fixes, is involved in Flink's client API refactoring, drives
 > the refactoring of Flink's HighAvailabilityServices and much more. Zili
 > Chen also helped the community by reviewing PRs, reporting Flink issues,
 > answering user mails and being very active on the dev mailing list.
 >
 >
 >
 > Congratulations Zili Chen!
 >
 >
 >
 > Best, Till
 >
 > (on behalf of the Flink PMC)
 >
 >




Re: How can I get the backpressure signals inside my function or operator?

2019-11-05 Thread Zhijiang
Hi Felipe,

That is an interesting idea: controlling the upstream's output based on the 
downstream's input.

If I understood correctly, the preAggregate operator would flush its 
output while the reduce operator is idle/hungry. In contrast, the preAggregate 
operator would continue aggregating data in the case of back pressure.
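
To make the described behaviour concrete, here is a minimal sketch in plain Python (not Flink code). It assumes some boolean `downstream_backpressured` signal is available to the caller; how to obtain that signal is exactly the open question of this thread. The class and parameter names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Optional


class PreAggregator:
    """Keeps partial sums per key and flushes them only when downstream is not backpressured."""

    def __init__(self) -> None:
        self._partial: Dict[str, int] = defaultdict(int)

    def add(self, key: str, value: int,
            downstream_backpressured: bool) -> Optional[Dict[str, int]]:
        """Merge one record; return the flushed partial sums, or None if we keep aggregating."""
        self._partial[key] += value
        if downstream_backpressured:
            # Downstream is busy: hold the data and keep combining.
            return None
        # Downstream is idle/hungry: hand over everything collected so far.
        flushed = dict(self._partial)
        self._partial.clear()
        return flushed


pre_agg = PreAggregator()
pre_agg.add("flink", 1, downstream_backpressured=True)
pre_agg.add("flink", 2, downstream_backpressured=True)
print(pre_agg.add("kafka", 5, downstream_backpressured=False))
```

The sketch flushes per record for simplicity; a real operator would more likely flush on a timer or a count trigger.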

I think this requirement is valid, but unfortunately I guess you cannot get 
the back pressure signal at the operator level. AFAIK only the upper task level 
can get the input/output state to decide whether to process or not.

If you want to get the reduce's metric 
`Shuffle.Netty.Input.Buffers.inputQueueLength` on the preAggregate side, you might 
rely on some external metric reporter to query it, if possible.

Best,
Zhijiang


--
From:Felipe Gutierrez 
Send Time:2019 Nov. 5 (Tue.) 16:58
To:user 
Subject:How can I get the backpressure signals inside my function or operator?

Hi all,

let's say that I have a "source -> map -> preAggregate -> keyBy -> reduce -> 
sink" job and the reducer is sending backpressure signals to the preAggregate, 
map and source operators. How do I get those signals inside my operator's 
implementation?
I guess it is not possible inside the function. But if I have my own operator 
implemented (preAggregate), can I get those backpressure signals?

I want to get the messages "Shuffle.Netty.Input.Buffers.inputQueueLength" [1] 
on my preAggregate operator in order to decide when I stop the pre-aggregation 
and flush tuples or when I keep pre aggregating. It is something like the 
"credit based control on the network stack" [2].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#default-shuffle-service
[2] https://www.youtube.com/watch?v=AbqatHF3tZI

Thanks!
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com



Re: What metrics can I see the root cause of "Buffer pool is destroyed" message?

2019-11-06 Thread Zhijiang
Hi Felipe,

"Buffer pool is destroyed" is mainly caused by a task being canceled. That means 
some other task failed, which triggered the job master to cancel all the tasks 
in the topology.
So if you want to find the root cause, you should check the job master log 
to find the first failure, which triggered the subsequent cancel operations.
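
As a rough illustration of that log check, the sketch below scans job master log lines and returns the first one that reports a task switching to FAILED. The `switched from ... to FAILED` wording is an assumption about how state transitions appear in your log, so adapt the pattern as needed; `first_failure` and the sample lines are hypothetical.

```python
import re
from typing import Iterable, Optional

# Assumed pattern for a task state transition into FAILED; adjust to your logs.
FAILED_TRANSITION = re.compile(r"switched from \w+ to FAILED")


def first_failure(lines: Iterable[str]) -> Optional[str]:
    """Return the first log line that reports a transition to FAILED, if any."""
    for line in lines:
        if FAILED_TRANSITION.search(line):
            return line.rstrip("\n")
    return None


# Hypothetical sample lines, not taken from a real job.
sample_log = [
    "2019-11-06 19:00:01 INFO  reduce (1/4) switched from DEPLOYING to RUNNING.",
    "2019-11-06 19:05:12 INFO  reduce (2/4) switched from RUNNING to FAILED.",
    "2019-11-06 19:05:12 INFO  pre-aggregate (1/4) switched from RUNNING to CANCELING.",
    "2019-11-06 19:05:13 INFO  pre-aggregate (1/4) switched from CANCELING to CANCELED.",
]

print(first_failure(sample_log))
```

The tasks that later report "Buffer pool is destroyed" would show up after that first FAILED line, typically as CANCELING/CANCELED transitions.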

In addition, which Flink version are you using?

Best,
Zhijiang


--
From:Felipe Gutierrez 
Send Time:2019 Nov. 6 (Wed.) 19:12
To:user 
Subject:What metrics can I see the root cause of "Buffer pool is destroyed" 
message?

Hi community,

Looking at the code [1] it seems that it is related to not having 
availableMemorySegments anymore. I am looking at several metrics but they have 
not helped me understand where I can measure the root cause of this error 
message.

- flink_taskmanager_Status_Shuffle_Netty_AvailableMemorySegments does not seem 
to give me a related cause. 
- flink_taskmanager_job_task_Shuffle_Netty_Output_Buffers_inputQueueLength: I 
see my reducer operator always with queue length equal to 4. The pre-aggregate 
task sometimes goes to 3, but only a few times.
- flink_taskmanager_job_task_Shuffle_Netty_Input_Buffers_outPoolUsage and 
flink_taskmanager_job_task_Shuffle_Netty_Input_Buffers_outputQueueLength show 
my source task at 100% several times. But my error message comes from the 
pre-aggregate task.
- flink_taskmanager_job_task_Shuffle_Netty_Output_numBuffersInLocalPerSecond 
DOES show that the pre-aggregate task is consuming a lot. But which metric 
can I relate this to in order to know in advance how much is a lot?

[1] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java#L265

Thanks for your suggestions and here is my stack trace:

java.lang.RuntimeException: Buffer pool is destroyed.
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
 at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
 at org.apache.flink.streaming.examples.aggregate.WordCountPreAggregate$WordCountPreAggregateFunction.collect(WordCountPreAggregate.java:251)
 at org.apache.flink.streaming.api.operators.AbstractUdfStreamPreAggregateOperator.collect(AbstractUdfStreamPreAggregateOperator.java:84)
 at org.apache.flink.streaming.api.functions.aggregation.PreAggregateTriggerFunction.collect(PreAggregateTriggerFunction.java:49)
 at org.apache.flink.streaming.api.functions.aggregation.PreAggregateTriggerFunction.run(PreAggregateTriggerFunction.java:63)
 at java.util.TimerThread.mainLoop(Timer.java:555)
 at java.util.TimerThread.run(Timer.java:505)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
 at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentFromGlobal(LocalBufferPool.java:264)
 at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:240)
 at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:215)
 at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:182)
 at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
 at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.getBufferBuilder(ChannelSelectorRecordWriter.java:95)
 at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:131)
 at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
 at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com



Re: How can I get the backpressure signals inside my function or operator?

2019-11-06 Thread Zhijiang
You can refer to this document [1] for the REST API details.
Actually the backpressure URI is 
"/jobs/:jobid/vertices/:vertexid/backpressure". But I am not sure whether it is 
easy to get the jobid and vertexid.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html
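
For a POC, a minimal sketch of building that URI and reading the per-subtask ratio from the JSON response could look like the following. The field names `subtasks`, `subtask` and `ratio`, as well as the sample body, are assumptions about the response shape and should be checked against [1] for your version; `backpressure_url` and `subtask_ratios` are hypothetical helper names.

```python
import json
from typing import Dict


def backpressure_url(base_url: str, job_id: str, vertex_id: str) -> str:
    """Build the backpressure URI for one job vertex."""
    return f"{base_url.rstrip('/')}/jobs/{job_id}/vertices/{vertex_id}/backpressure"


def subtask_ratios(response_body: str) -> Dict[int, float]:
    """Map subtask index to its backpressure ratio (assumed response fields)."""
    payload = json.loads(response_body)
    return {entry["subtask"]: entry["ratio"] for entry in payload.get("subtasks", [])}


# Hypothetical response body used only to exercise the parser.
sample_body = json.dumps({
    "status": "ok",
    "subtasks": [
        {"subtask": 0, "ratio": 0.0},
        {"subtask": 1, "ratio": 0.75},
    ],
})

print(backpressure_url("http://localhost:8081", "<jobid>", "<vertexid>"))
print(subtask_ratios(sample_body))
```

The actual HTTP call is left out on purpose; any client (for example `urllib.request.urlopen`) can fetch the URL and pass the body to `subtask_ratios`.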

Best,
Zhijiang
--
From:Felipe Gutierrez 
Send Time:2019 Nov. 7 (Thu.) 00:06
To:Chesnay Schepler 
Cc:Zhijiang ; user 
Subject:Re: How can I get the backpressure signals inside my function or 
operator?

If I can trigger the sample via the REST API, that is good enough for a POC. Then I can 
read from any in-memory storage using a separate thread within the operator. But 
which REST API gives me the backpressure ratio value?

Thanks
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

-- https://felipeogutierrez.blogspot.com

On Wed, Nov 6, 2019 at 4:55 PM Chesnay Schepler  wrote:

I don't think there is a truly sane way to do this. 
I could envision a separate application triggering samples via the REST API, 
writing the results into kafka which your operator can read. This is probably 
the most reasonable solution I can come up with.
Any attempt at accessing the TaskExecutor or metrics from within the operator 
is inadvisable; you'd be encroaching into truly hacky territory.
You could also do your own backpressure sampling within your operator (separate 
thread within the operator executing the same sampling logic), but I don't know 
how easy it would be to re-use Flink code.

On 06/11/2019 13:40, Felipe Gutierrez wrote:
Does anyone know which metric I can rely on to know if a given operator is 
causing backpressure? 
Or how can I call the same Java object that the Flink UI calls to give me the 
backpressure ratio?

Thanks,
Felipe

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Tue, Nov 5, 2019 at 4:15 PM Felipe Gutierrez  
wrote:
Hi Zhijiang, 

thanks for your reply. Yes, you understood correctly.
The fact that I cannot get "Shuffle.Netty.Input.Buffers.inputQueueLength" on 
the operator might be because of the way Flink runtime architecture was 
designed. But I was wondering what kind of signal I can get. I guess some 
backpressure message I could get because backpressure works to slow down the 
upstream operators. 

For example, I can see the ratio per sub-task on the web interface [1]. It 
means the physical operators. Is there any message flowing backward that I can 
get? Is there anything that makes me able to not rely on some external storage?

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#sampling-threads
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com    






Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-20 Thread Zhijiang
Hi Andreas,

You are running a batch job, so there should be no native memory used by the 
RocksDB state backend. So I guess either heap memory or direct memory is 
overused. The heap managed memory is mainly used by batch operators, and direct 
memory is used by the network shuffle. Can you further check whether there are any 
logs indicating HeapOutOfMemory or DirectOutOfMemory before the container was 
killed? If the used memory exceeds the JVM configuration, it should throw that 
error. Then we can further narrow down the scope. I cannot recall offhand the 
memory-related changes to managed memory or the network stack, especially since 
they span several releases.

Best,
Zhijiang


--
From:Hailu, Andreas 
Send Time:2019 Nov. 21 (Thu.) 01:03
To:user@flink.apache.org 
Subject:RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Going through the release notes today - we tried fiddling with the 
taskmanager.memory.fraction option, going as low as 0.1 with unfortunately no 
success. It still leads to the container running beyond physical memory limits.

// ah

From: Hailu, Andreas [Engineering] 
Sent: Tuesday, November 19, 2019 6:01 PM
To: 'user@flink.apache.org' 
Subject: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
Hi,

We’re in the middle of testing the upgrade of our data processing flows from 
Flink 1.6.4 to 1.9.1. We’re seeing that flows which were running just fine on 
1.6.4 now fail on 1.9.1 with the same application resources and input data 
size. It seems that there have been some changes around how the data is sorted 
prior to being fed to the CoGroup operator - this is the error that we 
encounter:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 15 more
Caused by: java.lang.Exception: The data preparation for task 'CoGroup (Dataset | Merge | NONE)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Lost connection to task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that the remote task manager was lost.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
... 1 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Lost connection to task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that the remote task manager was lost.
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1109)
at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:102)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:474)

I drilled further down into the YARN app logs, and I found that the container 
was running out of physical memory:

2019-11-19 12:49:23,068 INFO  org.apache.flink.yarn.YarnResourceManager 
- Closing TaskExecutor connection 
container_e42_1574076744505_9444_01_04 because: Container 
[pid=42774,containerID=container_e42_1574076744505_9444_01_04] is running 
beyond physical memory limits. Current usage: 12.0 GB of 12 GB physical memory 
used; 13.9 GB of 25.2 GB virtual memory used. Killing container.

This is what raises my suspicions, as this resourcing configuration worked just 
fine on 1.6.4.

I’m working on getting heap dumps of these applications to try and get a better 
understanding of what’s causing the blowup in physical memory required myself, 
but it would be helpful if anyone knew what relevant changes have been made 
between these versions or where else I could look? There are some features in 
1.9 that we’d like to use in our flows so getting this sorted out, no pun 
intended, is inhibiting us from doing so.

Best,
Andreas

 Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to:  
www.gs.com/privacy-notices



Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-11-21 Thread Zhijiang
The hint about mmap usage below is really helpful for locating this problem. I 
forgot about this, the biggest change for batch jobs in release-1.9.
The blocking subpartition type option can be set to `file`, as Piotr suggested, 
to behave similarly to before. I think it can solve your problem. 


--
From:Hailu, Andreas 
Send Time:2019 Nov. 21 (Thu.) 23:37
To:Piotr Nowojski 
Cc:Zhijiang ; user@flink.apache.org 

Subject:RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Thanks, Piotr. We’ll rerun our apps today with this and get back to you. 

// ah

From: Piotr Nowojski  On Behalf Of Piotr Nowojski
Sent: Thursday, November 21, 2019 10:14 AM
To: Hailu, Andreas [Engineering] 
Cc: Zhijiang ; user@flink.apache.org
Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?
Hi,
I would suspect this:
https://issues.apache.org/jira/browse/FLINK-12070
To be the source of the problems.
There seems to be a hidden configuration option that avoids using memory mapped 
files:
taskmanager.network.bounded-blocking-subpartition-type: file
Could you test if it helps?
Piotrek



On 21 Nov 2019, at 15:22, Hailu, Andreas  wrote:
Hi Zhijiang,

I looked into the container logs for the failure, and didn’t see any specific 
OutOfMemory errors before it was killed. I ran the application using the same 
config this morning on 1.6.4, and it went through successfully. I took a 
snapshot of the memory usage from the dashboard and can send it to you if you 
like for reference.

What stands out to me as suspicious is that on 1.9.1, the application is using 
nearly 6GB of Mapped memory before it dies, while 1.6.4 uses 0 throughout its 
runtime and succeeds. The JVM heap memory itself never exceeds its capacity, 
peaking at 6.65GB, so it sounds like the problem lies somewhere in the changes 
around mapped memory.

// ah


Re: Flink task node shut it self off.

2019-12-22 Thread Zhijiang
Hi John,

Thanks for the positive comments on your Flink usage. Whether you use 
at-least-once or exactly-once mode for checkpointing, no message would be lost 
during failure recovery.

Unfortunately I cannot access the logs you posted. Generally speaking, a 
longer checkpoint interval means replaying more source data after failure 
recovery.
In my experience a 5-second checkpoint interval is too frequent, 
and you might increase it to 1 minute or so. You can also monitor 
how long a checkpoint takes to finish in your application and then adjust 
the interval accordingly.

Concerning the node shutdown you mentioned, I am not quite sure whether it 
is related to your short checkpoint interval. Did you configure the heap state 
backend? The hs_err file indicates that your job encountered a 
memory issue, so it is better to increase your task manager memory somehow. 
But if you can analyze the hs_err dump file with some profiling tool to check 
the memory usage, it might be more helpful for finding the root cause.

Best,
Zhijiang 


--
From:John Smith 
Send Time:2019 Dec. 21 (Sat.) 05:26
To:user 
Subject:Flink task node shut it self off.

Hi, using Flink 1.8.0

1st off I must say Flink resiliency is very impressive, we lost a node and 
never lost one message by using checkpoints and Kafka. Thanks!

The cluster is a self hosted cluster and we use our own zookeeper cluster. We 
have...
3 zookeepers: 4 cpu, 8GB (each)
3 job nodes: 4 cpu, 8GB (each)
3 task nodes: 4 cpu, 8GB (each)
The nodes also share GlusterFS for storing savepoints and checkpoints, 
GlusterFS is running on the same machines.

Yesterday a node shut itself off with the following log messages...
- Stopping TaskExecutor 
akka.tcp://fl...@xxx.xxx.xxx.73:34697/user/taskmanager_0.
- Stop job leader service.
- Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
- Shutting down TaskExecutorLocalStateStoresManager.
- Shutting down BLOB cache
- Shutting down BLOB cache
- removed file cache directory 
/tmp/flink-dist-cache-4b60d79b-1cef-4ffb-8837-3a9c9a205000
- I/O manager removed spill file directory 
/tmp/flink-io-c9d01b92-2809-4a55-8ab3-6920487da0ed
- Shutting down the network environment and its components.

Prior to the node shutting off we noticed massive IOWAIT of 140% and a 1-minute 
CPU load of 15. We also got an hs_err file which says we should increase the 
memory.

I'm attaching the logs here: 
https://www.dropbox.com/sh/vp1ytpguimiayw7/AADviCPED47QEy_4rHsGI1Nya?dl=0

I wonder if my 5 second checkpointing is too much for gluster.

Any thoughts?







Re: Flink task node shut it self off.

2019-12-24 Thread Zhijiang
If you use the RocksDB state backend, it might consume extra native memory. 
Some resource management frameworks like YARN would kill the container if the 
memory usage exceeds some threshold. You can also double check whether that 
is happening in your case.


--
From:John Smith 
Send Time:2019 Dec. 25 (Wed.) 03:40
To:Zhijiang 
Cc:user 
Subject:Re: Flink task node shut it self off.

The shutdown happened after the massive IO wait. I don't use any state. 
Checkpoints are disk based...








Re: Rewind offset to a previous position and ensure certainty.

2019-12-25 Thread Zhijiang
If I understood correctly, different Kafka partitions would be consumed by 
different source tasks with different watermark progress. The Flink 
framework aligns the different watermarks and only outputs the smallest 
watermark among them, so events from slow partitions would not be discarded, 
because the downstream operator only sees the watermark based on the slowest 
partition atm. You can refer to [1] for some details.
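
The alignment rule can be illustrated with a small sketch in plain Python. It is not Flink's implementation, only the "combined watermark is the minimum over all inputs" idea; `MinWatermarkTracker` and the timestamps are hypothetical.

```python
from typing import List, Optional

NO_WATERMARK = -(2 ** 63)  # sentinel meaning "no watermark received yet"


class MinWatermarkTracker:
    """Tracks the latest watermark per input and exposes their minimum."""

    def __init__(self, num_inputs: int) -> None:
        self._watermarks: List[int] = [NO_WATERMARK] * num_inputs
        self._emitted: int = NO_WATERMARK

    def on_watermark(self, input_index: int, watermark: int) -> Optional[int]:
        """Record a watermark; return the combined watermark only if it advanced."""
        self._watermarks[input_index] = max(self._watermarks[input_index], watermark)
        combined = min(self._watermarks)
        if combined > self._emitted:
            self._emitted = combined
            return combined
        return None


tracker = MinWatermarkTracker(num_inputs=2)
print(tracker.on_watermark(0, 1000))  # fast partition races ahead, nothing emitted
print(tracker.on_watermark(1, 200))   # slow partition determines the combined value
```

However far the fast input runs ahead, the downstream operator only ever observes the slow input's progress.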

As for rewinding the partition offsets, I guess it only happens in the 
failure recovery case or when you manually restart the job. Either way, all the 
topology tasks would be restarted and previously received watermarks are cleared, 
so events would not be discarded in this case either. Only if you could 
rewind some source tasks to previous positions while keeping the downstream tasks 
running might you hit the issue you are concerned about. But Flink does not 
support such an operation atm. :) 

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_timestamps_watermarks.html

Best,
Zhijiang
--
From:邢瑞斌 
Send Time:2019 Dec. 25 (Wed.) 20:27
To:user-zh ; user 
Subject:Rewind offset to a previous position and ensure certainty.

Hi,

I'm trying to use Kafka as an event store and I want to create several 
partitions to improve read/write throughput. Occasionally I need to rewind 
offset to a previous position for recomputing. Since order isn't guaranteed 
among partitions in Kafka, does this mean that Flink won't produce the same 
results as before when rewinding, even if it uses event time? For example, the 
consumer for one partition progresses extremely fast and raises the watermark, so 
events from other partitions are discarded. Is there any way to prevent this from 
happening?

Thanks in advance!

Ruibin



Re: Aggregating Movie Rental information in a DynamoDB table using DynamoDB streams and Flink

2019-12-25 Thread Zhijiang
Hi Joe,

What you need is effectively exactly-once for an external sink. I think your 
option of using TwoPhaseCommitSinkFunction is the right way to go.
Unfortunately I am not very familiar with this part, so I cannot give you 
specific suggestions for using it, especially regarding your concern about 
storing the checkpoint id.
After the holiday, people with more experience with it can give you more 
professional advice, I guess. :)

ATM you can refer to the simple implementation 
TwoPhaseCommitSinkFunctionTest#ContentDumpSinkFunction and the more complex one, 
FlinkKafkaProducer, for more insights.
In addition, the StreamingFileSink also implements exactly-once for a sink. 
You might also refer to it to get some insights if possible.
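
To illustrate the idempotency concern only, here is a sketch in plain Python (neither Flink nor DynamoDB code) of a commit that is safe to call more than once because the last applied checkpoint id is stored with each row. The in-memory dict stands in for the RentalAmountsByMovie table; with a real store the check and the write would have to be one conditional update. All names are hypothetical and amounts are in cents.

```python
from typing import Dict, Tuple


class RentalTotals:
    """In-memory stand-in for a table keyed by movie_title."""

    def __init__(self) -> None:
        # movie_title -> (total_rental_amount_cents, last_applied_checkpoint_id)
        self._rows: Dict[str, Tuple[int, int]] = {}

    def commit(self, movie_title: str, delta_cents: int, checkpoint_id: int) -> bool:
        """Apply a checkpoint's delta once; repeated commits of the same id are no-ops."""
        total, last_id = self._rows.get(movie_title, (0, -1))
        if checkpoint_id <= last_id:
            return False  # already applied, e.g. commit retried after recovery
        self._rows[movie_title] = (total + delta_cents, checkpoint_id)
        return True

    def total(self, movie_title: str) -> int:
        return self._rows.get(movie_title, (0, -1))[0]


table = RentalTotals()
table.commit("Alien", 500, checkpoint_id=1)
table.commit("Alien", 500, checkpoint_id=1)  # duplicate commit, ignored
table.commit("Alien", 250, checkpoint_id=2)
print(table.total("Alien"))
```

This assumes checkpoint ids only increase and that each checkpoint contributes a single delta per movie title.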

Best,
Zhijiang




--
From:Joe Hansen 
Send Time:2019 Dec. 26 (Thu.) 01:42
To:user 
Subject:Aggregating Movie Rental information in a DynamoDB table using DynamoDB 
streams and Flink

Happy Holidays everyone!

tl;dr: I need to aggregate movie rental information that is being
stored in one DynamoDB table and store running total of the
aggregation in another table. How do I ensure exactly-once
aggregation.

I currently store movie rental information in a DynamoDB table named
MovieRentals: {movie_title, rental_period_in_days, order_date,
rent_amount}

We have millions of movie rentals happening on any given day.  Our web
application needs to display the aggregated rental amount for any
given movie title.

I am planning to use Flink to aggregate rental amounts by movie_title
on the MovieRental DynamoDB stream and store the aggregated rental
amounts in another DynamoDB table named RentalAmountsByMovie:
{movie_title, total_rental_amount}

How do I ensure that RentalAmountsByMovie amounts are accurate, i.e.
how do I prevent results from any checkpoint from updating the
RentalAmountsByMovie table records more than once?

1) Do I need to store checkpoint ids in the RentalAmountsByMovie table
and do conditional updates to handle the scenario described above?
2) I can possibly implement TwoPhaseCommitSinkFunction that talks to
DynamoDB. However, according to Flink documentation the commit
function can be called more than once and hence needs to be
idempotent. So even this solution requires checkpoint-ids to be stored
on the target store.
3) Another pattern seems to be storing the time-window aggregation
results in the RentalAmountsByMovie table. And the webapp will have to
compute the running total on the fly. I don't like this solution for
its latency implications to the webapp.
4) Maybe I can use Flink's Queryable state feature. However, that
feature seems to be in Beta:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/queryable_state.html

I imagine this is a very common aggregation use case. How do folks
usually handle **updating aggregated results in Flink external
sinks**?

I appreciate any pointers. Happy to provide more details if needed.

Thanks!



Re: How to verify if checkpoints are asynchronous or sync

2020-01-07 Thread Zhijiang
Checking the logs is a simple way to trace this, and you can grep for a few keywords to find 
the relevant messages instead of skimming through the whole log.
I am not quite sure what your specific motivation for doing this is. Besides the 
logs, you can also inspect the thread stacks to confirm whether it is 
happening, but that is probably not very convenient.
Another possible way is via the checkpoint metrics, which record the 
sync/async duration; maybe that can also satisfy your requirements.
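
The keyword approach could look roughly like the sketch below. The default markers are an assumption about the wording of the snapshot log messages, and the sample lines are invented; check both against your own logs.

```python
def classify_snapshot_lines(lines, async_marker="asynchronous part",
                            sync_marker="synchronous part"):
    """Split log lines into async and sync snapshot messages.

    The default markers are assumptions about the log wording; adjust them
    to whatever your Flink version actually prints.
    """
    result = {"async": [], "sync": []}
    for line in lines:
        # Check the async marker first: "synchronous part" is a substring
        # of "asynchronous part".
        if async_marker in line:
            result["async"].append(line)
        elif sync_marker in line:
            result["sync"].append(line)
    return result


# Hypothetical log lines, for illustration only.
sample = [
    "DEBUG backend snapshot (synchronous part) took 3 ms",
    "DEBUG backend snapshot (asynchronous part) took 120 ms",
    "INFO  unrelated message",
]
```

Filtering this way keeps the debug output manageable even when debug logging is enabled only for the relevant class.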

Best,
Zhijiang 


--
From:RKandoji 
Send Time:2020 Jan. 8 (Wed.) 10:23
To:William C 
Cc:user 
Subject:Re: How to verify if checkpoints are asynchronous or sync

Thanks for the reply.
I will check and enable debug logs specifically for the class that contains 
this log.
But in general the logs are already huge and I'm trying to suppress some of 
them, so I'm wondering if there is any other way?

Thanks,
RKandoji


On Tue, Jan 7, 2020 at 7:50 PM William C  wrote:
Can you enable debug log to check with that?

 regards.

 on 2020/1/8 6:36, RKandoji wrote:
 > But I'm curious if there is way to verify if the checkpoints are 
 > happening asynchronously or synchronously.
 > 



Re: How can I find out which key group belongs to which subtask

2020-01-10 Thread Zhijiang
Only chained operators can avoid the record serialization cost, but chaining 
is not possible across a keyed stream.
If you deploy the downstream task in the same task manager as the upstream one, you 
avoid the network shuffle cost, which still brings a performance benefit.
As far as I know, @Till Rohrmann has implemented some enhancements in the scheduler layer 
to support such a requirement in release-1.10. You can give it a try when the release 
candidate is ready.

Best,
Zhijiang


--
From:杨东晓 
Send Time:2020 Jan. 10 (Fri.) 02:10
To:Congxian Qiu 
Cc:user 
Subject:Re: How can I find out which key group belongs to which subtask

Thanks Congxian!
My goal is not only to make the data go to the same subtask, but to a specific 
subtask that lives in the same taskmanager as the upstream record. The key idea is 
to avoid shuffling between taskmanagers.
I think KeyGroupRangeAssignment.java explains a lot about how to get the 
key group and subtask context that can make that happen.
Do you know whether serialization still happens when data is transferred 
between operators in the same taskmanager?
Thanks.
Congxian Qiu wrote on Thu, Jan 9, 2020 at 1:55 AM:

Hi

If you just want to make sure some key goes into the same subtask, would a custom 
key selector [1] help?

For the key group and subtask information, you can refer to 
KeyGroupRangeAssignment [2], and for the max parallelism logic you can 
refer to the docs [3].

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-key-selector-functions
[2] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
[3] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#setting-the-maximum-parallelism
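
The mapping in KeyGroupRangeAssignment [2] can be sketched as follows. This is a Python approximation, not the Java class. The subtask formula (key group times parallelism, integer-divided by max parallelism) follows that class, but the hash is a stand-in: Flink applies a murmur hash to the key's hashCode(), so the key groups computed here will not match the ones Flink assigns. All function names are hypothetical.

```python
import zlib


def assign_to_key_group(key, max_parallelism):
    """Stand-in for Flink's murmur hash of key.hashCode(); values differ."""
    return zlib.crc32(str(key).encode("utf-8")) % max_parallelism


def operator_index_for_key_group(max_parallelism, parallelism, key_group):
    """Index of the subtask that owns a key group (integer division)."""
    return key_group * parallelism // max_parallelism


def key_groups_of_subtask(max_parallelism, parallelism, subtask_index):
    """All key groups that map to the given subtask."""
    return [kg for kg in range(max_parallelism)
            if operator_index_for_key_group(max_parallelism, parallelism, kg)
            == subtask_index]
```

For example, with a max parallelism of 128 and a parallelism of 4, each subtask owns a contiguous range of 32 key groups.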

Best,
Congxian

杨东晓 wrote on Thu, Jan 9, 2020 at 7:47 AM:
Hi, I'm trying to optimize a Flink 'keyBy' ProcessFunction. Is 
there any way to find out which key group a key belongs to, and 
in turn which subtask a key group belongs to?
The motivation is that we want to force the data records from 
upstream to go to a downstream subtask in the same taskmanager. That is, even if 
we use a keyed stream function, we want no cross-JVM communication to happen 
at run time.
And if we can achieve that, can we also avoid the expensive record 
serialization, since data is only transferred within the same taskmanager JVM instance?

Thanks.



Re: Exactly once semantics for hdfs sink

2020-02-11 Thread Zhijiang
Hi Vishwas,

I guess this link [1] can help you understand how StreamingFileSink works and how to use it in 
practice.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html

Best,
Zhijiang


--
From:Vishwas Siravara 
Send Time:2020 Feb. 12 (Wed.) 05:19
To:Khachatryan Roman 
Cc:user 
Subject:Re: Exactly once semantics for hdfs sink

Hi Khachatryan,
Thanks for your reply. Can you help me understand how it works with HDFS 
specifically? Even a link to a document would help. 


Best,
Vishwas 
On Mon, Feb 10, 2020 at 10:32 AM Khachatryan Roman 
 wrote:

Hi Vishwas,

Yes, Streaming File Sink does support exactly-once semantics and can be used 
with HDFS.

Regards,
Roman

On Mon, Feb 10, 2020 at 5:20 PM Vishwas Siravara  wrote:
Hi all,
I want to use the StreamingFile sink for writing data to hdfs. Can I achieve 
exactly once semantics with this sink ? 


Best,
HW. 



Re: [ANNOUNCE] Apache Flink 1.10.0 released

2020-02-12 Thread Zhijiang
Really great work, and thanks to everyone involved, especially the release 
managers!

Best,
Zhijiang
--
From:Kurt Young 
Send Time:2020 Feb. 13 (Thu.) 11:06
To:[None]
Cc:user ; dev 
Subject:Re: [ANNOUNCE] Apache Flink 1.10.0 released

Congratulations to everyone involved! 
Great thanks to Yu & Gary for being the release manager!

Best,
Kurt


On Thu, Feb 13, 2020 at 10:06 AM Hequn Cheng  wrote:

Great thanks to Yu & Gary for being the release manager! 
Also thanks to everyone who made this release possible!

Best, Hequn
On Thu, Feb 13, 2020 at 9:54 AM Rong Rong  wrote:
Congratulations, and a big thanks to the release managers for all the hard work!

--
Rong
On Wed, Feb 12, 2020 at 5:52 PM Yang Wang  wrote:
Excellent work. Thanks Gary & Yu for being the release manager.


Best,
Yang
Jeff Zhang wrote on Thu, Feb 13, 2020 at 9:36 AM:
Congratulations! Really appreciate your hard work.

Yangze Guo wrote on Thu, Feb 13, 2020 at 9:29 AM:
Thanks, Gary & Yu. Congrats to everyone involved!

 Best,
 Yangze Guo

 On Thu, Feb 13, 2020 at 9:23 AM Jingsong Li  wrote:
 >
 > Congratulations! Great work.
 >
 > Best,
 > Jingsong Lee
 >
 > On Wed, Feb 12, 2020 at 11:05 PM Leonard Xu  wrote:
 >>
 >> Great news!
 >> Thanks everyone involved !
 >> Thanks Gary and Yu for being the release manager !
 >>
 >> Best,
 >> Leonard Xu
 >>
 >> On Feb 12, 2020, at 23:02, Stephan Ewen wrote:
 >>
 >> Congrats to us all.
 >>
 >> A big piece of work, nicely done.
 >>
 >> Let's hope that this helps our users make their existing use cases easier 
 >> and also opens up new use cases.
 >>
 >> On Wed, Feb 12, 2020 at 3:31 PM 张光辉  wrote:
 >>>
 >>> Great work.
 >>>
 >>> Congxian Qiu wrote on Wed, Feb 12, 2020 at 10:11 PM:
 >>>>
 >>>> Great work.
 >>>> Thanks everyone involved.
 >>>> Thanks Gary and Yu for being the release manager
 >>>>
 >>>>
 >>>> Best,
 >>>> Congxian
 >>>>
 >>>>
 >>>> Jark Wu wrote on Wed, Feb 12, 2020 at 9:46 PM:
 >>>>>
 >>>>> Congratulations to everyone involved!
 >>>>> Great thanks to Yu & Gary for being the release manager!
 >>>>>
 >>>>> Best,
 >>>>> Jark
 >>>>>
 >>>>> On Wed, 12 Feb 2020 at 21:42, Zhu Zhu  wrote:
 >>>>>>
 >>>>>> Cheers!
 >>>>>> Thanks Gary and Yu for the great job as release managers.
 >>>>>> And thanks to everyone whose contribution makes the release possible!
 >>>>>>
 >>>>>> Thanks,
 >>>>>> Zhu Zhu
 >>>>>>
 >>>>>> Wyatt Chun wrote on Wed, Feb 12, 2020 at 9:36 PM:
 >>>>>>>
 >>>>>>> Sounds great. Congrats & Thanks!
 >>>>>>>
 >>>>>>> On Wed, Feb 12, 2020 at 9:31 PM Yu Li  wrote:
 >>>>>>>>
 >>>>>>>> The Apache Flink community is very happy to announce the release of 
 >>>>>>>> Apache Flink 1.10.0, which is the latest major release.
 >>>>>>>>
 >>>>>>>> Apache Flink(r) is an open-source stream processing framework for 
 >>>>>>>> distributed, high-performing, always-available, and accurate data 
 >>>>>>>> streaming applications.
 >>>>>>>>
 >>>>>>>> The release is available for download at:
 >>>>>>>> https://flink.apache.org/downloads.html
 >>>>>>>>
 >>>>>>>> Please check out the release blog post for an overview of the 
 >>>>>>>> improvements for this new major release:
 >>>>>>>> https://flink.apache.org/news/2020/02/11/release-1.10.0.html
 >>>>>>>>
 >>>>>>>> The full release notes are available in Jira:
 >>>>>>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345845
 >>>>>>>>
 >>>>>>>> We would like to thank all contributors of the Apache Flink community 
 >>>>>>>> who made this release possible!
 >>>>>>>>
 >>>>>>>> Cheers,
 >>>>>>>> Gary & Yu
 >>
 >>
 >
 >
 > --
 > Best, Jingsong Lee


-- 
Best Regards

Jeff Zhang



Re: Encountered error while consuming partitions

2020-02-13 Thread Zhijiang
Thanks for reporting this issue. I also agree with the analysis below. 
Actually, we encountered the same issue several years ago and also solved it via 
the Netty idle handler.

Let's track it via the ticket [1] as the next step.

[1] https://issues.apache.org/jira/browse/FLINK-16030

Best,
Zhijiang


--
From:张光辉 
Send Time:2020 Feb. 12 (Wed.) 22:19
To:Benchao Li 
Cc:刘建刚 ; user 
Subject:Re: Encountered error while consuming partitions

The network can fail in many ways, some of them pretty subtle (e.g. a high packet 
loss ratio). 

The problem is that the long-lived TCP connection between the Netty client and server was 
lost; the server then failed to send a message to the client and shut down the 
channel. The Netty client does not know that the connection has been 
dropped, so it keeps waiting. 

To detect whether a long-lived TCP connection between the Netty client and server is still alive, 
there are two ways: TCP keepalive and heartbeats.
The TCP keepalive time is 2 hours by default. When the error occurs, if you keep 
waiting for 2 hours, the Netty client will trigger an exception and enter failover 
recovery.
If you want to detect a dead TCP connection quickly, Netty provides 
IdleStateHandler, which can be used for a ping-pong mechanism. If the Netty client sends 
n consecutive ping messages and receives no pong message, it triggers an 
exception.
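
The ping-pong rule described above can be sketched as a small counter. This is plain Python, not Netty code: `HeartbeatMonitor` and its methods are hypothetical names, and the sketch assumes the application sends one ping each time the idle timer fires.

```python
class HeartbeatMonitor:
    """Declares a connection dead after n consecutive unanswered pings."""

    def __init__(self, max_missed_pongs):
        self.max_missed_pongs = max_missed_pongs
        self.outstanding = 0  # pings sent since the last pong

    def on_idle_timeout(self):
        """Call whenever the idle timer fires.

        Returns True if the connection should be treated as dead;
        otherwise records that another ping was sent and returns False.
        """
        if self.outstanding >= self.max_missed_pongs:
            return True
        self.outstanding += 1
        return False

    def on_pong_received(self):
        """Any pong proves the peer is alive and resets the counter."""
        self.outstanding = 0
```

With `max_missed_pongs=3` and a 10 second idle interval (both made-up values), a dead connection would be noticed after roughly 40 seconds instead of 2 hours.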





Re: The mapping relationship between Checkpoint subtask id and Task subtask id

2020-02-13 Thread Zhijiang
If the id is not consistent in different parts, maybe it is worth creating a 
jira ticket for better improving the user experience.
If anyone wants to work on it, please ping me then I can give a hand.

Best,
Zhijiang
--
From:Yun Tang 
Send Time:2020 Feb. 14 (Fri.) 10:52
To:杨东晓 ; user 
Subject:Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id

 Hi

 Yes, you are right. Simply subtracting 1 from the checkpoint subtask id gives the 
corresponding task subtask id.
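
 As a small illustration of that mapping, the sketch below re-keys per-subtask durations from checkpoint ids (starting at 1) to task subtask ids (starting at 0). The function name and the sample durations are hypothetical.

```python
def rekey_by_task_subtask(durations_by_checkpoint_subtask):
    """Map {checkpoint subtask id: duration} to {task subtask id: duration}."""
    rekeyed = {}
    for checkpoint_id, duration in durations_by_checkpoint_subtask.items():
        if checkpoint_id < 1:
            raise ValueError("checkpoint subtask ids start at 1")
        rekeyed[checkpoint_id - 1] = duration
    return rekeyed
```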

 Best
 Yun Tang

From: 杨东晓 
Sent: Friday, February 14, 2020 10:11
To: user 
Subject: The mapping relationship between Checkpoint subtask id and Task 
subtask id
Hi, I'm trying to figure out the different end-to-end durations for each subtask id 
in a checkpoint. 
In the Flink web UI I noticed that job task subtask ids start from 0, while 
checkpoint subtask ids start from 1.
How can I find out which checkpoint subtask id belongs to which job task 
subtask id? Is it OK to simply use checkpoint subtask id - 1?
Thanks



Re: Re: The mapping relationship between Checkpoint subtask id and Task subtask id

2020-02-13 Thread Zhijiang
Let's move the further discussion onto the jira page. 
I do not have much time to work on this at the moment. If you want to take it, I 
can assign it to you and help review the PR when I have time. Or I can find 
someone else to work on it in the future.

Best,
Zhijiang


--
From:Jiayi Liao 
Send Time:2020 Feb. 14 (Fri.) 12:39
To:Zhijiang 
Cc:Yun Tang ; 杨东晓 ; user 

Subject:Re:Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id


Hi Zhijiang,

It did confuse us when we were trying to locate the unfinished subtask in the 
Checkpoint UI last time. I’ve created an issue [1] for this. 
@杨东晓 Do you have time to work on this?

[1]. https://issues.apache.org/jira/browse/FLINK-16051

Best Regards,
Jiayi Liao



At 2020-02-14 10:14:27, "Zhijiang"  wrote:
If the id is not consistent in different parts, maybe it is worth creating a 
jira ticket for better improving the user experience.
If anyone wants to work on it, please ping me then I can give a hand.

Best,
Zhijiang
--
From:Yun Tang 
Send Time:2020 Feb. 14 (Fri.) 10:52
To:杨东晓 ; user 
Subject:Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id

 Hi

 Yes, you are right. Simply subtracting 1 from the checkpoint subtask id gives the 
corresponding task subtask id.

 Best
 Yun Tang

From: 杨东晓 
Sent: Friday, February 14, 2020 10:11
To: user 
Subject: The mapping relationship between Checkpoint subtask id and Task 
subtask id
Hi, I'm trying to figure out the different end-to-end durations for each subtask id 
in a checkpoint. 
In the Flink web UI I noticed that job task subtask ids start from 0, while 
checkpoint subtask ids start from 1.
How can I find out which checkpoint subtask id belongs to which job task 
subtask id? Is it OK to simply use checkpoint subtask id - 1?
Thanks






Re: Re: The mapping relationship between Checkpoint subtask id and Task subtask id

2020-02-13 Thread Zhijiang
BTW, FLIP-75 aims to improve the user experience of the web UI.
@Yadong Xie have we already considered unifying the ids across the different 
parts in FLIP-75? 

Best,
Zhijiang
--
From:Zhijiang 
Send Time:2020 Feb. 14 (Fri.) 13:03
To:Jiayi Liao 
Cc:Yun Tang ; 杨东晓 ; user 

Subject:Re: Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id

Let's move the further discussion onto the jira page. 
I do not have much time to work on this at the moment. If you want to take it, I 
can assign it to you and help review the PR when I have time. Or I can find 
someone else to work on it in the future.

Best,
Zhijiang


--
From:Jiayi Liao 
Send Time:2020 Feb. 14 (Fri.) 12:39
To:Zhijiang 
Cc:Yun Tang ; 杨东晓 ; user 

Subject:Re:Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id


Hi Zhijiang,

It did confuse us when we were trying to locate the unfinished subtask in the 
Checkpoint UI last time. I’ve created an issue [1] for this. 
@杨东晓 Do you have time to work on this?

[1]. https://issues.apache.org/jira/browse/FLINK-16051

Best Regards,
Jiayi Liao



At 2020-02-14 10:14:27, "Zhijiang"  wrote:
If the id is not consistent in different parts, maybe it is worth creating a 
jira ticket for better improving the user experience.
If anyone wants to work on it, please ping me then I can give a hand.

Best,
Zhijiang
--
From:Yun Tang 
Send Time:2020 Feb. 14 (Fri.) 10:52
To:杨东晓 ; user 
Subject:Re: The mapping relationship between Checkpoint subtask id and Task 
subtask id

 Hi

 Yes, you are right. Simply subtracting 1 from the checkpoint subtask id gives the 
corresponding task subtask id.

 Best
 Yun Tang

From: 杨东晓 
Sent: Friday, February 14, 2020 10:11
To: user 
Subject: The mapping relationship between Checkpoint subtask id and Task 
subtask id
Hi, I'm trying to figure out the different end-to-end durations for each subtask id 
in a checkpoint. 
In the Flink web UI I noticed that job task subtask ids start from 0, while 
checkpoint subtask ids start from 1.
How can I find out which checkpoint subtask id belongs to which job task 
subtask id? Is it OK to simply use checkpoint subtask id - 1?
Thanks







Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-20 Thread Zhijiang
Congrats Jingsong! Welcome on board!

Best,
Zhijiang


--
From:Zhenghua Gao 
Send Time:2020 Feb. 21 (Fri.) 12:49
To:godfrey he 
Cc:dev ; user 
Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

Congrats Jingsong!


Best Regards,
Zhenghua Gao

On Fri, Feb 21, 2020 at 11:59 AM godfrey he  wrote:

Congrats Jingsong! Well deserved.

Best,
godfrey
Jeff Zhang wrote on Fri, Feb 21, 2020 at 11:49 AM:
Congratulations, Jingsong! You deserve it. 

wenlong.lwl wrote on Fri, Feb 21, 2020 at 11:43 AM:
Congrats Jingsong!

 On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:

 > Congrats Jingsong!
 >
 > > On Feb 21, 2020, at 11:39 AM, Jark Wu wrote:
 > >
 > > Congratulations Jingsong! Well deserved.
 > >
 > > Best,
 > > Jark
 > >
 > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
 > >
 > >> Congratulations! Jingsong
 > >>
 > >>
 > >> Best,
 > >> Dan Zou
 > >>
 >
 >


-- 
Best Regards

Jeff Zhang



Re: How JobManager and TaskManager find each other?

2020-02-26 Thread Zhijiang
I guess you are referring to the data shuffle process among different task 
managers.

When a task manager (TM) registers itself with the job manager (JM), it also 
reports its IP address and the data port that it listens on.
When tasks are scheduled, the upstream TM's address (IP, port) is included in the 
task deployment descriptor of the respective downstream tasks. The downstream 
tasks can then connect to the remote upstream TM to request data.

In short, the JM knows the addresses of all TMs via registration, and these 
addresses are sent to the peers that need them during task scheduling and 
deployment.
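
The flow described above can be modelled as a toy registry. This is plain Python, not Flink code: the class, the method names, the addresses and the port are all hypothetical. It only shows that registration gives the JM every address and that the descriptor carries the upstream addresses to the downstream task.

```python
class JobManager:
    """Toy model of TM registration and task deployment."""

    def __init__(self):
        self.task_managers = {}  # tm_id -> (ip, data_port)

    def register(self, tm_id, ip, data_port):
        """A TM reports its IP address and data port when it registers."""
        self.task_managers[tm_id] = (ip, data_port)

    def deployment_descriptor(self, task_name, upstream_tm_ids):
        """Descriptor for a downstream task, carrying upstream TM addresses."""
        return {
            "task": task_name,
            "inputs": [self.task_managers[tm] for tm in upstream_tm_ids],
        }
```

A downstream task that receives such a descriptor knows where to connect without ever discovering other TMs on its own.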

Best,
Zhijiang


--
From:KristoffSC 
Send Time:2020 Feb. 26 (Wed.) 19:39
To:user 
Subject:Re: How JobManager and TaskManager find each other?

Thanks all for the answers,

One more question though. In [1] we can see that task managers are talking
with each other - sending data streams. How each task manager knows the
address of other task managers?


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/runtime.html#job-managers-task-managers-clients



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Question: Determining Total Recovery Time

2020-02-26 Thread Zhijiang
Hi Morgan,

Your idea is great and I am also interested in it.
I think it is valuable for some users to estimate the maximum throughput 
capacity based on certain metrics or models.
But I am not quite sure whether it is feasible to do that based on the existing 
metrics; at least there are some limitations, as Arvid mentioned.

If I understand correctly, the maximum throughput you want to measure is based 
on what the source emits, which means some data is still buffered in the middle 
of the topology and not processed yet. 
If so, we might look at the metrics `inputQueueLength` and `inPoolUsage` 
together. Note that if `inPoolUsage` reaches 100%, it does not 
mean that all the buffers are already filled with data, only that 
all the available buffers have been requested. So `inputQueueLength` would be 
more precise for predicting the remaining capacity, provided we know the 
total number of buffers. In general we can make use of the two together.

We can find the largest value of the above metrics across all the tasks in the topology, 
which probably hints at the bottleneck of the whole job. Then we can estimate 
how many available buffers are left to absorb more source throughput. 
But there is a limitation: under light load all the metrics may be `0`, 
as I mentioned above. So we cannot estimate the saturation unless we 
increase the source rate.

Looking forward to hearing good news from you!
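
A minimal sketch of that idea, assuming the two metrics have already been collected per task (for example from a metrics reporter). The function names, the tie-breaking rule and the sample values are assumptions for illustration, not something Flink provides.

```python
def find_bottleneck(task_metrics):
    """task_metrics: dict task -> {"inPoolUsage": float, "inputQueueLength": int}.

    Returns the task with the highest inPoolUsage, ties broken by queue
    length, or None when every metric is zero (no signal under light load).
    """
    if all(m["inPoolUsage"] == 0 and m["inputQueueLength"] == 0
           for m in task_metrics.values()):
        return None
    return max(task_metrics,
               key=lambda t: (task_metrics[t]["inPoolUsage"],
                              task_metrics[t]["inputQueueLength"]))


def remaining_buffers(total_buffers, input_queue_length):
    """Buffers not yet holding queued data, given the known total."""
    return max(total_buffers - input_queue_length, 0)
```

The `None` result corresponds to the limitation mentioned above: with all metrics at zero, nothing can be said about saturation.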

Best,
Zhijiang


--
From:Arvid Heise 
Send Time:2020 Feb. 26 (Wed.) 22:29
To:Morgan Geldenhuys 
Cc:Timo Walther ; user 
Subject:Re: Question: Determining Total Recovery Time

Hi Morgan,

doing it in a very general way sure is challenging.

I'd assume that your idea of using the buffer usage has some shortcomings 
(which I don't know), but I also think it's a good starting point.

Have you checked the PoolUsage metrics? [1] You could use them to detect the 
bottleneck and then estimate the max capacity of the whole job.

Btw, I'd be interested in results. We have the idea of adjustable buffer sizes 
and the insights would help us.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#default-shuffle-service
On Tue, Feb 25, 2020 at 6:36 PM Morgan Geldenhuys 
 wrote:

 Hi Arvid, Timo,

 Really appreciate the feedback. I have one final question on this topic and 
hope you don't mind me posing it to you directly. I posted the question earlier 
to the mailing list, but am looking at this more from an academic perspective 
as opposed to manually optimizing a specific job for a specific production 
environment. I do not know the Flink internals well enough to determine if I 
can accomplish what I am looking for.

 For an experiment, I need to work out the Total Recovery Time (TRT). I define 
this as the time it takes the system to "catch up" to the current timestamp 
after a node failure, assuming event time processing.

 I would like to follow a heuristic approach which is: 

job+environment agnostic, 
does not involve load testing, 
does not involve modifying the job or Flink codebase, and 
relies solely on the metrics supplied.

 As far as I know (and correct me if I'm wrong): 
TRT = heartbeat.timeout + recoveryTime + time to reprocess 
uncheckpointed messages + lag to catch up to current timestamp.

 In order to predict TRT, I need some kind of resource utilization model based 
on the current processing capacity and maximum processing limit, let me explain:

Backpressure is essentially the point at which utilization has reached 100% for 
a particular streaming pipeline, meaning that the application has reached 
the maximum number of messages it can process per second. 
Let's consider an example: the system is running perfectly fine under 
normal conditions, accessing external sources, and processing an average of 
100,000 messages/sec. Let's assume the maximum capacity is around 130,000 
messages/sec before back pressure starts propagating back up the 
stream. Utilization is therefore about 0.77 (100K/130K). Great, but at present we 
don't know that 130,000 is the limit without load testing. 
For this example, is there a way of finding this maximum capacity (and hence 
the utilization) without pushing the system to its limit, based solely on the 
average current throughput? Possibly by measuring the saturation of certain 
buffers between the operators? 
 If this is possible, the unused capacity can be used to predict how fast a 
system would get back to the current timestamp. Again, it's a heuristic, so it 
doesn't have to be extremely precise. Any hints would be greatly appreciated.
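
 The arithmetic behind this heuristic can be sketched as follows. The backlog model is an assumption that is not stated in the thread: it treats the reprocessing of uncheckpointed messages and the catch-up lag as one backlog that drains at the spare capacity (maximum rate minus input rate). The function and parameter names are hypothetical.

```python
def utilization(current_rate, max_rate):
    """Fraction of the maximum processing capacity currently in use."""
    return current_rate / max_rate


def catch_up_seconds(backlog_messages, current_rate, max_rate):
    """Time to drain a backlog while input keeps arriving at current_rate."""
    spare = max_rate - current_rate
    if spare <= 0:
        raise ValueError("no spare capacity: the backlog never drains")
    return backlog_messages / spare


def total_recovery_time(heartbeat_timeout_s, restart_s, since_checkpoint_s,
                        current_rate, max_rate):
    """Downtime plus the time needed to work off the resulting backlog."""
    downtime = heartbeat_timeout_s + restart_s
    # Messages to redo: those since the last checkpoint plus those that
    # arrived while the job was down.
    backlog = current_rate * (downtime + since_checkpoint_s)
    return downtime + catch_up_seconds(backlog, current_rate, max_rate)
```

With the numbers from the example (100,000 of 130,000 messages/sec), a backlog of 3,000,000 messages drains at 30,000 messages/sec, so it takes 100 seconds.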

 Thank you very much!

 Regards,
 Morgan.

On 21.02.20 14:44, Arvid Heise wrote:
Hi Morgan,

sorry for the late reply. In general, that should work. You need to ensure that 
the same task is processing the same record though.

Local copy needs to be state or else the last message would be lost upon 
restart. Performance will take a hit

Re: checkpoint/taskmanager is stuck, deadlock on LocalBufferPool

2018-10-23 Thread zhijiang
The stack below indicates that there are no available buffers for the source 
outputs, including watermarks and normal records, so the source is blocked 
requesting a buffer from the LocalBufferPool.
The checkpoint process is also affected by this blocking request. The real 
question is why the queued output buffers are not consumed by the downstream tasks.
I think you can check for the downstream task whose input queue usage has reached 
100%, then run jstack on the corresponding downstream tasks, which may be stuck in some 
operation and causing back pressure.

Best,
Zhijiang
--
From:Yan Zhou [FDS Science] 
Send Time:2018 Oct. 23 (Tue.) 02:29
To:user@flink.apache.org 
Subject:Re: checkpoint/taskmanager is stuck, deadlock on LocalBufferPool


 I am using flink 1.5.3 
From: Yan Zhou [FDS Science] 
Sent: Monday, October 22, 2018 11:26
To: user@flink.apache.org
Subject: checkpoint/taskmanager is stuck, deadlock on LocalBufferPool
 Hi,
 My application suddenly got stuck and made no progress at all after 
running for a few days. No exceptions are found. From the thread dump, I can 
see that the operator threads and checkpoint threads deadlock on 
LocalBufferPool. LocalBufferPool is not able to request memory and keeps the 
lock. Please see the thread dump at the bottom. 

It uses RocksDB as the state backend. From the heap dump and web UI, there is 
plenty of memory in the JVM and it doesn't have a GC problem. Checkpoints were fine 
until the problem occurred:

2018-10-19 04:41:23,691 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 4347 @ 1539891683667 for job 1.
2018-10-19 04:41:45,069 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 4347 for job 1 (1019729450 bytes in 13600 ms).
2018-10-19 04:46:45,089 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 4348 @ 1539892005069 for job 1.
2018-10-19 04:56:45,089 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 4348 
of job 1 expired before completing.


This happened at midnight and the traffic was relatively low. Even if there 
was a spike that caused back pressure, my understanding is that the events should 
be processed eventually and the network buffers would become available after that. 
What might be the cause of it? 


 Best
 Yan

"Time Trigger for Source: Custom Source -> Flat Map -> Flat Map -> Timestamps/Watermarks -> (from: ...s#363 daemon prio=5 os_prio=0 tid=0x7ff187944000 nid=0x8f76 in Object.wait() [0x7ff12fda9000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)
- locked <0x0006dadeeac8> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:117)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:87)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:668)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:736)
at org.apache.flink.streaming.api.operators.ProcessOperator.processWatermark(ProcessOperator.java:72)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:479)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:668)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:736)
at org.apache.flink.streaming.api.operators.ProcessOperator.processWatermark(ProcessOperator.java:72)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:479)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:603)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:668)
at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.onProcessingTime(TimestampsAndPeriodicWatermarksOperator.java:77)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcess

Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread zhijiang
Hi Akshay,

You have run into a known issue where serializing large records causes OOM.

Previously, every subpartition created a separate serializer, and each 
serializer maintained an internal byte array for storing intermediate 
serialization results. The key point is that these internal byte 
arrays are not managed by the framework, and their size grows dynamically with the record 
size. If your job has many subpartitions with large records, it may 
well cause an OOM issue.

I already improved this to some extent by sharing a single serializer across 
all subpartitions [1], which means we have at most one byte array of overhead. 
This improvement is included in release-1.7.
Currently the best option is to reduce your record size if possible, or to 
increase the heap size of the task manager container.
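
A back-of-the-envelope view of why sharing the serializer matters. The sketch assumes the worst case, in which every serializer's internal array has grown to the size of the largest record; the function name and the numbers in the note below are hypothetical.

```python
def serializer_buffer_overhead(num_subpartitions, max_record_bytes, shared):
    """Rough upper bound on unmanaged serialization buffers for one writer.

    shared=False models one serializer per subpartition (the old behaviour),
    shared=True models a single serializer for all subpartitions.
    """
    serializers = 1 if shared else num_subpartitions
    return serializers * max_record_bytes
```

For 100 subpartitions and records of up to 1 GiB, the bound drops from 100 GiB to 1 GiB, although a single 1 GiB array can still be too much for a small heap.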

[1] https://issues.apache.org/jira/browse/FLINK-9913

Best,
Zhijiang
--
From:Akshay Mendole 
Send Time:2018 Nov. 22 (Thu.) 13:43
To:user 
Subject:OutOfMemoryError while doing join operation in flink

Hi,
We are converting one of our Pig pipelines to Flink using Apache Beam. The 
Pig pipeline reads two different data sets (R1 & R2) from HDFS, enriches them, 
joins them and dumps the result back to HDFS. The data set R1 is skewed, in the sense that it 
has a few keys with a lot of records. When we converted the Pig pipeline to Apache 
Beam and ran it using Flink on a production YARN cluster, we got the following 
error: 

2018-11-21 16:52:25,307 ERROR org.apache.flink.runtime.operators.BatchTask  - Error in task code:  GroupReduce (GroupReduce at CoGBK/GBK) (25/100)
java.lang.RuntimeException: Emitting the record caused an I/O exception: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.beam.runners.flink.translation.functions.SortingFlinkCombineRunner.combine(SortingFlinkCombineRunner.java:140)
at org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction.reduce(FlinkReduceFunction.java:85)
at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:111)
at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to serialize element. Serialized size (> 1136656562 bytes) exceeds JVM heap space
at org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:323)
at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:149)
at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:48)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:170)
at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:50)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:71)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:58)
at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:32)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:98)
at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:71)
at org.apache.beam.sdk.coders.KvCoder.encode(KvCod

Re: OutOfMemoryError while doing join operation in flink

2018-11-22 Thread zhijiang
Hi Akshay,

Sorry, I have not thought of a proper way to handle a single large record 
across distributed task managers in Flink. But I can give some hints for 
adjusting the related memory settings to work around the OOM issue.
A large fraction of the task manager memory is managed by Flink for efficiency, 
and this memory is long-lived in the JVM and not recycled by GC. You can check 
the parameter "taskmanager.memory.fraction" for this; the default value is 0.7 
if you have not changed it, which means 7GB * 0.7 is used by the framework.

I am not sure which Flink version you use. If I remember correctly, before 
release-1.5 the network buffers also used heap memory by default, so you should 
also subtract this part from the total task manager memory.

Leaving aside the network buffers used by the framework, you only have 
7GB * 0.3 of temporary memory left for other parts. The temporary buffer in the 
serializer doubles its current size every time it cannot hold the record, which 
means one serializer may need 2GB of overhead memory for your 1GB record. You 
have 2 slots per task manager for running two tasks, so the total overhead may 
be almost 4GB. So you can lower "taskmanager.memory.fraction", increase the 
total task manager memory to cover this overhead, or set one slot per task 
manager.
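The arithmetic above can be checked with a small plain-Java sketch. It does not 
use any Flink API; the starting buffer size of 128 bytes and the pure doubling 
rule are assumptions made for illustration, and the record size is the one 
reported in the exception.

```java
// Rough heap budget for the numbers quoted in this thread (illustrative only).
class HeapBudgetSketch {
    // Memory (MB) left outside Flink's managed memory.
    static long unmanagedMb(long taskManagerMb, double managedFraction) {
        return Math.round(taskManagerMb * (1.0 - managedFraction));
    }

    // Capacity of a buffer that starts at initialBytes and doubles until the
    // record fits (assumed growth rule, simplified).
    static long capacityAfterDoubling(long initialBytes, long recordBytes) {
        long capacity = initialBytes;
        while (capacity < recordBytes) {
            capacity *= 2;
        }
        return capacity;
    }

    public static void main(String[] args) {
        long taskManagerMb = 7 * 1024;                     // 7 GB per task manager
        long freeMb = unmanagedMb(taskManagerMb, 0.7);     // taskmanager.memory.fraction = 0.7
        long recordBytes = 1136656562L;                    // size reported in the exception
        long perSerializer = capacityAfterDoubling(128, recordBytes); // 128 is hypothetical
        int slots = 2;
        long neededMb = slots * perSerializer / (1024 * 1024);
        System.out.println("unmanaged heap (MB): " + freeMb);
        System.out.println("serializer buffers for " + slots + " slots (MB): " + neededMb);
        System.out.println("fits: " + (neededMb <= freeMb));
    }
}
```

Under these assumptions the two serializer buffers need about 4096 MB while 
only about 2150 MB is left outside managed memory, which matches the hint that 
a lower fraction, more memory, or one slot per task manager is needed.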

Best,
Zhijiang


--
From:Akshay Mendole
Send Time:2018-11-23 (Friday) 02:54
To:trohrmann
Cc:zhijiang; user; Shreesha Madogaran
Subject:Re: OutOfMemoryError while doing join operation in flink

Hi,
Thanks for your reply. I tried running a simple "group by" on just one 
dataset where a few keys occur repeatedly (on the order of millions of times) 
and did not include any joins. I wanted to see if this issue is specific to 
joins. But as I expected, I ran into the same issue. I am giving 7 GB to each 
task manager with 2 slots per task manager. From what I have understood so far, 
when individual records somewhere in the pipeline become this large, they 
should be handled in a distributed manner instead of by a simple data structure 
in a single JVM. I am guessing there is no way to do this in Flink today. 
Could you please confirm this?
Thanks,
Akshay


On Thu, Nov 22, 2018 at 9:28 PM Till Rohrmann  wrote:
Hi Akshay,

Flink currently does not support automatically distributing hot keys across 
different JVMs. What you can do is adapt the parallelism/number of partitions 
manually if you find that one partition contains a lot of hot keys. This might 
mitigate the problem by spreading the hot keys over different partitions.
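A minimal plain-Java sketch of why changing the parallelism can separate hot 
keys. The modulo partitioner and the two integer keys are assumptions for 
illustration; Flink's real key distribution hashes differently, so the actual 
assignment will not match this one.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class HotKeyPartitionSketch {
    // Simplified hash partitioning (assumption, not Flink's partitioner).
    static int partitionOf(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Number of distinct partitions the given hot keys are spread over.
    static int partitionsUsed(List<?> hotKeys, int parallelism) {
        Set<Integer> used = new HashSet<>();
        for (Object key : hotKeys) {
            used.add(partitionOf(key, parallelism));
        }
        return used.size();
    }

    public static void main(String[] args) {
        List<Integer> hotKeys = Arrays.asList(4, 6); // hypothetical hot keys
        for (int parallelism : new int[] {2, 3}) {
            System.out.println("parallelism " + parallelism + ": hot keys use "
                    + partitionsUsed(hotKeys, parallelism) + " partition(s)");
        }
    }
}
```

Note that this only helps when several hot keys collide in one partition. It 
does not split the values of a single hot key, which is the case behind the 
1 GB record in this thread.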

Apart from that, the problem seems to be as Zhijiang indicated that your join 
result is quite large. One record is 1 GB large. Try to decrease it or give 
more memory to your TMs.

Cheers,
Till
On Thu, Nov 22, 2018 at 1:08 PM Akshay Mendole  wrote:
Hi Zhijiang,
 Thanks for the quick reply. My concern is more about how Flink performs joins 
of two skewed datasets. Pig and Spark seem to support joins of skewed datasets. 
The record size you mention in your reply is the size after the join operation 
takes place, which in my use case is definitely going to be too large to fit in 
the JVM memory of a task manager task slot. We want to know if there is a way 
in Flink to handle such skewed keys by distributing their values across 
different JVMs. Let me know if you need more clarity on the issue.
Thanks, 
Akshay 
On Thu, Nov 22, 2018 at 2:38 PM zhijiang  wrote:
Hi Akshay,

You encountered an existing issue where serializing large records causes OOM.

Previously, every subpartition would create a separate serializer, and each 
serializer would maintain an internal byte array for storing intermediate 
serialization results. The key point is that these internal byte arrays are 
overhead that is not managed by the framework, and their size grows dynamically 
with the record size. If your job has many subpartitions with large records, it 
may well cause an OOM issue.

I already improved this to some extent by sharing one serializer across all 
subpartitions [1], which means we have at most one byte array of overhead. This 
improvement is included in release-1.7.
Currently the best option may be to reduce your record size if possible, or you 
can increase the heap size of the task manager container.

[1] https://issues.apache.org/jira/browse/FLINK-9913

Best,
Zhijiang
--
From:Akshay Mendole
Send Time:2018-11-22 (Thursday) 13:43
To:user
Subject:OutOfMemoryError while doing join operation in flink

Hi,
We are converting one of our pig pipelines to flink using apache beam. The 
pig pipeline reads two different data sets (R1 & R2)  from hdfs, enriches them, 
joins them and dumps back to hdfs. The data set R1 is skewed. In a sense, it 
has few keys with lot of records. When we co

Re: Flink job failing due to "Container is running beyond physical memory limits" error.

2018-11-25 Thread zhijiang
I think it is probably related to RocksDB memory usage, if you have not seen an 
OutOfMemory issue before.

There is already a JIRA ticket [1] for fixing this issue, and you can watch it 
for updates. :)

[1] https://issues.apache.org/jira/browse/FLINK-10884

Best,
Zhijiang
--
From:Gagan Agrawal
Send Time:2018-11-24 (Saturday) 14:14
To:user
Subject:Flink job failing due to "Container is running beyond physical memory 
limits" error.

Hi,
I am running a Flink job on YARN. It ran fine so far (4-5 days) and has now 
started failing with the following errors.

2018-11-24 03:46:21,029 INFO  org.apache.flink.yarn.YarnResourceManager 
- Closing TaskExecutor connection 
container_1542008917197_0038_01_06 because: Container 
[pid=18380,containerID=container_1542008917197_0038_01_06] is running 
beyond physical memory limits. Current usage: 3.0 GB of 3 GB physical memory 
used; 5.0 GB of 15 GB virtual memory used. Killing container.

This is a simple job where we read 2 Avro streams from Kafka, apply a custom 
UDF after creating a keyed stream from the union of those 2 streams, and write 
the output back to Kafka. The UDF internally uses MapState with the RocksDB 
backend. Currently the checkpoint size is around 300 GB and we are running this 
with 10 task managers with 3 GB memory each. I have also set 
"containerized.heap-cutoff-ratio: 0.5" but am still facing the same issue. The 
Flink version is 1.6.2.

Here is the flink command
./bin/flink run -m yarn-cluster -yd -yn 10 -ytm 3072 -ys 4 job.jar

I want to understand the typical reasons for this issue. Also, why would Flink 
consume more memory than allocated, given that JVM memory is fixed and will not 
grow beyond the max heap? Could this be related to RocksDB, which may be 
consuming memory outside the heap and hence exceeding the defined limits? I 
didn't see this issue when the checkpoint size was small (<50 GB). But ever 
since we reached 300 GB, this issue has been occurring frequently. I can try 
increasing memory, but I am still interested in knowing the typical reasons for 
this error if JVM heap memory cannot grow beyond the defined limit.

Gagan
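
On the question of how a container can exceed its limit although the heap is 
fixed: the heap is only part of the container. The sketch below is a simplified 
model of the split implied by "containerized.heap-cutoff-ratio"; the minimum 
cutoff of 600 MB is an assumed default, and the real calculation in a given 
Flink version may include further terms (for example network buffers). 
Native memory such as RocksDB's has to fit into the cutoff part, together with 
other non-heap JVM usage.

```java
// Simplified model of how a YARN container is split into heap and non-heap headroom.
class ContainerCutoffSketch {
    // Memory (MB) withheld from the JVM heap under this simplified model.
    static long cutoffMb(long containerMb, double cutoffRatio, long minCutoffMb) {
        return Math.max(minCutoffMb, (long) (containerMb * cutoffRatio));
    }

    public static void main(String[] args) {
        long containerMb = 3072;   // -ytm 3072 from the command above
        double ratio = 0.5;        // containerized.heap-cutoff-ratio from the mail
        long minCutoffMb = 600;    // assumed default, not stated in the thread
        long cutoff = cutoffMb(containerMb, ratio, minCutoffMb);
        System.out.println("headroom outside the heap (MB): " + cutoff);
        System.out.println("left for the JVM heap (MB): " + (containerMb - cutoff));
    }
}
```

If native allocations grow past the headroom, YARN kills the container even 
though the heap itself never exceeded its maximum.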





Re: Re: Flink job failing due to "Container is running beyond physical memory limits" error.

2018-11-26 Thread zhijiang
Increasing the task manager memory size may work around it.

Whether recovery fails depends on several things: whether there was a 
successful checkpoint before, whether the states are available, and what the 
failover strategy is.

Best,
Zhijiang
--
From:Flink Developer
Send Time:2018-11-26 (Monday) 16:37
To:Flink Developer
Cc:zhijiang; user; Gagan Agrawal
Subject:Re: Re: Flink job failing due to "Container is running beyond physical 
memory limits" error.

Also, after the Flink job has failed from the above error, the Flink job is 
unable to recover from previous checkpoint. Is this the expected behavior? How 
can the job be recovered successfully from this?


‐‐‐ Original Message ‐‐‐
 On Monday, November 26, 2018 12:35 AM, Flink Developer 
 wrote:

I am also experiencing this error message "Container is running beyond physical 
memory limits". In my case, I am using Flink 1.5.2 with 10 task managers, with 
40 slots for each task manager. The memory assigned during flink cluster 
creation is 1024MB per task manager. The checkpoint is using RocksDb and the 
checkpoint size is very small (10MB).

Is the simple solution to increase the task manager memory size? I will try 
going from 1024MB to 4096MB per task manager.









Re: buffer pool is destroyed

2018-12-21 Thread zhijiang
Hi Shuang,

Normally the exception you mentioned is not the root cause of the failover; it 
is mainly caused by the cancel process that makes the task exit.
You can further check whether there are other failures in the job master log to 
find the root cause.

Best,
Zhijiang
--
From:Chan, Shuang
Send Time:2018-12-21 (Friday) 11:12
To:user@flink.apache.org
Subject:buffer pool is destroyed


Hi Flink community,
I have a custom source that emits a user-defined data type, BaseEvent.  The 
following code works fine when BaseEvent is not a POJO.
But when I changed it to a POJO by adding a default constructor, I get a 
“Buffer Pool is destroyed” runtime exception on the collect method.
DataStream<BaseEvent> eventStream = see.addSource(new AgoraSource(configFile, instance));
// The generic parameters on the next line were lost in the archive;
// SecurityID is assumed to be a String here.
DataStream<Tuple4<String, Long, Double, String>> result_order = eventStream
    .filter(e -> e instanceof OrderEvent)
    .map(e -> (OrderEvent) e)
    .map(e -> new Tuple3<>(e.SecurityID, Long.valueOf(1), Double.valueOf(e.OriginalQuantity))).returns(info_tuple3)
    .keyBy(e -> e.f0)
    .timeWindow(Time.seconds(5))
    .reduce((a, b) -> new Tuple3<>(a.f0, a.f1 + b.f1, a.f2 + b.f2))
    .map(e -> new Tuple4<>(e.f0, e.f1, e.f2, "Order")).returns(info_tuple4);
Any idea?
Shuang





Re: Buffer stats when Back Pressure is high

2019-01-07 Thread zhijiang
Hi Gagan,

Which Flink version do you use? And have you checked 
buffers.inputQueueLength for all the parallel subtasks of B (those connected 
with A)? It may be that only one subtask of B has its input queue full of 
buffers, which back pressures A, while the input queues of the other subtasks 
of B are empty.
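
A small plain-Java sketch of that check. The queue lengths and the threshold 
are hypothetical values, not taken from the thread; in practice they would come 
from the per-subtask buffers.inputQueueLength metric.

```java
import java.util.ArrayList;
import java.util.List;

class InputQueueSkewSketch {
    // Indices of subtasks whose input queue length is at or above the threshold.
    static List<Integer> backlogged(int[] inputQueueLengths, int threshold) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < inputQueueLengths.length; i++) {
            if (inputQueueLengths[i] >= threshold) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical readings for four subtasks of B: only subtask 2 is backed up.
        int[] lengths = {0, 0, 37, 0};
        System.out.println("backlogged subtasks of B: " + backlogged(lengths, 10));
    }
}
```

Looking at a single subtask, or at one that happens to be idle, would show 0 
even though one other subtask is enough to back pressure A.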

Best,
Zhijiang


--
From:Gagan Agrawal 
Send Time:2019-01-07 (Monday) 12:06
To:user 
Subject:Buffer stats when Back Pressure is high

Hi,
I want to understand whether any of the buffer stats help in debugging / 
validating that a downstream operator is performing slowly when back pressure 
is high. Say I have A -> B operators and A shows high back pressure, which 
indicates that something is wrong or not performing well on the B side and is 
slowing down operator A. However, when I look at buffers.inputQueueLength for 
operator B, it's 0. My understanding is that when B is processing slowly, its 
input buffer will be full of incoming messages, which ultimately blocks/slows 
down the upstream operator A. However, that doesn't seem to be happening in my 
case. Can someone throw some light on what the different buffer stats (e.g. 
buffers.inPoolUsage, buffers.inputQueueLength, numBuffersInLocalPerSecond, 
numBuffersInRemotePerSecond) should look like when a downstream operator is 
performing slowly?

Gagan



Re: ConnectTimeoutException when createPartitionRequestClient

2019-01-09 Thread zhijiang
Hi Wenrui,

I suspect another issue which might cause the connection failure. You can check 
whether the netty server has already bound and is listening on its port in 
time, before the client requests the connection. If some time-consuming process 
during TM startup delays the netty server start, the server is not ready when 
the client requests the connection, which may cause a connection timeout or 
failure.

From your description, the problem seems to exist on only some TMs: when you 
decrease the total parallelism, the job might not be scheduled on the 
problematic TM and so does not hit this issue. The default number of netty 
threads and the default timeout should be fine for normal cases.
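
One way to check from the client side whether a port is already accepting 
connections is a plain TCP probe. This is a generic sketch using only 
java.net, not a Flink utility; the host, port and timeout are placeholders, and 
the demo probes a listener it opens itself.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

class PortProbeSketch {
    // True if a TCP connection to host:port succeeds within timeoutMs.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    // Opens a listener on an ephemeral local port and probes it.
    static boolean probeLocalListener() {
        try (ServerSocket server = new ServerSocket(0)) {
            return canConnect("127.0.0.1", server.getLocalPort(), 1000);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("local listener reachable: " + probeLocalListener());
    }
}
```

Running such a probe against the data port of a suspect TM shortly after it 
starts would show whether the server side is late in binding.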

Best,
Zhijiang


--
From:Wenrui Meng 
Send Time:2019-01-09 (Wednesday) 18:18
To:Till Rohrmann 
Cc:user ; Konstantin 
Subject:Re: ConnectTimeoutException when createPartitionRequestClient

Hi Till,

This job is not on AthenaX but on a special uber version of Flink. I tried to 
ping the connected host from the connecting host, and the connection seems very 
stable. For the connection timeout, I did set it to 20 min, but it still 
reports the timeout after 2 minutes. Could you let me know how you test the 
timeout setting locally?

Thanks,
Wenrui
On Tue, Jan 8, 2019 at 7:06 AM Till Rohrmann  wrote:
Hi Wenrui,

the exception now occurs while finishing the connection creation. I'm not sure 
whether this is so different. Could it be that your network is overloaded or 
not very reliable? Have you tried running your Flink job outside of AthenaX?

Cheers,
Till
On Tue, Jan 8, 2019 at 2:50 PM Wenrui Meng  wrote:
Hi Till,

Thanks for your reply. Our cluster is a YARN cluster. I found that if we 
decrease the total parallelism, the timeout issue can be avoided. But we do 
need that number of TaskManagers to process the data. In addition, once I 
increase the netty server threads to 128, the error changes to the following 
one. It seems the cause is different. Could you help take a look?

2b0ac47c1eb1bcbbbe4a97) switched from RUNNING to FAILED.
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
    at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
    at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:134)
    at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:148)
    at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connecting to remote task manager + 'athena464-sjc1/10.70.129.13:39466' has failed. This might indicate that the remote task manager has been lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPro

Re: [ANNOUNCE] New Flink PMC member Thomas Weise

2019-02-12 Thread zhijiang
Congrats Thomas!

Best,
Zhijiang
--
From:Kostas Kloudas 
Send Time:2019-02-12 (Tuesday) 22:46
To:Jark Wu 
Cc:Hequn Cheng ; Stefan Richter 
; user 
Subject:Re: [ANNOUNCE] New Flink PMC member Thomas Weise

Congratulations Thomas!

Best,
Kostas
On Tue, Feb 12, 2019 at 12:39 PM Jark Wu  wrote:
Congrats Thomas!
On Tue, 12 Feb 2019 at 18:58, Hequn Cheng  wrote:
Congrats Thomas!

Best, Hequn


On Tue, Feb 12, 2019 at 6:53 PM Stefan Richter  
wrote:
Congrats Thomas!,

Best,
Stefan

On 12.02.2019 at 11:20, Stephen Connolly wrote:
Congratulations to Thomas. I see that this is not his first time in the PMC 
rodeo... also somebody needs to update LDAP as he's not on 
https://people.apache.org/phonebook.html?pmc=flink yet!

-stephenc
On Tue, 12 Feb 2019 at 09:59, Fabian Hueske  wrote:
Hi everyone,

On behalf of the Flink PMC I am happy to announce Thomas Weise as a new member 
of the Apache Flink PMC.

Thomas is a long time contributor and member of our community. 
He is starting and participating in lots of discussions on our mailing lists, 
working on topics that are of joint interest of Flink and Beam, and giving 
talks on Flink at many events.

Please join me in welcoming and congratulating Thomas!

Best,
Fabian



-- 
Kostas Kloudas | Software Engineer





Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-14 Thread zhijiang
Thanks Stephan for this proposal and I totally agree with it. 

It is very necessary to summarize the overall features/directions the community 
is working on or planning. Although I check the mailing list almost every day, 
it still seems difficult to keep track of everything. In addition, I think this 
overall roadmap picture can also help expose the relationships among different 
items, and even avoid similar/duplicated ideas or work.

Just one small suggestion: if we could add an existing link 
(JIRA/discussion/FLIP/Google doc) for each listed item, it would be easy to 
keep track of the items one is interested in and follow their progress.

Best,
Zhijiang
--
From:Jeff Zhang 
Send Time:2019-02-14 (Thursday) 18:03
To:Stephan Ewen 
Cc:dev ; user ; jincheng sun 
; Shuyi Chen ; Rong Rong 

Subject:Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

Hi Stephan,

Thanks for this proposal. It is a good idea to track the roadmap. One
suggestion is that it might be better to put it into wiki page first.
Because it is easier to update the roadmap on wiki compared to on flink web
site. And I guess we may need to update the roadmap very often at the
beginning as there's so many discussions and proposals in community
recently. We can move it into flink web site later when we feel it could be
nailed down.

On Thu, Feb 14, 2019 at 5:44 PM Stephan Ewen wrote:

> Thanks Jincheng and Rong Rong!
>
> I am not deciding a roadmap and making a call on what features should be
> developed or not. I was only collecting broader issues that are already
> happening or have an active FLIP/design discussion plus committer support.
>
> Do we have that for the suggested issues as well? If yes , we can add them
> (can you point me to the issue/mail-thread), if not, let's try and move the
> discussion forward and add them to the roadmap overview then.
>
> Best,
> Stephan
>
>
> On Wed, Feb 13, 2019 at 6:47 PM Rong Rong  wrote:
>
>> Thanks Stephan for the great proposal.
>>
>> This would not only be beneficial for new users but also for contributors
>> to keep track on all upcoming features.
>>
>> I think that better window operator support can also be grouped separately
>> into its own category, as it affects both the future DataStream API and batch
>> stream unification.
>> can we also include:
>> - OVER aggregate for DataStream API separately as @jincheng suggested.
>> - Improving sliding window operator [1]
>>
>> One more additional suggestion, can we also include a more extendable
>> security module [2,3] @shuyi and I are currently working on?
>> This will significantly improve the usability for Flink in corporate
>> environments where proprietary or 3rd-party security integration is needed.
>>
>> Thanks,
>> Rong
>>
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
>> [3]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html
>>
>>
>>
>>
>> On Wed, Feb 13, 2019 at 3:39 AM jincheng sun 
>> wrote:
>>
>>> Very excited and thank you for launching such a great discussion,
>>> Stephan !
>>>
>>> Here only a little suggestion that in the Batch Streaming Unification
>>> section, do we need to add an item:
>>>
>>> - Same window operators on bounded/unbounded Table API and DataStream
>>> API
>>> (currently OVER window only exists in SQL/TableAPI, DataStream API does
>>> not yet support)
>>>
>>> Best,
>>> Jincheng
>>>
>>> On Wed, Feb 13, 2019 at 7:21 PM Stephan Ewen wrote:
>>>
>>>> Hi all!
>>>>
>>>> Recently several contributors, committers, and users asked about making
>>>> it more visible in which way the project is currently going.
>>>>
>>>> Users and developers can track the direction by following the
>>>> discussion threads and JIRA, but due to the mass of discussions and open
>>>> issues, it is very hard to get a good overall picture.
>>>> Especially for new users and contributors, it is very hard to get a
>>>> quick overview of the project direction.
>>>>
>>>> To fix this, I suggest to add a brief roadmap summary to the homepage.
>>>> It is a bit of a commitment to keep that roadmap up to date, but I think
>>>> the benefit for users justifies that.
>>>> The Apa

Re: Confusion in Heartbeat configurations

2019-02-18 Thread zhijiang
Hi sohimankotia,

In order not to rely strongly on the akka implementation, Flink implements its 
own heartbeat mechanism for health monitoring of the TaskExecutor, JobMaster 
and ResourceManager components since FLIP-6. So you see two sets of heartbeat 
settings: one is for the akka internal implementation, prefixed with `akka`, 
and the other is for the Flink internal implementation.

Best,
Zhijiang
--
From:sohimankotia 
Send Time:2019-02-18 (Monday) 14:40
To:user 
Subject:Confusion in Heartbeat configurations

Hi, 

The page
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html
mentions two sets of heartbeat configs.

akka.watch.heartbeat.interval
akka.watch.heartbeat.pause

Vs

heartbeat.interval
heartbeat.timeout


Can u guys pls explain what exactly is difference between them and which
component of job execution graph they impact . 

Thanks







Re: Flink performance drops when async checkpoint is slow

2019-02-28 Thread zhijiang
Hi Paul,

I am not sure whether the task thread is involved in some work during state 
snapshotting for FsStateBackend. But I have another experience which might also 
explain your problem.
From your description below, the last task is blocked in 
`SingleInputGate.getNextBufferOrEvent`, which means the middle task does not 
produce any output, or the middle operator does not process records.
The backpressure is high between the source and the middle task, which results 
in blocking the source task in `requestBufferBuilder`.

Based on the above two points, I guess the middle task is waiting for the 
barrier from some source tasks. For the input channels which have already 
received the barrier, the middle task would not process the following data 
buffers and would just cache them, which results in backpressuring the 
corresponding source tasks via credit-based flow control. For the input 
channels without barriers, if there are also no data buffers, the middle task 
would not have any output. So I think one hint is to trace why some source 
tasks emit the barrier late.

In order to double check the above analysis, you can change the checkpoint mode 
from `exactly-once` to `at-least-once`. If the CPU usage and task TPS no longer 
drop for a period as before, I think we can confirm the above analysis. 
:)
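
The difference between the two modes during alignment can be illustrated with 
a toy simulation in plain Java. It is not Flink code: the event encoding 
(`{channel, 1}` for a barrier, `{channel, 0}` for a data buffer) and the 
arrival order are made up for the example.

```java
// Toy model of barrier alignment on a task with several input channels.
class AlignmentSketch {
    // Counts data buffers processed between the first and the last barrier.
    // In exactly-once mode, buffers from a channel that already delivered its
    // barrier are cached instead of processed.
    static int processedDuringAlignment(int[][] events, int channels, boolean exactlyOnce) {
        boolean[] barrierSeen = new boolean[channels];
        int barriers = 0;
        int processed = 0;
        for (int[] event : events) {
            int channel = event[0];
            boolean isBarrier = event[1] == 1;
            if (isBarrier) {
                if (!barrierSeen[channel]) {
                    barrierSeen[channel] = true;
                    barriers++;
                }
                if (barriers == channels) {
                    break; // alignment complete
                }
            } else if (barriers > 0) {
                boolean blocked = exactlyOnce && barrierSeen[channel];
                if (!blocked) {
                    processed++;
                }
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // Channel 0 delivers its barrier early and keeps sending data;
        // channel 1 is idle until its late barrier arrives.
        int[][] events = {{0, 1}, {0, 0}, {0, 0}, {0, 0}, {1, 1}};
        System.out.println("exactly-once:  " + processedDuringAlignment(events, 2, true));
        System.out.println("at-least-once: " + processedDuringAlignment(events, 2, false));
    }
}
```

In this scenario the exactly-once task processes nothing until the late 
barrier arrives, while the at-least-once task keeps processing. In a real job 
the mode is chosen with 
`env.enableCheckpointing(interval, CheckpointingMode.AT_LEAST_ONCE)`.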

Best,
Zhijiang
--
From:Paul Lam 
Send Time:2019-02-28 (Thursday) 15:17
To:user 
Subject:Flink performance drops when async checkpoint is slow

Hi,

I have a Flink job (version 1.5.3) that consumes from a Kafka topic, does some 
transformations and aggregations, and writes to two Kafka topics respectively. 
Meanwhile, there’s a custom source that periodically pulls configurations for 
the transformations. The generic job graph is as below.



The job uses FsStateBackend and checkpoints to HDFS, but the HDFS load is 
unstable, and sometimes the HDFS client reports slow read and slow 
waitForAckedSeqno during checkpoints. When that happens, the Flink job's 
consume rate drops significantly, some taskmanagers’ CPU usage drops from about 
140% to 1%, and all the task threads on those taskmanagers are blocked. This 
situation lasts from seconds to a minute. We started a parallel job with 
everything the same except checkpointing disabled, and it runs very steadily.
But I think that as the checkpointing is async, it should not affect the task 
threads.

There is some additional information that we observed:

-  When the performance drops, jstack shows that the Kafka source and the task 
right after it are blocked requesting a memory buffer (with back pressure close 
to 1), and the last task is blocked in `SingleInputGate.getNextBufferOrEvent`. 
- The dashboard shows that the buffered data during alignment is less than 
10 MB, even when back pressure is high.

We’ve been struggling with this problem for weeks, and any help is appreciated. 
Thanks a lot!

Best,
Paul Lam



屏幕快照 2019-02-25 11.24.54.png
Description: Binary data


Re: Checkpoints and catch-up burst (heavy back pressure)

2019-02-28 Thread zhijiang
Hi Arnaud,

I think there are two key points. First, under high backpressure the source may 
emit the checkpoint barrier late, because it has to acquire the synchronization 
lock. 
Second, the barrier has to be queued behind the in-flight data buffers, so the 
downstream task has to process all the buffers ahead of the barrier before it 
can trigger the checkpoint, and this takes some time under back pressure.

There are three ways to work around this:
1. Increase the checkpoint timeout so that checkpoints do not expire so quickly.
2. Decrease the network buffer settings to reduce the amount of in-flight 
buffers ahead of the barrier; you can check the configs 
"taskmanager.network.memory.buffers-per-channel" and 
"taskmanager.network.memory.floating-buffers-per-gate".
3. Adjust the parallelism, such as increasing it for the sink vertex so that it 
processes the source data faster, to avoid backpressure to some extent.

You could check which way suits your scenario and give it a try.
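
To get a feeling for the second option, here is a rough plain-Java estimate of 
how much data can sit in the receiver-side buffers of one input gate ahead of 
a barrier. The channel count is hypothetical, and the values 2, 8 and 32 KB 
are assumed defaults for the two settings and the buffer size; sender-side 
buffers are ignored, so treat the result as a lower bound on in-flight data.

```java
// Rough upper bound on receiver-side buffered bytes for one input gate.
class InFlightBufferSketch {
    static long maxInFlightBytes(int channels, int buffersPerChannel,
                                 int floatingBuffersPerGate, int bufferBytes) {
        return ((long) channels * buffersPerChannel + floatingBuffersPerGate) * bufferBytes;
    }

    public static void main(String[] args) {
        int channels = 100;          // hypothetical number of input channels
        int bufferBytes = 32 * 1024; // assumed network buffer size
        long before = maxInFlightBytes(channels, 2, 8, bufferBytes); // assumed defaults
        long after = maxInFlightBytes(channels, 1, 4, bufferBytes);  // reduced settings
        System.out.println("assumed defaults (bytes): " + before);
        System.out.println("reduced settings (bytes): " + after);
    }
}
```

Halving both settings halves this bound, which shortens the queue a barrier 
has to wait behind, at the cost of less buffering for throughput.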

Best,
Zhijiang
--
From:LINZ, Arnaud 
Send Time:2019-02-28 (Thursday) 17:28
To:user 
Subject:Checkpoints and catch-up burst (heavy back pressure)


Hello,
I have a simple streaming app that gets data from a source and stores it to 
HDFS using a sink similar to the bucketing file sink. Checkpointing mode is 
“exactly once”.
Everything is fine on a “normal” course as the sink is faster than the source; 
but when we stop the application for a while and then restart it, we have a 
catch-up burst to get all the messages emitted in the meanwhile.
During this burst, the source is faster than the sink, and all checkpoints fail 
(time out) until the source has totally caught up. This is annoying because the 
sink does not “commit” the data before a successful checkpoint is made, and so 
the app releases all the “catch up” data as one atomic block that can be huge 
if the streaming app was stopped for a while, adding unwanted stress to all the 
downstream Hive processing that uses the data provided in micro batches, and to 
the Hadoop cluster.
How should I handle the situation? Is there something special to do to get 
checkpoints even during heavy load?
The problem does not seem to be new, but I was unable to find any practical 
solution in the documentation.
Best regards,
Arnaud





Re: Flink performance drops when async checkpoint is slow

2019-02-28 Thread zhijiang
Hi Paul,

Thanks for your feedback. If the problem persists in at-least-once mode, we can 
confirm that it is not caused by the blocking behavior of exactly-once mode 
mentioned before.

In at-least-once mode, the task continues processing the buffers that follow a 
barrier during alignment. In exactly-once mode, the task blocks the channel 
after reading the barrier during alignment. It is difficult to confirm this 
from the absolute state size, which also depends on the network buffer 
settings and the parallelism.

I think your problem is that once checkpointing is active, backpressure 
sometimes also occurs, which results in low performance and low CPU usage. 
Maybe we can analyze which vertex and subtask cause the backpressure and then 
take a jstack of it to check which operations slow it down. The source vertex 
is blocked in `requestingBufferBuilder` under backpressure, but I am not sure 
whether that is caused by the middle vertex or by the last sink vertex. If the 
middle vertex is also blocked requesting buffers, as you mentioned in your 
first email, then the backpressure should be caused by the last sink vertex. 
But in that case the jstack of the last sink task should not be in 
`getNextBuffer`, unless you did not trace all the parallel sink subtasks.

My suggestion is first to check which vertex causes the backpressure (the 
middle or the last sink vertex?), and then to take a jstack of the relevant 
subtask of this vertex; you can select the subtask with the largest inqueue 
length. I think we might find something by seeing which operation delays the 
task and causes the backpressure, and this operation might involve HDFS. :)

Best,
Zhijiang


--
From:Paul Lam 
Send Time:2019-02-28 (Thursday) 19:17
To:zhijiang 
Cc:user 
Subject:Re: Flink performance drops when async checkpoint is slow

Hi Zhijiang,

Thanks a lot for your reasoning! 

I tried to set the checkpoint mode to at-least-once as you suggested, but 
unfortunately the problem remains the same :(

IMHO, if it’s caused by barrier alignment, the state size (mainly buffers 
during alignment) would be big, right? But actually it’s not, so we didn’t 
think that way before.

Best,
Paul Lam

On 2019-02-28, at 16:12, zhijiang wrote:
Hi Paul,

I am not sure whether the task thread is involved in some work while 
snapshotting state for FsStateBackend. But I have seen another situation that 
might also cause your problem.
From your description below, the last task is blocked in 
`SingleInputGate.getNextBufferOrEvent`, which means the middle task does not 
produce any output, or the middle operator does not process records.
The backpressure between the source and the middle task is high, which results 
in the source task being blocked in `requestBufferBuilder`.

Based on these two points, I guess the middle task is waiting for a barrier 
from some source tasks. For the input channels that have already received the 
barrier, the middle task does not process the following data buffers and just 
caches them, which backpressures the corresponding source tasks through 
credit-based flow control. For the input channels without barriers, if there 
are also no data buffers, then the middle task does not produce any output. So 
one hint is to trace why some source tasks emit the barrier late.

To double-check the above analysis, you can change the checkpoint mode 
from `exactly-once` to `at-least-once`. If the CPU usage and task TPS no longer 
drop for a period as before, I think we can confirm the above analysis. 
:)

Best,
Zhijiang
--
From:Paul Lam 
Send Time:2019-02-28 (Thursday) 15:17
To:user 
Subject:Flink performance drops when async checkpoint is slow

Hi,

I have a Flink job (version 1.5.3) that consumes from a Kafka topic, does some 
transformations and aggregations, and writes to two Kafka topics. 
Meanwhile, there’s a custom source that periodically pulls configurations for 
the transformations. The generic job graph is shown below.

<屏幕快照 2019-02-25 11.24.54.png>

The job uses FsStateBackend and checkpoints to HDFS, but HDFS’s load is 
unstable, and sometimes the HDFS client reports slow read and slow 
waitForAckedSeqno during checkpoints. When that happens, the Flink job's 
consume rate drops significantly, some taskmanagers’ CPU usage drops from about 
140% to 1%, and all the task threads on those taskmanagers are blocked. This 
situation lasts from seconds to a minute. We started a parallel job with 
everything the same except checkpointing disabled, and it runs very steadily.
But I think that, as the checkpointing is async, it should not affect the task 
threads.

Here is some additional information that we observed:

-  When the performance drops, jstack shows that the Kafka source and the task 
right after it are blocked requesting a memory buffer (with back pressure close 
to 1), and the last task is blocked at `SingleInputGate.getNextBufferOrEvent`. 
- The dashboard shows that the buffer during alignm

Re: Checkpoints and catch-up burst (heavy back pressure)

2019-02-28 Thread zhijiang
Hi Arnaud,

Thanks for the further feedback!

For option 1: if 40 min is still not enough, that indicates the checkpoint 
might need even more time to finish in your case. I have also seen catch-up 
scenarios that took several hours to finish one checkpoint. If the current 
checkpoint expires because of the timeout, the next triggered checkpoint might 
still fail with a timeout. So it seems better to wait until the current 
checkpoint finishes rather than expire it, unless we cannot tolerate such a 
long time for some reason, such as concern that a failover during this time 
would have to restore more data.

For option 2: the default network settings should be reasonable. Lower values 
might cause a performance regression, and higher values would increase the 
in-flight buffers and delay checkpoints even more.

For option 3: if resources are limited, it will not work on your side either.

Sleeping for some time in the source, as you mentioned, is an option and might 
work in your case, although it does not seem graceful.

I think there is no data skew causing the backpressure in your case, because 
you use rebalance mode as mentioned. Another option might be forward mode, 
which would be better than rebalance mode if it is possible in your case. 
Because the source and the downstream task are one-to-one in forward mode, the 
total in-flight buffers are 2+2+8 for one single downstream task before the 
barrier. In rebalance mode, the total in-flight buffers would be (a*2+a*2+8) 
for one single downstream task (`a` is the parallelism of the source vertex), 
because it is an all-to-all connection. Barrier alignment takes more time in 
rebalance mode than in forward mode.
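
As a rough illustration of the buffer arithmetic above, here is a small Java sketch. The class and method names are made up for this example and are not Flink APIs; the constants 2 and 8 are simply the per-channel and per-gate values quoted in this thread.

```java
// Hypothetical helper, not part of Flink: reproduces the estimate
// a*2 + a*2 + 8 from the message above.
final class InFlightBufferEstimate {
    // Values quoted in this thread; treat them as assumptions for your setup.
    static final int BUFFERS_PER_CHANNEL = 2;
    static final int BUFFERS_PER_GATE = 8;

    /** Rebalance (all-to-all): one input channel per source subtask. */
    static int rebalance(int sourceParallelism) {
        return sourceParallelism * BUFFERS_PER_CHANNEL
                + sourceParallelism * BUFFERS_PER_CHANNEL
                + BUFFERS_PER_GATE;
    }

    /** Forward (one-to-one): a single upstream channel per downstream task. */
    static int forward() {
        return rebalance(1);
    }
}
```

With these numbers, `InFlightBufferEstimate.forward()` gives 2+2+8 = 12 buffers, while `InFlightBufferEstimate.rebalance(15)` gives 68 for a source parallelism of 15.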

Best,
Zhijiang
--
From:LINZ, Arnaud 
Send Time:2019-03-01 (Friday) 00:46
To:zhijiang ; user 
Subject:RE: Checkpoints and catch-up burst (heavy back pressure)


Update:
Option 1 does not work. It still fails at the end of the timeout, no matter 
its value.
Should I implement a “bandwidth” management system by using an artificial 
Thread.sleep in the source depending on the back pressure?
From: LINZ, Arnaud 
Sent: Thursday, February 28, 2019 15:47
To: 'zhijiang' ; user 
Subject: RE: Checkpoints and catch-up burst (heavy back pressure)
Hi Zhijiang,
Thanks for your feedback.
I’ll try option 1; the timeout is 4 min for now, I’ll switch it to 40 min and 
will let you know. Setting it higher than 40 min does not make much sense since 
after 40 min the pending output is already quite large.
Option 3 won’t work; I already take too many resources, and as my source is 
more or less an HDFS directory listing, it will always be far faster than any 
mapper that reads the file and emits records based on its content, or any sink 
that stores the transformed data, unless I put “sleeps” in it (but is this 
really a good idea?)
Option 2: taskmanager.network.memory.buffers-per-channel and 
taskmanager.network.memory.floating-buffers-per-gate are currently unset in my 
configuration (so at their defaults of 2 and 8), but for this streaming app I 
have very few exchanges between nodes (just a rebalance after the source that 
emits file names, everything else is local to the node). Should I adjust their 
values nonetheless? To higher or lower values?
Best,
Arnaud
From: zhijiang  
Sent: Thursday, February 28, 2019 10:58
To: user ; LINZ, Arnaud 
Subject: Re: Checkpoints and catch-up burst (heavy back pressure)
Hi Arnaud,
I think there are two key points. First, the source might emit the checkpoint 
barrier late under high backpressure, because it has to wait for the 
synchronization lock. 
Second, the barrier is queued behind the in-flight data buffers, so the 
downstream task has to process all the buffers ahead of the barrier before it 
can trigger the checkpoint, and this takes some time under back pressure.
There are three ways to work around this:
1. Increase the checkpoint timeout so that checkpoints do not expire too soon.
2. Decrease the network buffer settings to reduce the number of in-flight 
buffers ahead of the barrier; see the config options 
"taskmanager.network.memory.buffers-per-channel" and 
"taskmanager.network.memory.floating-buffers-per-gate".
3. Adjust the parallelism, for example by increasing it for the sink vertex so 
that source data is processed faster, to reduce backpressure to some extent.
Check which of these suits your scenario and give it a try.
Best,
Zhijiang
--
From:LINZ, Arnaud 
Send Time:2019-02-28 (Thursday) 17:28
To:user 
Subject:Checkpoints and catch-up burst (heavy back pressure)
Hello,
I have a simple streaming app that gets data from a source and stores it to 
HDFS using a sink similar to the bucketing file sink. Checkpointing mode is 
“exactly once”.
Everything is fine in “normal” operation, as the sink is faster than the 
source; but when we stop the application for a while and then restart it, we 
have a catch-up burst to get all the messages emitted in the meantime.
During this bu

Re: Checkpoints and catch-up burst (heavy back pressure)

2019-03-03 Thread zhijiang
Hi Arnaud,

I think I understand your specific use case based on your further explanation. 
As you said, it is easy for the source to emit all the file names, which are 
then cached in network buffers, because each emitted file name is so small and 
the flatmap/sink processing is slow. Then, when a checkpoint is triggered, the 
barrier is behind the whole set of file names, which means the sink cannot 
receive the barrier until all the corresponding files have been read and 
written. 

So the proper solution in your case is to control the emit rate on the source 
side based on the sink's catch-up progress, in order to avoid many files being 
queued in front of barriers. This is the right approach to try, and I hope your 
solution with 2 parameters works.

Best,
Zhijiang


--
From:LINZ, Arnaud 
Send Time:2019-03-02 (Saturday) 16:45
To:zhijiang ; user 
Subject:RE: Checkpoints and catch-up burst (heavy back pressure)

 Hello,
 When I think about it, I realize that the barrier from the source sits behind 
the whole set of files, and therefore the checkpoint will never complete until 
the sink has caught up.
 The simplest way to deal with it without refactoring is to add 2 parameters to 
the source: a file-count threshold that detects catch-up mode, and a maximum 
files-per-second limit applied when this occurs, slightly lower than the 
natural catch-up rate.
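
A minimal sketch of the two-parameter idea above, in plain Java. The class, its fields and the numbers in the usage note are hypothetical and only illustrate the logic; how the source learns the number of pending files, and where it sleeps, is left to the actual source implementation.

```java
// Hypothetical throttle, not a Flink API: decides how long the source should
// pause before emitting the next file name.
final class CatchUpThrottle {
    private final int catchUpThreshold;     // pending files above which we assume catch-up mode
    private final double maxFilesPerSecond; // emit-rate cap applied only in catch-up mode

    CatchUpThrottle(int catchUpThreshold, double maxFilesPerSecond) {
        if (maxFilesPerSecond <= 0) {
            throw new IllegalArgumentException("maxFilesPerSecond must be positive");
        }
        this.catchUpThreshold = catchUpThreshold;
        this.maxFilesPerSecond = maxFilesPerSecond;
    }

    /** True when the backlog is larger than the configured threshold. */
    boolean isCatchingUp(int pendingFiles) {
        return pendingFiles > catchUpThreshold;
    }

    /** Milliseconds to wait before emitting the next file name (0 in normal mode). */
    long delayMillis(int pendingFiles) {
        if (!isCatchingUp(pendingFiles)) {
            return 0L;
        }
        return (long) Math.ceil(1000.0 / maxFilesPerSecond);
    }
}
```

For example, `new CatchUpThrottle(100, 4.0).delayMillis(500)` returns 250, i.e. at most four file names per second while more than 100 files are pending. The source loop would then sleep for that long between emissions.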

  Original message 
 From: "LINZ, Arnaud" 
 Date: Fri, Mar 01, 2019 2:04 PM +0100
 To: zhijiang , user 
 Subject: RE: Checkpoints and catch-up burst (heavy back pressure)

Hi,
I think I should go into more detail to explain my use case.
I have one non-parallel source (parallelism = 1) that lists binary files in an 
HDFS directory. The DataSet emitted by the source is a data set of file names, 
not file contents. These file names are rebalanced and sent to workers 
(parallelism = 15) that use a flatmapper that opens the file, reads it, decodes 
it, and sends records (forward mode) to the sinks (with a few 1-to-1 mappings 
in between). So the flatmap operation is a time-consuming one, as the files are 
more than 200 MB each; the flatmapper will emit millions of records to the 
sink given one source record (file name).
The rebalancing, occurring at the file name level, does not use much I/O, and I 
cannot use one-to-one mode at that point if I want some parallelism, since I 
have only one source.
I did not put file decoding directly in the sources because I have no good way 
to distribute files to sources without a controller (the input directory is 
unique, file names are random and cannot easily be “attributed” to one 
particular source instance). 
Alternatively, I could have used a dispatcher daemon, separate from the 
streaming app, that distributes files to various directories, each directory 
being associated with a Flink source instance, and put the file reading and 
decoding directly in the source, but that seemed more complex to code and 
operate than the file name source. Would it have been better from the 
checkpointing perspective?
About the ungraceful source sleep(): is there a way, programmatically, to know 
the “load” of the app, or to determine whether checkpointing takes too much 
time, so that I can sleep only when needed?
Thanks,
Arnaud
From: zhijiang  
Sent: Friday, March 1, 2019 04:59
To: user ; LINZ, Arnaud 
Subject: Re: Checkpoints and catch-up burst (heavy back pressure)
Hi Arnaud,
Thanks for the further feedback!
For option 1: if 40 min is still not enough, that indicates the checkpoint 
might need even more time to finish in your case. I have also seen catch-up 
scenarios that took several hours to finish one checkpoint. If the current 
checkpoint expires because of the timeout, the next triggered checkpoint might 
still fail with a timeout. So it seems better to wait until the current 
checkpoint finishes rather than expire it, unless we cannot tolerate such a 
long time for some reason, such as concern that a failover during this time 
would have to restore more data.
For option 2: the default network settings should be reasonable. Lower values 
might cause a performance regression, and higher values would increase the 
in-flight buffers and delay checkpoints even more.
For option 3: if resources are limited, it will not work on your side either.
Sleeping for some time in the source, as you mentioned, is an option and might 
work in your case, although it does not seem graceful.
I think there is no data skew causing the backpressure in your case, because 
you use rebalance mode as mentioned. Another option might be forward mode, 
which would be better than rebalance mode if it is possible in your case. 
Because the source and the downstream task are one-to-one in forward mode, the 
total in-flight buffers are 2+2+8 for one single downstream task before the 
barrier. In rebalance mode, the total in-flight buffers would be (a*2+a*2+8) 
for one single downstream task (`a` is the parallelism of the source vertex), 
because it is an 
all-to-all conn

Re: Flink credit based flow control

2019-03-11 Thread zhijiang
Hi Brian,

Actually, I also thought of adding the metrics you mentioned after contributing 
the credit-based flow control. They would help with performance tuning 
sometimes. If you want to add this metric, you could trace the related code 
path in `ResultSubpartition`. When the backlog increases as a 
`BufferConsumer` is added to the queue, you can check the current credits for 
this subpartition. Another possible way: when the subpartition view changes 
from available to unavailable because of no credits, we could add some metrics 
there. But even if we find that the sender is blocked because of no credits, we 
still need to distinguish two conditions. If the sender is backpressured, then 
having no credits is expected. 

Maybe you do not need an extra metric to trace your problem. You can check 
whether the high latency is caused by backpressure, and what flush timeout you 
have configured. You can also look at the current metrics 
outqueue.usages|length and inqueue.usages|length to find something. 

Best,
Zhijiang
--
From:Brian Ramprasad 
Send Time:2019-03-12 (Tuesday) 03:47
To:user 
Subject:Flink credit based flow control

Hi,
I am trying to use the most recent version of Flink over a high-latency 
network, and I am trying to measure how long a sender may wait for credits 
before it can send buffers to the receiver. Does anyone know in which 
function/class I can measure, on the sender side, the time spent waiting for 
incoming credit announcements?
Thanks
Brian R



Re: [ANNOUNCE] Apache Flink 1.8.0 released

2019-04-10 Thread zhijiang
Cool!

Finally we see the FLINK 1.8.0 release. Thanks Aljoscha for this excellent work 
and thanks to the other contributors for their efforts.

We will continue working hard on FLINK 1.9.0.

Best,
Zhijiang
--
From:vino yang 
Send Time:2019-04-10 (Wednesday) 17:33
To:d...@flink.apache.org 
Cc:Aljoscha Krettek ; user ; 
announce 
Subject:Re: [ANNOUNCE] Apache Flink 1.8.0 released

Great news!

Thanks Aljoscha for being the release manager and thanks to all the 
contributors!

Best,
Vino
On Wed, Apr 10, 2019 at 4:54 PM, Driesprong, Fokko wrote:
Great news! Great effort by the community to make this happen. Thanks all!

 Cheers, Fokko

 On Wed, Apr 10, 2019 at 10:50, Shaoxuan Wang wrote:

 > Thanks Aljoscha and all others who made contributions to FLINK 1.8.0.
 > Looking forward to FLINK 1.9.0.
 >
 > Regards,
 > Shaoxuan
 >
 > On Wed, Apr 10, 2019 at 4:31 PM Aljoscha Krettek 
 > wrote:
 >
 > > The Apache Flink community is very happy to announce the release of
 > > Apache Flink 1.8.0, which is the next major release.
 > >
 > > Apache Flink(r) is an open-source stream processing framework for
 > > distributed, high-performing, always-available, and accurate data
 > > streaming applications.
 > >
 > > The release is available for download at:
 > > https://flink.apache.org/downloads.html
 > >
 > > Please check out the release blog post for an overview of the
 > > improvements for this release:
 > > https://flink.apache.org/news/2019/04/09/release-1.8.0.html
 > >
 > > The full release notes are available in Jira:
 > >
 > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344274
 > >
 > > We would like to thank all contributors of the Apache Flink community who
 > > made this release possible!
 > >
 > > Regards,
 > > Aljoscha
 >



Re: Question regarding "Insufficient number of network buffers"

2019-04-11 Thread zhijiang
Hi Allen,

There are two ways to set the network buffers. The old way, via 
`taskmanager.network.numberOfBuffers`, is deprecated. The new way is via three 
parameters: min, max and fraction. 
The specific formula is Math.min(network.memory.max, 
Math.max(network.memory.min, network.memory.fraction * jvmMemory)). 
If both are set, only the new way takes effect. You can adjust these three 
parameters accordingly. 
You can also check the task manager log by searching for " MB for network 
buffer pool (number of memory segments: " to confirm whether your settings 
work as expected.
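
The formula above can be sketched in plain Java as follows. The class and method names are invented for this example, and the input values in the note below are chosen for illustration only; check them against your own configuration.

```java
// Hypothetical helper, not a Flink API: applies
// min(max, max(min, fraction * jvmMemory)) and converts the result to a
// number of buffers of a given segment size.
final class NetworkMemorySketch {
    /** Network memory in bytes, clamped between minBytes and maxBytes. */
    static long networkMemoryBytes(long minBytes, long maxBytes,
                                   double fraction, long jvmMemoryBytes) {
        long byFraction = (long) (fraction * jvmMemoryBytes);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }

    /** Number of whole buffers (memory segments) that fit into the network memory. */
    static long numberOfBuffers(long networkMemoryBytes, int segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }
}
```

For instance, with min = 64 MiB, max = 1 GiB, fraction = 0.1 and 100 GiB of JVM memory, the fraction term exceeds the max, so the result is capped at 1 GiB. With 32768-byte segments that is 32768 buffers, the same total that appears in the error message quoted below.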

Best,
Zhijiang
--
From:Xiangfeng Zhu 
Send Time:2019-04-12 (Friday) 08:03
To:user 
Subject:Question regarding "Insufficient number of network buffers"

Hello,

My name is Allen, and I'm currently researching different distributed execution 
engines. I wanted to run some benchmarks on Flink with a 10-node cluster (each 
node has 64 vCPUs and 376 GB of memory). I ran the program with parallelism 320 
and got an error message: 
"Caused by: java.io.IOException: Insufficient number of network buffers: 
required 320, but only 128 available. The total number of network buffers is 
currently set to 32768 of 32768 bytes each. You can increase this number by 
setting the configuration keys 'taskmanager.network.memory.fraction', 
'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'." 

Currently, I set the following parameters:
jobmanager.heap.size: 102400m
taskmanager.memory.size: 102400m
taskmanager.numberOfTaskSlots: 32
taskmanager.network.memory.min: 102400m
taskmanager.network.memory.max: 102400m
taskmanager.network.memory.fraction: 0.5
(For the last three fields, I've also tried to set 
taskmanager.network.numberOfBuffers: 40960 directly)
Could you please give me some advice on how I should fix it?
Thank you so much! 

Best,
Allen



Re: Netty channel closed at AKKA gated status

2019-04-14 Thread zhijiang
Hi Wenrui,

I think the akka gated issue and the inactive netty channel are both caused by 
a task manager having exited or been killed. You should double-check the status 
of task manager `'athena592-phx2/10.80.118.166:44177'` and, if it is gone, the 
reason.

Best,
Zhijiang
--
From:Wenrui Meng 
Send Time:2019-04-13 (Saturday) 01:01
To:user 
Cc:tzulitai 
Subject:Netty channel closed at AKKA gated status

We encountered the netty channel inactive issue while AKKA had gated that task 
manager. I'm wondering whether the channel was closed because of the AKKA gated 
status, since all messages to the taskManager are dropped at that moment, 
which might cause the netty channel exception. If so, should we have 
coordination between AKKA and Netty? The gated status is not intended to fail 
the system. 
Here is the stack trace for the exception:

2019-04-12 12:46:38.413 [flink-akka.actor.default-dispatcher-90] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 3758 (3788228399 bytes in 5967 ms).
2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN  akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-25 - Association with remote system [akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN  akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-25 - Association with remote system [akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
2019-04-12 12:49:14.230 [flink-akka.actor.default-dispatcher-65] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - id (14/96) (93fcbfc535a190e1edcfd913d5f304fe) switched from RUNNING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'athena592-phx2/10.80.118.166:44177'. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:117)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:748)



Re: Retain metrics counters across task restarts

2019-04-14 Thread zhijiang
Hi Peter,

The lifecycle of these metrics is coupled with the lifecycle of the task, so 
the metrics are re-initialized after the task is restarted. One possible 
option is to store the metrics you need in state, so that they are restored 
from the state backend after the task is restarted.
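
To illustrate the idea only, here is a plain-Java simulation of a counter whose value survives a restart by being saved and restored. The class is hypothetical and uses no Flink classes; in a real job the saved value would live in checkpointed state rather than in a local variable.

```java
// Hypothetical simulation, not a Flink API: a counter that can be rebuilt
// from the value captured at the last checkpoint.
final class RestorableCounter {
    private long count;

    private RestorableCounter(long initial) {
        this.count = initial;
    }

    /** Counter for a task that starts without any saved state. */
    static RestorableCounter fresh() {
        return new RestorableCounter(0L);
    }

    /** Counter for a restarted task, seeded with the last saved value. */
    static RestorableCounter restoredFrom(long snapshot) {
        return new RestorableCounter(snapshot);
    }

    void increment() {
        count++;
    }

    long value() {
        return count;
    }

    /** Value that would be written to state when a checkpoint is taken. */
    long snapshot() {
        return count;
    }
}
```

Note that a counter restored this way resumes from the last checkpoint, so increments made after that checkpoint are lost and counted again only when the corresponding records are replayed.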

Best,
Zhijiang
--
From:Peter Zende 
Send Time:2019-04-14 (Sunday) 00:25
To:user 
Subject:Retain metrics counters across task restarts

Hi all

We're exposing metrics from our Flink (v1.7.1) pipeline to 
Prometheus, e.g. the total number of processed records. This works fine until 
any of the tasks is restarted within this YARN application. Then the counter is 
reset and starts incrementing from 0.
How can we retain such a counter through the entire lifetime of the YARN 
application, similarly to Hadoop counters?

Thanks
Peter



Re: Can back pressure data be gathered by Flink metric system?

2019-04-14 Thread zhijiang
Hi Henry,

Backpressure tracking is not implemented in the metric framework; you can check 
the details in [1]. I am not sure why you need backpressure to be shown in 
metrics.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/back_pressure.html

Best,
Zhijiang

 --
From:徐涛 
Send Time:2019-04-15 (Monday) 10:19
To:user 
Subject:Can back pressure data be gathered by Flink metric system?

Hi Experts,
 On the Flink metric system page 
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#system-metrics), 
 I cannot find any info about back pressure. I want to get the backpressure 
data and plot it, but I do not know how to get it via metrics. 
 Can anybody help me with this? Thanks a lot.

Best
Henry



Re: Can back pressure data be gathered by Flink metric system?

2019-04-15 Thread zhijiang
Hi Henry,

Thanks for the explanation. I am not sure whether it is feasible on your side 
to monitor backpressure via the REST API provided by Flink.

Here is some experience from my side. We once monitored backpressure via the 
metrics for outqueue length/usage on the producer side and inqueue length/usage 
on the consumer side. Although it is not always accurate, it can provide 
some hints of backpressure, because the outqueue and inqueue between producer 
and consumer should be filled with buffers when backpressure occurs.
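
A rough sketch of that heuristic in plain Java. The class name and the threshold are assumptions made for this example; the usage values are expected to be fractions between 0 and 1 read from whatever queue-usage metrics your monitoring system collects.

```java
// Hypothetical heuristic, not a Flink API: treats an edge as probably
// backpressured when both the producer's output queue and the consumer's
// input queue are nearly full.
final class BackpressureHint {
    static boolean likelyBackpressured(double producerOutQueueUsage,
                                       double consumerInQueueUsage,
                                       double threshold) {
        return producerOutQueueUsage >= threshold
                && consumerInQueueUsage >= threshold;
    }
}
```

For example, `BackpressureHint.likelyBackpressured(0.95, 0.9, 0.8)` is true, while a full output queue with an almost empty input queue (`0.95, 0.1`) is not flagged. As noted above, this is only a hint and can be inaccurate.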

Best,
Zhijiang
--
From:徐涛 
Send Time:2019-04-16 (Tuesday) 09:33
To:zhijiang 
Cc:user 
Subject:Re: Can back pressure data be gathered by Flink metric system?

Hi Zhijiang,
 Because I want to know the current backpressure status and its trend in the 
Flink job. As with other indicators such as latency, I want to monitor it and 
show it in a graph by getting the data from metrics. Using metrics to get the 
backpressure data is the simplest way I can think of.

Best
Henry

On 2019-04-15, at 10:34 AM, zhijiang wrote:
Hi Henry,

Backpressure tracking is not implemented in the metric framework; you can check 
the details in [1]. I am not sure why you need backpressure to be shown in 
metrics.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/back_pressure.html

Best,
Zhijiang

 --
From:徐涛 
Send Time:2019-04-15 (Monday) 10:19
To:user 
Subject:Can back pressure data be gathered by Flink metric system?

Hi Experts,
 On the Flink metric system page 
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#system-metrics), 
 I cannot find any info about back pressure. I want to get the backpressure 
data and plot it, but I do not know how to get it via metrics. 
 Can anybody help me with this? Thanks a lot.

Best
Henry





Re: Netty channel closed at AKKA gated status

2019-04-15 Thread zhijiang
Hi Wenrui,

If you have confirmed that the target task executor is still alive, you might 
further check whether there is a network connection issue between the job 
master and the target task executor.

Best,
Zhijiang
--
From:Biao Liu 
Send Time:2019-04-16 (Tuesday) 10:14
To:Wenrui Meng 
Cc:zhijiang ; user ; 
tzulitai 
Subject:Re: Netty channel closed at AKKA gated status

Hi Wenrui,
If a task manager is killed (kill -9), it has no chance to log anything. 
If the task manager exits because of a connection timeout, there would be 
something in the log file. So it was probably killed by another user or by the 
operating system. Please check the operating system log. BTW, I don't think 
"DEBUG log level" would help.
On Tue, Apr 16, 2019 at 9:16 AM, Wenrui Meng wrote:
There is no exception or warning in the log of task manager 
`'athena592-phx2/10.80.118.166:44177'`. In addition, according to the cluster 
monitor dashboard, the host was not shut down either. It probably requires 
turning on DEBUG logging to get more useful information. If the task manager 
had been killed, I assume there would be a termination message in the task 
manager log. If not, I don't know how to figure out whether it was due to the 
task manager being killed or just a connection timeout.



On Sun, Apr 14, 2019 at 7:22 PM zhijiang  wrote:
Hi Wenrui,

I think the akka gated issue and the inactive netty channel are both caused by 
a task manager having exited or been killed. You should double-check the status 
of task manager `'athena592-phx2/10.80.118.166:44177'` and, if it is gone, the 
reason.

Best,
Zhijiang
--
From:Wenrui Meng 
Send Time:2019-04-13 (Saturday) 01:01
To:user 
Cc:tzulitai 
Subject:Netty channel closed at AKKA gated status

We encountered the netty channel inactive issue while AKKA had gated that task 
manager. I'm wondering whether the channel was closed because of the AKKA gated 
status, since all messages to the taskManager are dropped at that moment, 
which might cause the netty channel exception. If so, should we have 
coordination between AKKA and Netty? The gated status is not intended to fail 
the system. 
Here is the stack trace for the exception:

2019-04-12 12:46:38.413 [flink-akka.actor.default-dispatcher-90] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 3758 (3788228399 bytes in 5967 ms).
2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN  akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-25 - Association with remote system [akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN  akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-25 - Association with remote system [akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
2019-04-12 12:49:14.230 [flink-akka.actor.default-dispatcher-65] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - id (14/96) (93fcbfc535a190e1edcfd913d5f304fe) switched from RUNNING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'athena592-phx2/10.80.118.166:44177'. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:117)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829)
    at org.apache.flink.shaded.

Re: Netty channel closed at AKKA gated status

2019-04-21 Thread zhijiang
Hi Wenrui,

I think you could trace the log of the node manager, which contains the 
lifecycle of this task executor. Maybe this task executor was killed by the 
node manager because of memory overuse.

Best,
Zhijiang
--
From:Wenrui Meng 
Send Time:2019-04-20 (Saturday) 09:48
To:zhijiang 
Cc:Biao Liu ; user ; tzulitai 

Subject:Re: Netty channel closed at AKKA gated status

Attached the lost task manager last 1 lines log. Anyone can help take a 
look? 

Thanks,
Wenrui
On Fri, Apr 19, 2019 at 6:32 PM Wenrui Meng  wrote:
I looked at a few similar instances. The lost task manager was indeed no longer 
active, since no log was printed for that task manager after the connection 
issue timestamp. I guess that task manager somehow died silently, without 
logging any exception or termination-related information. I double-checked 
the lost task manager's host: GC, CPU, memory, network, and disk I/O all look 
good, without any spike. Is there any other way the task manager could have 
been terminated? We run our jobs in a YARN cluster. 
On Mon, Apr 15, 2019 at 10:47 PM zhijiang  wrote:
Hi Wenrui,

You might further check whether there is a network connection issue between 
the job master and the target task executor, if you have confirmed that the 
target task executor is still alive.

Best,
Zhijiang
--
From:Biao Liu 
Send Time:2019-04-16 (Tuesday) 10:14
To:Wenrui Meng 
Cc:zhijiang ; user ; 
tzulitai 
Subject:Re: Netty channel closed at AKKA gated status

Hi Wenrui,
If a task manager is killed (kill -9), it has no chance to log anything. 
If the task manager exits because of a connection timeout, there would be 
something in the log file. So it was probably killed by another user or by the 
operating system. Please check the operating system log. BTW, I don't think 
the "DEBUG log level" would help.
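
To illustrate why a killed process leaves no trace in its own log, here is a 
minimal Java sketch. It is not Flink code: the class `ExitLoggerSketch` and 
the method `installExitLogger` are hypothetical names made up for this example. 
A JVM shutdown hook runs on a normal exit or on SIGTERM, but the JVM gets no 
chance to run it when the process receives SIGKILL (kill -9).

```java
import java.util.function.Consumer;

// Hypothetical illustration, not part of Flink.
final class ExitLoggerSketch {

    /** Registers a hook that writes one line when the JVM shuts down in an orderly way. */
    static Thread installExitLogger(Consumer<String> log) {
        Thread hook = new Thread(
                () -> log.accept("JVM shutting down at " + System.currentTimeMillis()),
                "exit-logger");
        // Runs on System.exit, normal termination, SIGTERM or SIGINT.
        // It does NOT run when the process is killed with SIGKILL (kill -9).
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) throws InterruptedException {
        installExitLogger(System.err::println);
        // Try "kill <pid>" and then "kill -9 <pid>" from another shell while this sleeps.
        Thread.sleep(60_000);
    }
}
```

With a plain `kill <pid>` the line is printed; with `kill -9 <pid>` nothing is 
printed, which matches a task manager log that simply stops.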
On Tue, Apr 16, 2019 at 9:16 AM Wenrui Meng  wrote:
There is no exception or warning in the log of task manager 
`'athena592-phx2/10.80.118.166:44177'`. In addition, according to the cluster 
monitor dashboard, the host was not shut down either. It probably requires 
turning on DEBUG logging to get more useful information. If the task manager 
had been killed, I assume there would be a termination entry in the task 
manager log. If not, I don't know how to figure out whether it was due to the 
task manager being killed or just a connection timeout.



On Sun, Apr 14, 2019 at 7:22 PM zhijiang  wrote:
Hi Wenrui,

I think the akka gated issue and the inactive netty channel are both caused by 
a task manager having exited or been killed. You should double-check the status 
of this task manager `'athena592-phx2/10.80.118.166:44177'` and the reason it 
went down.
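
As a small worked check of the timeline, the sketch below computes the gap 
between the AKKA "gated" warning and the netty failure, using the two 
timestamps from the log quoted in this thread. The class and method names 
(`LogGapSketch`, `gapMillis`) are hypothetical; only the timestamps come from 
the log.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for comparing two log timestamps.
final class LogGapSketch {

    // Timestamp layout as it appears in the quoted log lines.
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Milliseconds elapsed from the earlier to the later log timestamp. */
    static long gapMillis(String earlier, String later) {
        return Duration.between(
                LocalDateTime.parse(earlier, LOG_TS),
                LocalDateTime.parse(later, LOG_TS)).toMillis();
    }

    public static void main(String[] args) {
        // "address is now gated" warning vs. "switched from RUNNING to FAILED"
        long gap = gapMillis("2019-04-12 12:49:14.175", "2019-04-12 12:49:14.230");
        System.out.println(gap + " ms"); // prints "55 ms"
    }
}
```

Both symptoms appear within 55 ms of each other, which is consistent with a 
single underlying event such as the task manager process going away.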

Best,
Zhijiang
--
From:Wenrui Meng 
Send Time:2019-04-13 (Saturday) 01:01
To:user 
Cc:tzulitai 
Subject:Netty channel closed at AKKA gated status

We encountered the netty channel inactive issue while AKKA had gated that task 
manager. I'm wondering whether the channel was closed because of the AKKA gated 
status, since all messages to the taskManager are dropped at that moment, 
which might cause the netty channel exception. If so, should we have coordination 
between AKKA and Netty? The gated status is not intended to fail the system. 
Here is the stack trace for the exception:

2019-04-12 12:46:38.413 [flink-akka.actor.default-dispatcher-90] INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed 
checkpoint 3758 (3788228399 bytes in 5967 ms).
2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN  
akka.remote.ReliableDeliverySupervisor 
flink-akka.remote.default-remote-dispatcher-25 - Association with remote system 
[akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for 
[5000] ms. Reason: [Disassociated] 
2019-04-12 12:49:14.175 [flink-akka.actor.default-dispatcher-65] WARN  
akka.remote.ReliableDeliverySupervisor 
flink-akka.remote.default-remote-dispatcher-25 - Association with remote system 
[akka.tcp://flink@athena592-phx2:44487] has failed, address is now gated for 
[5000] ms. Reason: [Disassociated] 
2019-04-12 12:49:14.230 [flink-akka.actor.default-dispatcher-65] INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph  - id (14/96) 
(93fcbfc535a190e1edcfd913d5f304fe) switched from RUNNING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'athena592-phx2/10.80.118.166:44177'. This might indicate that the remote task 
manager was lost.
at 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:117)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
at 
org.apache.flink.shaded.netty4.io.net

Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-05-17 Thread zhijiang
I have analyzed this deadlock case based on the code. FLINK-10491 already 
fixed one place that could cause a deadlock in SpillableSubpartition, but this 
issue is caused in a different place.

When the source task is trying to release subpartition memory and, at the same 
time, another CoGroup task is submitted and triggers the source task to 
release its memory, a deadlock might occur.
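
The pattern in the quoted thread dump is a lock-order inversion: "CoGroup 
(1/2)" holds the subpartition lock taken in SpillableSubpartition.releaseMemory 
and waits for the LocalBufferPool lock, while "DataSource (1/1)" holds the 
LocalBufferPool lock and waits for the subpartition lock. Below is a minimal 
Java sketch of that pattern. It is not Flink code; the two plain `Object` 
locks only stand in for the two ArrayDeque monitors in the dump, and all names 
(`LockOrderSketch`, `lockInOrder`, `provokeAndDetectDeadlock`) are 
hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration of the lock-order inversion, not Flink code.
final class LockOrderSketch {

    /** Takes `first`, waits until the peer holds its own first lock, then asks for `second`. */
    private static Thread lockInOrder(String name, Object first, Object second,
                                      CountDownLatch bothHoldFirst) {
        Thread t = new Thread(() -> {
            synchronized (first) {
                bothHoldFirst.countDown();
                try {
                    bothHoldFirst.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                synchronized (second) {
                    // Never reached: the peer already owns `second`.
                }
            }
        }, name);
        t.setDaemon(true); // the JVM can still exit although both threads stay blocked
        return t;
    }

    /** Returns true if the JVM reports a monitor deadlock within timeoutMillis. */
    static boolean provokeAndDetectDeadlock(long timeoutMillis) {
        Object bufferPoolLock = new Object();   // stands in for the LocalBufferPool monitor
        Object subpartitionLock = new Object(); // stands in for the SpillableSubpartition monitor
        CountDownLatch bothHoldFirst = new CountDownLatch(2);

        // Opposite acquisition order in the two threads is what makes the cycle possible.
        lockInOrder("source-like", bufferPoolLock, subpartitionLock, bothHoldFirst).start();
        lockInOrder("cogroup-like", subpartitionLock, bufferPoolLock, bothHoldFirst).start();

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            long[] ids = threads.findMonitorDeadlockedThreads(); // null when there is no deadlock
            if (ids != null && ids.length >= 2) {
                return true;
            }
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("deadlock detected: " + provokeAndDetectDeadlock(5_000));
    }
}
```

If both threads acquired the two locks in the same order, the cycle could not 
form; this is only a generic illustration and says nothing about how the 
actual fix is implemented.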

I will create a jira ticket for this issue and think about how to solve it 
soon. Currently, if you still want to use the blocking type, the simple way to 
avoid this is to configure only one slot per TM, so that one task never 
triggers another task to release memory in the same TM. Alternatively, you 
could increase the network buffer setting as a workaround, but I am not sure 
this would work for your case because it depends on the total data size the 
source produces.

Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019-05-17 (Friday) 17:37
To:Zhijiang(wangzhijiang999) ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks


We see this JIRA issue (FLINK-10491) as fixed and the fix is present in Flink 
v1.6.4 which we are using now but the problem now seems to come up for 
relatively simpler scenarios as well. Deadlock dump below -

Java stack information for the threads listed above:
===
"CoGroup (2/2)":
at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
- waiting to lock <0x00062bf859b8> (a java.lang.Object)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
- waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
- locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
- locked <0x00063c785350> (a java.lang.Object)
at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
- locked <0x00062bf859b8> (a java.lang.Object)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
at java.lang.Thread.run(Thread.java:745)
"DataSource  (1/1)":
at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
- waiting to lock <0x00063fdf4ac8> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
- locked <0x00063fdf4888> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.b

Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-05-17 Thread zhijiang
I have already created the jira [1] for it, and you can monitor it for progress.

In addition, the SpillableSubpartition will be abandoned starting from Flink 
1.9, and Stephan has already implemented a new BoundedBlockingSubpartition to 
replace it. Of course, we will still provide support for existing bugs in 
previous Flink versions.

[1] https://issues.apache.org/jira/browse/FLINK-12544

Best,
Zhijiang


--
From:Narayanaswamy, Krishna 
Send Time:2019-05-17 (Friday) 19:00
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks


Thanks Zhijiang. 

We will try these deadlock use cases with the single-slot approach to see how 
they go, and will await the fix before using more slots on a single TM.

Thanks,
Krishna.

From: zhijiang  
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
I have analyzed this deadlock case based on the code. FLINK-10491 already 
fixed one place that could cause a deadlock in SpillableSubpartition, but this 
issue is caused in a different place.
When the source task is trying to release subpartition memory and, at the same 
time, another CoGroup task is submitted and triggers the source task to 
release its memory, a deadlock might occur.
I will create a jira ticket for this issue and think about how to solve it 
soon. Currently, if you still want to use the blocking type, the simple way to 
avoid this is to configure only one slot per TM, so that one task never 
triggers another task to release memory in the same TM. Alternatively, you 
could increase the network buffer setting as a workaround, but I am not sure 
this would work for your case because it depends on the total data size the 
source produces.
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019-05-17 (Friday) 17:37
To:Zhijiang(wangzhijiang999) ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
We see this JIRA issue (FLINK-10491) as fixed and the fix is present in Flink 
v1.6.4 which we are using now but the problem now seems to come up for 
relatively simpler scenarios as well. Deadlock dump below -

Java stack information for the threads listed above:
===
"CoGroup (2/2)":
at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
- waiting to lock <0x00062bf859b8> (a java.lang.Object)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
- waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
at 
org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
- locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297

Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-05-21 Thread zhijiang
Hi  Krishna,

Could you show me or attach the jstack for the single slot case? Or is it the 
same jstack as before?

Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019-05-21 (Tuesday) 19:50
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
 
We started to run jobs using single-slot task managers, which seemed to be OK 
for the past couple of days, but this morning we seem to be seeing these 
deadlocks even with 1 slot. Is there something else we could try out?

Thanks,
Krishna.

From: Narayanaswamy, Krishna [Tech] 
Sent: Friday, May 17, 2019 4:20 PM
To: 'zhijiang' ; Aljoscha Krettek 
; Piotr Nowojski 
Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Thanks Zhijiang. 

We will try these deadlock use cases with the single-slot approach to see how 
they go, and will await the fix before using more slots on a single TM.

Thanks,
Krishna.

From: zhijiang  
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
I have analyzed this deadlock case based on the code. FLINK-10491 already 
fixed one place that could cause a deadlock in SpillableSubpartition, but this 
issue is caused in a different place.
When the source task is trying to release subpartition memory and, at the same 
time, another CoGroup task is submitted and triggers the source task to 
release its memory, a deadlock might occur.
I will create a jira ticket for this issue and think about how to solve it 
soon. Currently, if you still want to use the blocking type, the simple way to 
avoid this is to configure only one slot per TM, so that one task never 
triggers another task to release memory in the same TM. Alternatively, you 
could increase the network buffer setting as a workaround, but I am not sure 
this would work for your case because it depends on the total data size the 
source produces.
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019-05-17 (Friday) 17:37
To:Zhijiang(wangzhijiang999) ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
We see this JIRA issue (FLINK-10491) as fixed and the fix is present in Flink 
v1.6.4 which we are using now but the problem now seems to come up for 
relatively simpler scenarios as well. Deadlock dump below -

Java stack information for the threads listed above:
===
"CoGroup (2/2)":
at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
- waiting to lock <0x00062bf859b8> (a java.lang.Object)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
- waiting to lock <0x00063fdf4888> (a java.util.ArrayDeque)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
at 
org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
at 
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
- locked <0x00063fdf4ac8> (a java.util.ArrayDeque)
at 
org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
at 
org.apache.flink.

Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-05-21 Thread zhijiang
If it is still the case of multiple slots in one TaskManager, it is the same as 
before. But you said you are already using a single slot per TaskManager, right?

If it is the single-slot case, please attach the jstack the next time it 
occurs; otherwise it is not needed.

Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019-05-22 (Wednesday) 00:49
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks


Hi Zhijiang,

I couldn’t get the jstack due to some constraints this time around. Will try 
and get them when it occurs next. But from the looks of it from the 
console/logs it appears to be the same as the 2 slot cases. DataSource 
finishing up and CoGroup looking to move from DEPLOYING to RUNNING (and stuck 
at DEPLOYING)

Thanks,
Krishna.

From: zhijiang  
Sent: Tuesday, May 21, 2019 7:38 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hi  Krishna,
Could you show me or attach the jstack for the single slot case? Or is it the 
same jstack as before?
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019-05-21 (Tuesday) 19:50
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks

We started to run jobs using single-slot task managers, which seemed to be OK 
for the past couple of days, but this morning we seem to be seeing these 
deadlocks even with 1 slot. Is there something else we could try out?

Thanks,
Krishna.

From: Narayanaswamy, Krishna [Tech] 
Sent: Friday, May 17, 2019 4:20 PM
To: 'zhijiang' ; Aljoscha Krettek 
; Piotr Nowojski 
Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Thanks Zhijiang. 

We will try these deadlock use cases with the single-slot approach to see how 
they go, and will await the fix before using more slots on a single TM.

Thanks,
Krishna.

From: zhijiang  
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
I have analyzed this deadlock case based on the code. FLINK-10491 already 
fixed one place that could cause a deadlock in SpillableSubpartition, but this 
issue is caused in a different place.
When the source task is trying to release subpartition memory and, at the same 
time, another CoGroup task is submitted and triggers the source task to 
release its memory, a deadlock might occur.
I will create a jira ticket for this issue and think about how to solve it 
soon. Currently, if you still want to use the blocking type, the simple way to 
avoid this is to configure only one slot per TM, so that one task never 
triggers another task to release memory in the same TM. Alternatively, you 
could increase the network buffer setting as a workaround, but I am not sure 
this would work for your case because it depends on the total data size the 
source produces.
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019-05-17 (Friday) 17:37
To:Zhijiang(wangzhijiang999) ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
We see this JIRA issue (FLINK-10491) as fixed and the fix is present in Flink 
v1.6.4 which we are using now but the problem now seems to come up for 
relatively simpler scenarios as well. Deadlock dump below -

Java stack information for the threads listed above:
===
"CoGroup (2/2)":
at 
org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
- waiting to lock <0x00062bf859b8> (a java.lang.Object)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
at 
org.

Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-06-04 Thread zhijiang
Yes, it is the same case as multiple slots in a TM. The source task and the 
co-group task are still in the same TM in this case. I think you might have 
slot sharing enabled, so they are still running in the same slot in one TM.
BTW, the previous deadlock issue is already fixed on my side, and the fix is 
waiting for review atm. You could pick the code in the PR to verify the results 
if you like. The next release, 1.8.1, might include this fix as well.

Best,
Zhijiang
--
From:Erai, Rahul 
Send Time:2019-06-04 (Tuesday) 15:50
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski ; 
"Narayanaswamy, Krishna" 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" 
Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks


Hello Zhijiang,
We have been seeing deadlocks with single-slot TMs as well. Attaching the 
thread dump as requested. It looks similar to what we had with multi-slot TMs.
Thanks,
Rahul

From: zhijiang  
Sent: Wednesday, May 22, 2019 7:56 AM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
If it is still the case of multiple slots in one TaskManager, it is the same as 
before. But you said you are already using a single slot per TaskManager, right?
If it is the single-slot case, please attach the jstack the next time it 
occurs; otherwise it is not needed.
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019-05-22 (Wednesday) 00:49
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hi Zhijiang,

I couldn’t get the jstack due to some constraints this time around. Will try 
and get them when it occurs next. But from the looks of it from the 
console/logs it appears to be the same as the 2 slot cases. DataSource 
finishing up and CoGroup looking to move from DEPLOYING to RUNNING (and stuck 
at DEPLOYING)

Thanks,
Krishna.

From: zhijiang  
Sent: Tuesday, May 21, 2019 7:38 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hi  Krishna,
Could you show me or attach the jstack for the single slot case? Or is it the 
same jstack as before?
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019-05-21 (Tuesday) 19:50
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks

We started to run jobs using single-slot task managers, which seemed to be OK 
for the past couple of days, but this morning we seem to be seeing these 
deadlocks even with 1 slot. Is there something else we could try out?

Thanks,
Krishna.

From: Narayanaswamy, Krishna [Tech] 
Sent: Friday, May 17, 2019 4:20 PM
To: 'zhijiang' ; Aljoscha Krettek 
; Piotr Nowojski 
Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Thanks Zhijiang. 

We will try these deadlock use cases with the single-slot approach to see how 
they go, and will await the fix before using more slots on a single TM.

Thanks,
Krishna.

From: zhijiang  
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
I have analyzed this deadlock case based on the code. FLINK-10491 already 
fixed one place that could cause a deadlock in SpillableSubpartition, but this 
issue is caused in a different place.
When the source task is trying to release subpartition memory and, at the same 
time, another CoGroup task is submitted and triggers the source task to 
release its memory, a deadlock might occur.
I will create a jira ticket for this issue and think about how to solve it 
soon. Currently, if you still want to use the blocking type, the simple way to 
avoid this is to configure only one slot per TM, so that one task never triggers 
another task 

Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

2019-06-04 Thread zhijiang
The jira is https://issues.apache.org/jira/browse/FLINK-12544 and you could 
find the PR link in it.
--
From:Erai, Rahul 
Send Time:2019-06-04 (Tuesday) 18:19
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski ; 
"Narayanaswamy, Krishna" 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" 
Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks


Thanks Zhijiang.
Can you point us to the JIRA for your fix?

Regards,
-Rahul

From: zhijiang  
Sent: Tuesday, June 4, 2019 1:26 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 
; Erai, Rahul [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] 
Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Yes, it is the same case as multiple slots in a TM. The source task and the 
co-group task are still in the same TM in this case. I think you might have 
slot sharing enabled, so they are still running in the same slot in one TM.
BTW, the previous deadlock issue is already fixed on my side, and the fix is 
waiting for review atm. You could pick the code in the PR to verify the results 
if you like. The next release, 1.8.1, might include this fix as well.
Best,
Zhijiang
--
From:Erai, Rahul 
Send Time:2019-06-04 (Tuesday) 15:50
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski ; 
"Narayanaswamy, Krishna" 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" 
Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hello Zhijiang,
We have been seeing deadlocks with single-slot TMs as well. Attaching the 
thread dump as requested. It looks similar to what we had with multi-slot TMs.
Thanks,
Rahul

From: zhijiang  
Sent: Wednesday, May 22, 2019 7:56 AM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
If it is still the case of multiple slots in one TaskManager, it is the same as 
before. But you said you are already using a single slot per TaskManager, right?
If it is the single-slot case, please attach the jstack the next time it 
occurs; otherwise it is not needed.
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019-05-22 (Wednesday) 00:49
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hi Zhijiang,

I couldn’t get the jstack due to some constraints this time around. Will try 
and get them when it occurs next. But from the looks of it from the 
console/logs it appears to be the same as the 2 slot cases. DataSource 
finishing up and CoGroup looking to move from DEPLOYING to RUNNING (and stuck 
at DEPLOYING)

Thanks,
Krishna.

From: zhijiang  
Sent: Tuesday, May 21, 2019 7:38 PM
To: Aljoscha Krettek ; Piotr Nowojski 
; Narayanaswamy, Krishna [Tech] 

Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Hi Krishna,
Could you show me or attach the jstack for the single slot case? Or is it the 
same jstack as before?
Best,
Zhijiang
--
From:Narayanaswamy, Krishna 
Send Time:2019-05-21 (Tuesday) 19:50
To:zhijiang ; Aljoscha Krettek 
; Piotr Nowojski 
Cc:Nico Kruber ; user@flink.apache.org 
; "Chan, Regina" ; "Erai, Rahul" 

Subject:RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks

We started to run jobs using single-slot task managers, which seemed to 
be OK for the past couple of days, but this morning we are seeing these 
deadlocks even with 1 slot. Is there something else we could try out?

Thanks,
Krishna.

From: Narayanaswamy, Krishna [Tech] 
Sent: Friday, May 17, 2019 4:20 PM
To: 'zhijiang' ; Aljoscha Krettek 
; Piotr Nowojski 
Cc: Nico Kruber ; user@flink.apache.org; Chan, Regina 
[Tech] ; Erai, Rahul [Tech] 

Subject: RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered 
when running a large job > 10k tasks
Thanks Zhijiang. 

We will try these deadlock use cases with a single-slot approach to see how they 
go. We will await the fix before using more slots on a single TM.

Thanks,
Krishna.

From: zhijiang  
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha

Re: [DISCUSS] Deprecate previous Python APIs

2019-06-11 Thread zhijiang
It is reasonable, as Stephan explained. +1 from my side!
--
From:Jeff Zhang 
Send Time:2019-06-11 (Tuesday) 22:11
To:Stephan Ewen 
Cc:user ; dev 
Subject:Re: [DISCUSS] Deprecate previous Python APIs
 +1

On Tue, Jun 11, 2019 at 9:30 PM Stephan Ewen wrote:

> Hi all!
>
> I would suggest deprecating the existing Python APIs for the DataSet and
> DataStream APIs with the 1.9 release.
>
> Background is that there is a new Python API under development.
> The new Python API is initially against the Table API. Flink 1.9 will
> support Table API programs without UDFs, 1.10 is planned to support UDFs.
> Future versions would also support the DataStream API.
>
> In the long term, Flink should have one Python API for DataStream and
> Table APIs. We should not maintain multiple different implementations and
> confuse users that way.
> Given that the existing Python APIs are a bit limited and not under active
> development, I would suggest deprecating them in favor of the new API.
>
> Best,
> Stephan
>
>

-- 
Best Regards

Jeff Zhang



Re: Apache Flink - Disabling system metrics and collecting only specific metrics

2019-06-11 Thread zhijiang
Hi Mans,

AFAIK, we have no switch to disable the general system metrics, which are useful for 
monitoring status and performance tuning. Only some advanced system metrics 
can be configured to be enabled or not, and they are disabled by default, 
so you do not need to be concerned about them.

Maybe you could implement a custom MetricReporter and concentrate only on 
your required application metrics in the method 
`MetricReporter#notifyOfAddedMetric`, so that only those are shown in the backend.
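
To make the filtering idea concrete, here is a minimal sketch in plain Java with no Flink dependency. The class `AllowListMetricFilter` and its method names are hypothetical, and matching on a substring of the metric identifier is only an assumption about how application metrics could be told apart from system metrics. In a real reporter, `onMetricAdded` would be called from `MetricReporter#notifyOfAddedMetric` with whatever identifier the reporter builds for the metric.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Flink: remembers only the metrics whose
// identifier contains one of the configured fragments.
final class AllowListMetricFilter {
    private final List<String> allowedFragments;
    private final Map<String, Object> kept = new ConcurrentHashMap<>();

    AllowListMetricFilter(List<String> allowedFragments) {
        this.allowedFragments = new ArrayList<>(allowedFragments);
    }

    // Intended to be called from notifyOfAddedMetric; returns true if the
    // metric was kept, false if it was ignored (for example a system metric).
    boolean onMetricAdded(String identifier, Object metric) {
        for (String fragment : allowedFragments) {
            if (identifier.contains(fragment)) {
                kept.put(identifier, metric);
                return true;
            }
        }
        return false;
    }

    // Intended to be called from notifyOfRemovedMetric.
    void onMetricRemoved(String identifier) {
        kept.remove(identifier);
    }

    int keptCount() {
        return kept.size();
    }
}
```

The reporter would then only export the entries held by the filter, so system metrics are still registered by Flink but never reach the backend.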

Best,
Zhijiang


--
From:M Singh 
Send Time:2019-06-12 (Wednesday) 00:30
To:User 
Subject:Apache Flink - Disabling system metrics and collecting only specific 
metrics

Hi:

I am working on an application and need to collect application metrics. I would 
like to use Flink's metrics framework for my application metrics.  From Flink's 
documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#system-metrics),
 it looks like Flink collects system metrics by default but I don't need those.

Is there any way to configure metrics to 

1. disable system metrics collection, 
2. collect only specific metrics.

If there is any documentation/configuration that I might have missed, please 
let me know.

Thanks

Mans





Re: java.io.FileNotFoundException in implementing exactly once

2019-06-11 Thread zhijiang
For exactly-once mode before flink-1.5, a temp dir is needed for spilling the 
blocked buffers during checkpoint.

The temp dir is configured via `io.tmp.dirs` and the default value is 
`java.io.tmpdir`. In your case, the temp dir under `/tmp/` seems to have a 
problem (it might have been deleted), so you could double check this dir for the issue.
In addition, I suggest upgrading the Flink version, because flink-1.3.3 is 
too old. After upgrading to flink-1.5 or above, you do not need to consider this 
issue, because exactly-once mode no longer spills data to disk.
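
One way to double check the directory is a small probe in plain Java, sketched below. The class `TmpDirCheck` is hypothetical, not a Flink utility; it only tests whether a directory exists, is writable, and accepts a new file, which is what the spilling step needs. It cannot tell whether a cleanup job removes files from `/tmp` later on.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: probes whether a temp directory can hold spill files.
final class TmpDirCheck {

    // The JVM default temp dir, taken from the java.io.tmpdir system property.
    static Path defaultTmpDir() {
        return Paths.get(System.getProperty("java.io.tmpdir"));
    }

    // Returns true if the directory exists, is writable, and a file can be
    // created in it and deleted again.
    static boolean isUsable(Path dir) {
        if (!Files.isDirectory(dir) || !Files.isWritable(dir)) {
            return false;
        }
        try {
            Path probe = Files.createTempFile(dir, "probe", ".buffer");
            Files.delete(probe);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

Running `TmpDirCheck.isUsable(TmpDirCheck.defaultTmpDir())` on the TaskManager host, as the same user that runs Flink, gives a quick first answer.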

Best,
Zhijiang 
--
From:syed 
Send Time:2019-06-12 (Wednesday) 10:16
To:user 
Subject:java.io.FileNotFoundException in implementing exactly once

Hi;
I am trying to run the standard WordCount application under exactly-once and
at-least-once processing guarantees, respectively. I can successfully run with the
at-least-once guarantee; however, when running with the exactly-once guarantee, I
face the following exception:
*Root exception:*
java.io.FileNotFoundException: /tmp/flink-io-7a8947d4-c75c-4165-85a1-fb727dd98791/ff99c56a01707c5a610ad250e77e71be4c4ed762b6294d73cf9d780e0d422444.0.buffer (No such file or directory)
 at java.io.RandomAccessFile.open0(Native Method)
 at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
 at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
 at org.apache.flink.streaming.runtime.io.BufferSpiller.createSpillingChannel(BufferSpiller.java:259)
 at org.apache.flink.streaming.runtime.io.BufferSpiller.<init>(BufferSpiller.java:120)
 at org.apache.flink.streaming.runtime.io.BarrierBuffer.<init>(BarrierBuffer.java:147)
 at org.apache.flink.streaming.runtime.io.StreamInputProcessor.<init>(StreamInputProcessor.java:128)
 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.init(OneInputStreamTask.java:56)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:234)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
 at java.lang.Thread.run(Thread.java:748)

*Keyed Aggregation -> Sink: Unnamed (1/1)*
java.io.FileNotFoundException: /tmp/flink-io-7a8947d4-c75c-4165-85a1-fb727dd98791/ff99c56a01707c5a610ad250e77e71be4c4ed762b6294d73cf9d780e0d422444.0.buffer (No such file or directory)
 at java.io.RandomAccessFile.open0(Native Method)
 at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
 at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
 at org.apache.flink.streaming.runtime.io.BufferSpiller.createSpillingChannel(BufferSpiller.java:259)
 at org.apache.flink.streaming.runtime.io.BufferSpiller.<init>(BufferSpiller.java:120)
 at org.apache.flink.streaming.runtime.io.BarrierBuffer.<init>(BarrierBuffer.java:147)
 at org.apache.flink.streaming.runtime.io.StreamInputProcessor.<init>(StreamInputProcessor.java:128)
 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.init(OneInputStreamTask.java:56)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:234)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
 at java.lang.Thread.run(Thread.java:748)

I am using Apache Kafka to keep data source available for checkpoints, and
using flink 1.3.3.

I face the same exception whether I explicitly use exactly-once
[`setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)`] or only use
[`enableCheckpointing(1000)`].
Kind regards;
Syed




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: A little doubt about the blog A Deep To Flink's NetworkStack

2019-06-16 Thread zhijiang
Hi Aitozi,

The current connection sharing only applies to the same JobVertex between 
TaskManagers, via the ConnectionIndex. I once tried to remove this limit in ticket 
[1], but we have not reached an agreement on it.
You could watch this JIRA if interested.


[1] https://issues.apache.org/jira/browse/FLINK-10462
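
As a rough illustration of that sharing rule, here is a sketch in plain Java. The names `ConnectionKey` and `ConnectionPool` are hypothetical, and the model rests on one assumption: a channel is looked up by the pair of remote address and connection index, and all subtasks of the same producing JobVertex carry the same index. It is not Flink's actual client factory.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical key: one TCP channel per (remote address, connection index).
final class ConnectionKey {
    final String remoteAddress;
    final int connectionIndex;

    ConnectionKey(String remoteAddress, int connectionIndex) {
        this.remoteAddress = remoteAddress;
        this.connectionIndex = connectionIndex;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ConnectionKey)) {
            return false;
        }
        ConnectionKey other = (ConnectionKey) o;
        return connectionIndex == other.connectionIndex
                && remoteAddress.equals(other.remoteAddress);
    }

    @Override
    public int hashCode() {
        return Objects.hash(remoteAddress, connectionIndex);
    }
}

// Hypothetical pool: hands out a channel id and reuses it for equal keys.
final class ConnectionPool {
    private final Map<ConnectionKey, Integer> channels = new HashMap<>();
    private int nextChannelId = 0;

    int channelFor(String remoteAddress, int connectionIndex) {
        return channels.computeIfAbsent(
                new ConnectionKey(remoteAddress, connectionIndex),
                key -> nextChannelId++);
    }

    int openChannels() {
        return channels.size();
    }
}
```

Under this model, two subtasks of the same vertex talking to the same remote TaskManager reuse one channel, while a different vertex (a different index) towards the same TaskManager opens a second one.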

Best,
Zhijiang
--
From:aitozi 
Send Time:2019-06-16 (Sunday) 22:19
To:user 
Subject:A little doubt about the blog A Deep To Flink's NetworkStack

Hi, community

I read this blog  A Deep To Flink's NetworkStack
<https://flink.apache.org/2019/06/05/flink-network-stack.html>  

In the chapter *Physical Transport*, it says: "if different subtasks
of the same task are scheduled onto the same TaskManager, their network
connections towards the same TaskManagers will be multiplexed and share a
single TCP channel for reduced resource usage."

But when I checked the code, I found that each TaskManager has a NettyServer and a
NettyClient which work with a channel. So I think the TCP channel is shared
between the TaskManagers' remote connections, no matter whether different
subtasks of the same task are scheduled onto the same TaskManager or not.

Please correct me if my understanding is wrong.

Thanks
Aitozi



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Maybe a flink bug. Job keeps in FAILING state

2019-06-19 Thread zhijiang
As long as one task is in canceling state, the job status might still be 
in canceling state.

@Joshua Can you confirm that all of the tasks in the topology were already in a terminal 
state such as failed or canceled?

Best,
Zhijiang
--
From:Chesnay Schepler 
Send Time:2019-06-19 (Wednesday) 16:32
To:Joshua Fan ; user ; Till 
Rohrmann 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

@Till have you seen something like this before? Despite all source tasks 
reaching a terminal state on a TM (FAILED) it does not send updates to 
the JM for all of them, but only a single one.

On 18/06/2019 12:14, Joshua Fan wrote:
> Hi All,
> There is a topology of 3 operators: source, parser, and 
> persist. Occasionally, 5 subtasks of the source encounter an exception 
> and turn to FAILED; at the same time, one subtask of the parser runs 
> into an exception and turns to FAILED too. The jobmaster gets a message 
> about the parser's failure. The jobmaster then tries to cancel all the 
> subtasks. Most of the subtasks of the three operators turn to CANCELED, 
> except the 5 subtasks of the source, because the state of these 5 
> is already FAILED before the jobmaster tries to cancel them. Then the 
> job can not reach a final state but stays in FAILING state, 
> while the subtasks of the source stay in CANCELING state.
>
> The job runs on a Flink 1.7 cluster on YARN, and there is only one TM 
> with 10 slots.
>
> The attached files contain a JM log, a TM log and the UI picture.
>
> The exception timestamp is about 2019-06-16 13:42:28.
>
> Yours
> Joshua



Re: Maybe a flink bug. Job keeps in FAILING state

2019-06-21 Thread zhijiang
Hi Joshua,

If the tasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were really in 
CANCELED state on the TM side but in CANCELING state on the JM side, then it might 
indicate that the terminal state RPC was not received by the JM. I am not sure whether 
the OOM could cause this issue and result in such unexpected behavior.

In addition, you mentioned these tasks were still active after the OOM and after 
cancel was called, so I am not sure at which point your attached TM stack 
was taken. I think it would help if you could provide the 
corresponding TM log and JM log. 
From the TM log it is easy to check the final state of the tasks. 

Best,
Zhijiang
--
From:Joshua Fan 
Send Time:2019-06-20 (Thursday) 11:55
To:zhijiang 
Cc:user ; Till Rohrmann ; Chesnay 
Schepler 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

zhijiang

I did not capture the job UI. The topology is in FAILING state, but the 
persistentbolt subtasks, as can be seen in the picture attached to the first mail, 
were all CANCELED, and the parsebolt subtasks, as described before, had one 
subtask FAILED and the other subtasks CANCELED. The source subtasks, however, had one 
subtask (subtask 4/5) CANCELED and the other subtasks (subtask 1/5, subtask 
2/5, subtask 3/5, subtask 5/5) CANCELING, which is not a terminal state.

The subtask status described above is the JM view, but in the TM view all of the 
source subtasks were FAILED. I do not know why the JM was not notified about this.

All of the failures were triggered by an OOM: the subtask could not 
create a native thread when checkpointing. I also dumped the stack of the JVM; it 
shows that the four subtasks (subtask 1/5, subtask 2/5, subtask 3/5, subtask 5/5) were 
still active after the OOM was thrown and cancel was called. I attached the 
jstack file to this email.

Yours sincerely
Joshua




Re: Maybe a flink bug. Job keeps in FAILING state

2019-06-21 Thread zhijiang
Thanks for the reminder, @Chesnay Schepler.

I just looked through the related logs. Actually, none of the five "Source: 
ServiceLog" tasks is in a terminal state in the JM view. The relevant steps 
are as follows:

1. The checkpoint in the task causes an OOM, which results in a call to 
`Task#failExternally`; we can see the log "Attempting to fail 
task externally" in the TM.
2. The source task transitions from RUNNING to FAILED and then starts 
a canceler thread for canceling the task; we can see the log "Triggering cancellation 
of task" in the TM.
3. When the JM starts to cancel the source tasks, the RPC call 
`Task#cancelExecution` finds the task already in FAILED state from 
step 2; we can see the log "Attempting to cancel task" in the TM.

In the end, none of the five source tasks is in a terminal state according to the JM log. I 
guess step 2 might not have created the canceler thread successfully: the 
root failure was caused by an OOM while creating a native thread in step 1, so 
creating the canceler thread might have failed as 
well in this unstable OOM situation. If so, the source task would not have been 
interrupted at all and would not have reported to the JM either, although its state had 
already been changed to FAILED. 

For the tasks of the other vertices, `Task#failExternally` is not triggered in step 
1; they only receive the cancel RPC from the JM in step 3. I guess that by this time, 
later than the source failure, the canceler thread could be created successfully 
after some GCs, so these tasks could be canceled and reported to the JM.

I think the key problem is that under OOM some behaviors are not within 
expectations, which may cause problems. Maybe we should handle OOM errors in 
an extreme way, like making the TM exit, to avoid such potential issues.
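
The "exit on OOM" idea can be sketched in plain Java as follows. This is only an illustration under my own assumptions, not how Flink handles it: `OomPolicy` is a hypothetical class, and the exit action is passed in as a `Runnable` so that a real process could use `Runtime.getRuntime().halt(...)` while a test can pass a harmless callback.

```java
// Hypothetical helper: decides whether an error is (or wraps) an
// OutOfMemoryError and, if so, runs a caller-supplied exit action.
final class OomPolicy {

    // Walks the cause chain; the depth limit guards against cyclic causes.
    static boolean isOutOfMemory(Throwable error) {
        int depth = 0;
        Throwable current = error;
        while (current != null && depth < 32) {
            if (current instanceof OutOfMemoryError) {
                return true;
            }
            current = current.getCause();
            depth++;
        }
        return false;
    }

    // Builds a handler that can be installed with
    // Thread.setDefaultUncaughtExceptionHandler(...).
    static Thread.UncaughtExceptionHandler exitOnOom(Runnable exitAction) {
        return (thread, error) -> {
            if (isOutOfMemory(error)) {
                exitAction.run();
            }
        };
    }
}
```

With a halt as the exit action, the process would stop instead of continuing in a state where new threads, such as the canceler thread in step 2, can no longer be created.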

Best,
Zhijiang
--
From:Chesnay Schepler 
Send Time:2019-06-21 (Friday) 16:34
To:zhijiang ; Joshua Fan 
Cc:user ; Till Rohrmann 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

The logs are attached to the initial mail.

 Echoing my thoughts from earlier: from the logs it looks as if the TM never 
even submits the terminal state RPC calls for several tasks to the JM.


Re: Maybe a flink bug. Job keeps in FAILING state

2019-06-24 Thread zhijiang
Thanks for opening this ticket; I will watch it.

Flink does not handle OOM specially. I remember we once discussed a 
similar issue, but I have forgotten the conclusion, or whether there were other 
concerns about it.
I am not sure whether it is worth fixing atm; maybe Till or Chesnay could make 
the final decision.

Best,
Zhijiang
--
From:Joshua Fan 
Send Time:2019-06-25 (Tuesday) 11:10
To:zhijiang 
Cc:Chesnay Schepler ; user ; Till 
Rohrmann 
Subject:Re: Maybe a flink bug. Job keeps in FAILING state

Hi Zhijiang

Thank you for your analysis. I agree with it. The solution may be to let the TM 
exit, as you mentioned, when any type of OOM occurs, because Flink has no 
control over a TM once an OOM occurs.

I filed a JIRA ticket earlier: https://issues.apache.org/jira/browse/FLINK-12889.

I don't know whether it is worth fixing.

Thank you all.

Yours sincerely
Joshua

Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread zhijiang
Congratulations Rong!

Best,
Zhijiang
--
From:Kurt Young 
Send Time:2019-07-11 (Thursday) 22:54
To:Kostas Kloudas 
Cc:Jark Wu ; Fabian Hueske ; dev 
; user 
Subject:Re: [ANNOUNCE] Rong Rong becomes a Flink committer

Congratulations Rong!

Best,
Kurt


On Thu, Jul 11, 2019 at 10:53 PM Kostas Kloudas  wrote:
Congratulations Rong!
On Thu, Jul 11, 2019 at 4:40 PM Jark Wu  wrote:
Congratulations Rong Rong! 
Welcome on board!
On Thu, 11 Jul 2019 at 22:25, Fabian Hueske  wrote:
Hi everyone,

I'm very happy to announce that Rong Rong accepted the offer of the Flink PMC 
to become a committer of the Flink project.

Rong has been contributing to Flink for many years, mainly working on SQL and 
Yarn security features. He's also frequently helping out on the user@f.a.o 
mailing lists.

Congratulations Rong!

Best, Fabian 
(on behalf of the Flink PMC)



Re: DataSet with Multiple reduce Actions

2018-06-27 Thread Zhijiang(wangzhijiang999)
Hi Osh,

As far as I know, currently one DataSet source can not be consumed by several 
different vertices, and you can not construct the topology for your 
request from the API.
I think your way of merging the different reduce functions into one UDF is feasible. 
Maybe someone has a better solution. :)

zhijiang


--
From:Osian Hedd Hughes 
Send Time:2018-06-28 (Thursday) 00:35
To:user 
Subject:DataSet with Multiple reduce Actions

Hi,

I am new to Flink, and I'd like to firstly use it to perform some in memory 
aggregation in batch mode (in some months this will be migrated to permanent 
streaming, hence the choice of Flink).

For this, I can successfully create the complex key that I require using 
KeySelector & returning a hash of the set of fields to "groupBy".
I can also get the data from file/db, but now I want to be able to perform many 
different reduce functions on different fields (not hardcoded, but read from 
configuration).
What I'd like to know, is if this is possible out of the box?
From my research, it seems that only a single reduce function can be applied to 
a DataSet.
The only way I found up to now, was to create a single reducer which is a 
container for all of the reduce functions I want to apply to my data record and 
simply loop through them to apply them to each record.
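
A container reducer of this kind could look roughly like the sketch below, written in plain Java without Flink types. `CompositeReducer` is a hypothetical name, records are modeled as a `Map<String, Double>` purely for illustration, and every configured field is assumed to be present in both records. In a Flink job, the `reduce` method would be the body of a single reduce UDF.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical container: one reduce function per field, applied in a loop.
final class CompositeReducer {
    private final Map<String, BinaryOperator<Double>> perField = new LinkedHashMap<>();

    // Registers the function for one field; this could be driven by configuration.
    CompositeReducer with(String field, BinaryOperator<Double> function) {
        perField.put(field, function);
        return this;
    }

    // Combines two records; fields without a registered function keep the
    // value from the first record. Assumes configured fields are non-null.
    Map<String, Double> reduce(Map<String, Double> a, Map<String, Double> b) {
        Map<String, Double> out = new LinkedHashMap<>(a);
        for (Map.Entry<String, BinaryOperator<Double>> entry : perField.entrySet()) {
            String field = entry.getKey();
            out.put(field, entry.getValue().apply(a.get(field), b.get(field)));
        }
        return out;
    }
}
```

For example, `new CompositeReducer().with("amount", Double::sum).with("peak", Double::max)` sums one field and keeps the maximum of another in a single pass.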

Is this recommended? or am I missing some basics here?

Many thanks for any advice,
Osh



Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-02 Thread Zhijiang(wangzhijiang999)
Hi Gerard,

From the stack below, we can only tell that the task was canceled, which may have been 
triggered by the job manager because of another task's failure. If the task can not be 
interrupted within the configured timeout, the task manager process will exit. Do 
you see any OutOfMemory messages in the task manager log? Normally the output 
serialization buffer is managed by the task manager framework and will not cause 
an OOM; on the input deserialization side, there is a temporary byte array 
on each channel for holding partial records, which is not managed by the framework. 
I think you can confirm whether and where the OOM was caused. Maybe check the task 
failure logs.

Zhijiang


--
From:gerardg 
Send Time:2018-06-30 (Saturday) 00:12
To:user 
Subject:Re: Flink job hangs/deadlocks (possibly related to out of memory)

(fixed formatting) 

 Hello, 

 We have experienced some problems where a task just hangs without showing any 
kind of log error while other tasks running in the same task manager continue 
without problems. When these tasks are restarted the task manager gets killed 
and shows several errors similar to these ones: 

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 seconds, but is stuck in method:
 java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
 java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
 org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
 org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
 org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
 org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
 org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
 org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
 scala.collection.immutable.List.foreach(List.scala:392)
 org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
 org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
 org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
 org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
 org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
 org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
 org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
 org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
 org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
 org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
 org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
 org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
 org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
 org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
 (...)
 org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
 org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
 org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
 org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.jav

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-02 Thread Zhijiang(wangzhijiang999)
Hi Gerard,

I think you can check the job manager log to find which task failed first, 
and then trace the task manager log of the failed task to find the 
initial cause.
The failed task triggers canceling all the other tasks, and during the 
canceling process, a blocked task that is waiting for an output buffer can not 
be interrupted by the canceler thread, which is what your description shows. 
So I think the cancel process is not the key point and behaves as expected. 
Maybe there was no OOM at all. 
If the task can not be interrupted during canceling, the task manager process 
will finally exit, which triggers restarting the job.

Zhijiang
--
From: Gerard Garcia
Sent: Monday, July 2, 2018 18:29
To: wangzhijiang999
Cc: user
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

Thanks Zhijiang,

We haven't found any other relevant log messages anywhere. These traces belong
to the unresponsive task, which is why we suspect that at some point it did not
have enough memory to serialize the message and blocked. I've also found
that when it hung, several output buffers were full (see attached image
buffers.outPoolUsage.png), so I guess the traces just reflect that.

Probably the task hung for some other reason, and that is what filled the
output buffers upstream of the blocked operator. I'll have to continue
investigating to find the real cause.

Gerard




On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) 
 wrote:
 Hi Gerard,

The stack below only indicates that the task was canceled, which may have been
triggered by the job manager because of another task's failure. If the task
cannot be interrupted within the configured timeout, the task manager process
will exit. Do you see any OutOfMemory messages in the task manager log?
Normally the output serialization buffer is managed by the task manager
framework and will not cause OOM. On the input deserialization side, each
channel has a temporary byte array for holding partial records, which is not
managed by the framework.
I think you should confirm whether an OOM occurred and where. Maybe check the
task failure logs.

Zhijiang

--
From: gerardg
Sent: Saturday, June 30, 2018 00:12
To: user
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

(fixed formatting) 

 Hello, 

 We have experienced some problems where a task just hangs without showing any 
kind of log error while other tasks running in the same task manager continue 
without problems. When these tasks are restarted the task manager gets killed 
and shows several errors similar to these ones: 

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 
seconds, but is stuck in method: java.nio.ByteBuffer.wrap(ByteBuffer.java:373) 
java.nio.ByteBuffer.wrap(ByteBuffer.java:396) 
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
 
org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
 
org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
 
org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
 scala.collection.immutable.List.foreach(List.scala:392) 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
 
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
 
org.apache.flink.streaming.runtime.io.RecordWriterOutpu

Re: Limiting in flight data

2018-07-05 Thread Zhijiang(wangzhijiang999)
Hi Vishal,

Before Flink 1.5.0, the sender tried its best to send data over the network
until the wire was filled with data. Since Flink 1.5.0, network flow control
has been improved with a credit-based approach: the sender transfers data based
on how many buffers are available on the receiver side, so no data accumulates
on the wire. As a result, there is less in-flight data than before.

You can further limit the in-flight data by controlling the number of
credits on the receiver side. The related parameters are
taskmanager.network.memory.buffers-per-channel and
taskmanager.network.memory.floating-buffers-per-gate.
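
As a rough illustration of how these two parameters bound the buffered
in-flight data of one input gate, here is a minimal sketch. The class and
method names are hypothetical, and the values used in main (2 buffers per
channel, 8 floating buffers per gate, 32 KiB buffers, 100 channels) are
assumptions for the example, so check them against your own configuration.

```java
// Rough upper bound on the data buffered for one input gate under
// credit-based flow control. Names are hypothetical, not Flink API.
final class InFlightEstimate {

    /** Exclusive buffers of every channel plus the gate's shared floating buffers. */
    static long maxBuffersPerGate(int channels, int buffersPerChannel,
                                  int floatingBuffersPerGate) {
        return (long) channels * buffersPerChannel + floatingBuffersPerGate;
    }

    /** The same bound expressed in bytes for a given buffer (segment) size. */
    static long maxBytesPerGate(int channels, int buffersPerChannel,
                                int floatingBuffersPerGate, int segmentSizeBytes) {
        return maxBuffersPerGate(channels, buffersPerChannel, floatingBuffersPerGate)
                * segmentSizeBytes;
    }

    public static void main(String[] args) {
        // Assumed values: 100 input channels, 2 exclusive buffers per channel,
        // 8 floating buffers per gate, 32 KiB per buffer.
        long buffers = maxBuffersPerGate(100, 2, 8);
        long bytes = maxBytesPerGate(100, 2, 8, 32 * 1024);
        System.out.println(buffers + " buffers, " + bytes + " bytes");
    }
}
```

Lowering either parameter shrinks this bound, at the cost of fewer credits
being available to keep the sender busy.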

If you have other questions about them, let me know and I can explain.

Zhijiang
--
From: Vishal Santoshi
Sent: Thursday, July 5, 2018 22:28
To: user
Subject: Limiting in flight data

"Yes, Flink 1.5.0 will come with better tools to handle this problem. Namely 
you will be able to limit the “in flight” data, by controlling the number of 
assigned credits per channel/input gate. Even without any configuring Flink 
1.5.0 will out of the box buffer less data, thus mitigating the problem."

I read this in another email chain. The docs (maybe you can point me to them)
are not very clear on how to do the above. Any pointers will be appreciated.

Thanks much.



Re: Handling back pressure in Flink.

2018-07-05 Thread Zhijiang(wangzhijiang999)
Hi Mich,

Since Flink 1.5.0, network flow control has been improved with a credit-based
mechanism that handles back pressure better than before. The producer sends
data based on the number of available buffers (credits) on the consumer side.
If the consumer processes data more slowly than the producer produces it, the
data will be cached in the output and input queues on both sides, which may
back-pressure the producer. You can increase the number of credits on the
consumer side to relieve back pressure. Even when back pressure happens, the
application remains stable (it will not cause OOM), so I think you should not
worry about that. Normally it is better to consider the TPS of both sides and
set a suitable parallelism to avoid back pressure to some extent.
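
To make the credit idea concrete, here is a small toy model of a single
channel. It is only a sketch of the principle described above, not Flink's
network stack; the class CreditBasedChannel and all of its methods are
hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of one credit-based channel: the sender may only transfer a
// record while the receiver has announced a free buffer (a credit).
final class CreditBasedChannel {
    private final Queue<String> senderBacklog = new ArrayDeque<>();
    private final Queue<String> receiverBuffers = new ArrayDeque<>();
    private int credits;

    CreditBasedChannel(int initialCredits) {
        this.credits = initialCredits;
    }

    /** Producer side: queue the record, then transfer as much as credits allow. */
    void emit(String record) {
        senderBacklog.add(record);
        flush();
    }

    /** Consumer side: processing a record frees a buffer and returns one credit. */
    String consume() {
        String record = receiverBuffers.poll();
        if (record != null) {
            credits++;
            flush();
        }
        return record;
    }

    /** Records wait on the sender side only when the receiver has no credit left. */
    boolean isBackPressured() {
        return !senderBacklog.isEmpty();
    }

    /** Number of records currently held in receiver buffers. */
    int inFlight() {
        return receiverBuffers.size();
    }

    private void flush() {
        while (credits > 0 && !senderBacklog.isEmpty()) {
            receiverBuffers.add(senderBacklog.poll());
            credits--;
        }
    }

    public static void main(String[] args) {
        CreditBasedChannel channel = new CreditBasedChannel(2);
        channel.emit("a");
        channel.emit("b");
        channel.emit("c"); // no credit left, stays in the sender backlog
        System.out.println("back-pressured: " + channel.isBackPressured());
        channel.consume(); // frees a buffer, so "c" can be transferred
        System.out.println("back-pressured: " + channel.isBackPressured());
    }
}
```

In this model the amount of data held by the receiver never exceeds the number
of credits, which is why a slow consumer slows the producer down instead of
exhausting memory.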

Zhijiang
--
From: Mich Talebzadeh
Sent: Wednesday, July 4, 2018 20:40
To: user
Subject: Handling back pressure in Flink.

Hi,

In Spark one can handle back pressure by setting the Spark conf parameter:

sparkConf.set("spark.streaming.backpressure.enabled","true")

With backpressure you make a Spark Streaming application stable, i.e. it
receives data only as fast as it can process it. In general one needs to ensure
that the micro-batch processing time is less than the batch interval, i.e. the
rate at which your producer sends data into Kafka. For example, this is shown
in the Spark GUI below for batch interval = 2 seconds.

Is there such a procedure in Flink, please?

Thanks


Dr Mich Talebzadeh

LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com





[Attachment: image.png, binary data not included]


Re: Limiting in flight data

2018-07-08 Thread Zhijiang(wangzhijiang999)
The config you mentioned is not operator-level; I think it can currently be set
at the job level. Operator-level configuration would need API support, though
it seems more reasonable.

There are "inPoolUsage" and "outPoolUsage" metrics that indicate back pressure
to some extent. If these metrics are both at 100% between a producer and a
consumer, the producer will be blocked (back-pressured) by the consumer for a
while.

There is also a latency marker that travels from source to sink through the
whole topology to sample latency. Maybe these metrics can help you.


--
From: Vishal Santoshi
Sent: Friday, July 6, 2018 22:05
To: Zhijiang(wangzhijiang999)
Cc: user
Subject: Re: Limiting in flight data

Further, if there are metrics that allow us to chart delays per pipe on the
network buffers, that would be immensely helpful.

On Fri, Jul 6, 2018 at 10:02 AM, Vishal Santoshi  
wrote:
Awesome, thank you for pointing that out. We have seen stability on pipes where 
previously throttling the source ( rateLimiter ) was the only way out.

https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java#L291

This, though, seems to be a cluster-wide setting. Is it possible to do this at
an operator level? Does this work with the pipe-level configuration per job (or
has that been deprecated)?

On Thu, Jul 5, 2018 at 11:16 PM, Zhijiang(wangzhijiang999) 
 wrote:
Hi Vishal,

Before Flink 1.5.0, the sender tried its best to send data over the network
until the wire was filled with data. Since Flink 1.5.0, network flow control
has been improved with a credit-based approach: the sender transfers data based
on how many buffers are available on the receiver side, so no data accumulates
on the wire. As a result, there is less in-flight data than before.

You can further limit the in-flight data by controlling the number of
credits on the receiver side. The related parameters are
taskmanager.network.memory.buffers-per-channel and
taskmanager.network.memory.floating-buffers-per-gate.

If you have other questions about them, let me know and I can explain.

Zhijiang
--
From: Vishal Santoshi
Sent: Thursday, July 5, 2018 22:28
To: user
Subject: Limiting in flight data

"Yes, Flink 1.5.0 will come with better tools to handle this problem. Namely 
you will be able to limit the “in flight” data, by controlling the number of 
assigned credits per channel/input gate. Even without any configuring Flink 
1.5.0 will out of the box buffer less data, thus mitigating the problem."

I read this in another email chain. The docs (maybe you can point me to them)
are not very clear on how to do the above. Any pointers will be appreciated.

Thanks much.






Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-13 Thread Zhijiang(wangzhijiang999)
Hi Gerard,

Previously I thought a failed task had triggered the cancel process; now it is
clear to me that you cancel the task yourself when it stops processing data.
I think you can run jstack on the process to find where the task thread is
blocked, instead of canceling it; then we may find some hints.

In addition, the "DataOutputSerializer.resize" frame in the stack indicates
that the task is serializing a record, and the serializer holds overhead byte
buffers for temporarily copying data. If your record is too large, this may
cause OOM, since this overhead memory is not managed by the Flink framework.
You can also monitor the GC status to check for long full-GC pauses.
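
To illustrate why a very large record makes such a growing serialization
buffer expensive, here is a simplified sketch. It is not Flink's
DataOutputSerializer; the class GrowingBuffer, its doubling policy, and the
copy counter are assumptions made for the example.

```java
// Simplified model of a heap buffer that grows while a record is written.
// Each resize allocates a new array and copies everything written so far,
// and this memory lives on the JVM heap outside any managed buffer pool.
final class GrowingBuffer {
    private byte[] buffer;
    private int position;
    private long bytesCopied;

    GrowingBuffer(int initialSize) {
        this.buffer = new byte[initialSize];
    }

    void write(byte[] data) {
        if (position + data.length > buffer.length) {
            resize(data.length);
        }
        System.arraycopy(data, 0, buffer, position, data.length);
        position += data.length;
    }

    /** Assumed policy: at least double, or grow by what is needed if that is more. */
    private void resize(int minCapacityAdd) {
        int newLength = Math.max(buffer.length * 2, buffer.length + minCapacityAdd);
        byte[] grown = new byte[newLength];
        System.arraycopy(buffer, 0, grown, 0, position);
        bytesCopied += position;
        buffer = grown;
    }

    int capacity() {
        return buffer.length;
    }

    int size() {
        return position;
    }

    long bytesCopied() {
        return bytesCopied;
    }

    public static void main(String[] args) {
        GrowingBuffer out = new GrowingBuffer(64);
        for (int i = 0; i < 1000; i++) {
            out.write(new byte[100]); // one large record written field by field
        }
        System.out.println("size=" + out.size()
                + " capacity=" + out.capacity()
                + " copied=" + out.bytesCopied());
    }
}
```

While a resize is in progress both the old and the new array are alive, so the
peak heap demand is larger than the record itself. The sketch ignores integer
overflow for buffers approaching 2 GiB.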

Best,
Zhijiang
--
From: Gerard Garcia
Sent: Friday, July 13, 2018 16:22
To: wangzhijiang999
Cc: user
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

Hi Zhijiang,

The problem is that no other task failed first. We have a task that sometimes
just stops processing data, and when we cancel it, we see log messages
saying:

" Task (...) did not react to cancelling signal for 30 seconds, but is stuck in 
method: 
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
 
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
 org.apache.flink.types.StringValue.writeString(StringValue.java:802)
(...)"

That is why we suspect that it hangs forever at that point, which is why it
stops processing data. I don't see any increase in heap memory use (I guess
because these buffers are managed by Flink), so I'm not sure whether that is
really the problem.

Gerard
On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) 
 wrote:
Hi Gerard,

I think you can check the job manager log to find which task failed first,
and then trace the task manager log that contains the failed task to find the
root cause.
The failed task triggers cancellation of all the other tasks. During
cancellation, a blocked task that is waiting for an output buffer cannot be
interrupted by the canceler thread, which is what your description shows. So I
think the cancel process is not the key point and behaves as expected. Maybe
there was no OOM at all.
If the task cannot be interrupted during canceling, the task manager process
will eventually exit, which triggers a restart of the job.

Zhijiang
--
From: Gerard Garcia
Sent: Monday, July 2, 2018 18:29
To: wangzhijiang999
Cc: user
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

Thanks Zhijiang,

We haven't found any other relevant log messages anywhere. These traces belong
to the unresponsive task, which is why we suspect that at some point it did not
have enough memory to serialize the message and blocked. I've also found
that when it hung, several output buffers were full (see attached image
buffers.outPoolUsage.png), so I guess the traces just reflect that.

Probably the task hung for some other reason, and that is what filled the
output buffers upstream of the blocked operator. I'll have to continue
investigating to find the real cause.

Gerard




On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) 
 wrote:
 Hi Gerard,

The stack below only indicates that the task was canceled, which may have been
triggered by the job manager because of another task's failure. If the task
cannot be interrupted within the configured timeout, the task manager process
will exit. Do you see any OutOfMemory messages in the task manager log?
Normally the output serialization buffer is managed by the task manager
framework and will not cause OOM. On the input deserialization side, each
channel has a temporary byte array for holding partial records, which is not
managed by the framework.
I think you should confirm whether an OOM occurred and where. Maybe check the
task failure logs.

Zhijiang

--
From: gerardg
Sent: Saturday, June 30, 2018 00:12
To: user
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

(fixed formatting) 

 Hello, 

 We have experienced some problems where a task just hangs without showing any 
kind of log error while other tasks running in the same task manager continue 
without problems. When these tasks are restarted the task manager gets killed 
and shows several errors similar to these ones: 

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 
seconds, but is stuck in method: java.nio.ByteBuffer.wrap(ByteBuffer.java:373) 
java.nio.ByteBuffer.wrap(ByteBuffer.java:396) 
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
 
org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
 
org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
 
org.apache.flink.api.

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-17 Thread Zhijiang(wangzhijiang999)
Hi Gerard,

From the jstack you provided, the task is serializing the output record, and
during this process it does not process any more input data.
This stack does not indicate an out-of-memory issue. If the output buffers
were exhausted, the task would instead be blocked in requestBufferBlocking.

I think the key point is that your output record is too large and has a
complicated structure. Every field and collection in this class has to be
traversed during serialization, which costs a lot of time and CPU. Furthermore,
the checkpoint cannot complete because it is waiting for a lock that is held
by the task's output process.

As you mentioned, it makes sense to check the data structure of the output
record and either reduce its size or make it more lightweight to handle.

Best,

Zhijiang


--
From: Gerard Garcia
Sent: Tuesday, July 17, 2018 21:53
To: piotr
Cc: fhueske; wangzhijiang999; user; nico
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

Yes, I'm using Flink 1.5.0 and what I'm serializing is a really big record 
(probably too big, we have already started working to reduce its size) which 
consists of several case classes which have (among others) fields of type 
String. 

I attach a CPU profile of the thread stuck serializing. I also attach the
memory and GC telemetry that the profiler shows (which may be more
informative than the one recorded from the JVM metrics). Only one node was
actually "doing something"; all others had CPU usage near zero.

The task is at the same time trying to perform a checkpoint but keeps failing. 
Would it make sense that the problem is that there is not enough memory 
available to perform the checkpoint so all operators are stuck waiting for it 
to finish, and at the same time, the operator stuck serializing is keeping all 
the memory so neither it nor the checkpoint can advance? 

I realized that I don't have a minimum pause between checkpoints so it is 
continuously trying. Maybe I can reduce the checkpoint timeout from the 10m 
default and introduce a minimum pause (e.g. 5m timeout and 5m minimum pause) 
and this way I could break the deadlock.
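
As a back-of-the-envelope check of that idea, the sketch below computes when
the next checkpoint could start. It assumes the interval is counted from the
start of the previous checkpoint and the minimum pause from its end (or from
the moment it timed out); the class, the method, and the 1-minute interval are
hypothetical. In Flink these two settings are configured, as far as I know,
through CheckpointConfig.setCheckpointTimeout and
CheckpointConfig.setMinPauseBetweenCheckpoints.

```java
// Earliest start of the next checkpoint under an assumed scheduling model:
// the interval counts from the previous start, the pause from the previous end.
final class CheckpointSchedule {

    static long nextStartMillis(long previousStart, long previousEnd,
                                long intervalMillis, long minPauseMillis) {
        return Math.max(previousStart + intervalMillis, previousEnd + minPauseMillis);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // A checkpoint starts at t=0 and is aborted by a 5-minute timeout.
        long start = 0L;
        long end = 5 * minute;

        // No minimum pause: the next attempt may start immediately after the abort.
        System.out.println(nextStartMillis(start, end, minute, 0L));

        // 5-minute minimum pause: the task gets 5 minutes without checkpointing.
        System.out.println(nextStartMillis(start, end, minute, 5 * minute));
    }
}
```

With a pause configured, the task gets a window in which no checkpoint is
pending, which is the effect the proposal above is aiming for.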

Gerard


On Tue, Jul 17, 2018 at 9:00 AM Piotr Nowojski  wrote:
Hi,

Thanks for the additional data. Just to make sure, are you using Flink 1.5.0?

There are a couple of threads that seem to be looping in serialisation, while
others are blocked and either waiting for new data or waiting for someone to
consume some data. Could you debug or CPU profile the code, focusing in
particular on threads with a stack trace like the one below [1]? Aren't you
trying to serialise some gigantic String?

Piotrek

[1]:

"(...) (19/26)" #2737 prio=5 os_prio=0 tid=0x7f52584d2800 nid=0x6819 
runnable [0x7f451a843000]
   java.lang.Thread.State: RUNNABLE
 at 
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
 at org.apache.flink.types.StringValue.writeString(StringValue.java:812)
 at 
org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
 at 
org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(

Re: Network PartitionNotFoundException when run on multi nodes

2018-07-22 Thread Zhijiang(wangzhijiang999)
Hi Steffen,

This exception indicates that when the downstream task requested a partition
from the upstream task, the upstream task had not yet initialized and
registered its result partition.
In this case, the downstream task asks the job manager for the state, and
then retries the partition request from the upstream until the maximum retry
timeout is reached. You can increase the
"taskmanager.network.request-backoff.max" parameter to check whether that
helps; the default value is 10s.
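
To get a feeling for how long the retries last in total, here is a small
sketch of a capped, doubling backoff. Only the 10 s maximum comes from the
message above; the doubling scheme, the 100 ms initial delay, and the class
RequestBackoff are assumptions for illustration and may not match the actual
retry logic exactly.

```java
import java.util.ArrayList;
import java.util.List;

// Sequence of retry delays for a backoff that doubles until it hits a cap.
final class RequestBackoff {

    static List<Integer> delaysMillis(int initialMillis, int maxMillis) {
        if (initialMillis <= 0 || maxMillis < initialMillis) {
            throw new IllegalArgumentException("need 0 < initial <= max");
        }
        List<Integer> delays = new ArrayList<>();
        int current = initialMillis;
        while (true) {
            delays.add(current);
            if (current >= maxMillis) {
                break; // the cap was used once; after that the request fails
            }
            current = Math.min(current * 2, maxMillis);
        }
        return delays;
    }

    static long totalMillis(int initialMillis, int maxMillis) {
        long total = 0;
        for (int delay : delaysMillis(initialMillis, maxMillis)) {
            total += delay;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(delaysMillis(100, 10_000));
        System.out.println(totalMillis(100, 10_000) + " ms in total");
    }
}
```

Under these assumptions, raising the maximum mainly adds a few long waits at
the end of the sequence, which gives a slow upstream task more time to register
its partition.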

BTW, you should check why the upstream task registers its result partition
late. Maybe the upstream TaskManager received the task deployment from the
JobManager late, or some operation in the upstream task's initialization
unexpectedly took more time before the result partition was registered.

Best,
Zhijiang
--
From: Steffen Wohlers
Sent: Sunday, July 22, 2018 22:22
To: user
Subject: Network PartitionNotFoundException when run on multi nodes

Hi all,

I have some problems when running my application on more than one Task Manager.

setup:
node1: Job Manager, Task Manager
node2: Task Manager

I can run my program successfully on each node alone when I stop the other
Task Manager.
But when I start both and set parallelism = 2, I get the following exception
every time (after 30 seconds):

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: 
Partition 6372b5f434d55e987ea179d6f6b488fe@e389ca50a2c2cf776b90268f987a6546 not 
found.
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:273)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:182)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:400)
at 
org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1294)
at 
org.apache.flink.runtime.taskmanager.Task.lambda$triggerPartitionProducerStateCheck$0(Task.java:1151)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



It seems the problem occurs when a subtask is linked to both Task Managers.

Does anybody know how I can make it work?

Thanks,
Steffen




Re: Kryo Serialization Issue

2018-08-28 Thread Zhijiang(wangzhijiang999)
Hi,

How did you reduce the speed to avoid this issue? Do you mean reducing the
parallelism of the source or of the downstream tasks?
As far as I know, data buffering is managed by Flink's internal buffer pool and
memory manager, so it should not cause an OOM issue.
I wonder whether the OOM may be caused by the temporary byte buffers in the
record serializers. If the record size is large and the downstream parallelism
is high, serialization may cause OOM.
Could you show the stack trace of the OOM? If that is the case, [1] can solve
it; it is a work in progress.

Zhijiang

[1] https://issues.apache.org/jira/browse/FLINK-9913
--
From: Darshan Singh
Sent: Tuesday, August 28, 2018 00:16
To: walterddr
Cc: user
Subject: Re: Kryo Serialization Issue

Thanks. We ran into different errors and then realized it was an OOM issue
that was causing different parts to fail.
Flink was buffering too much data because we were reading too fast from the
source. Reducing the speed fixed the issue.

However, I am curious how to achieve the same with S3, apart from limiting the
number of files read at the same time.

Thanks

On Sun, Aug 26, 2018 at 5:32 PM Rong Rong  wrote:
This seems to be unrelated to the KryoSerializer issue from recent
discussions [1], which has been fixed in 1.4.3, 1.5.0 and 1.6.0.
At a quick glance, this might have been a corrupted message in your decoding,
for example a malformed JSON string.

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-8836

On Wed, Aug 22, 2018 at 8:41 AM Darshan Singh  wrote:
Hi,

I am using a map function on a data stream which has 1 column, i.e. a JSON
string. The map function simply uses the Jackson mapper to convert the String
to an ObjectNode and also assigns a key based on one of the values in the
ObjectNode.

The code seems to work fine for 2-3 minutes as expected and then suddenly
fails with the error below. I looked at the mailing list, and most of the
issues mentioned that it was fixed in 1.5.0, but I am using 1.6.0, so I am not
sure what needs to be done.

I just wanted to know whether we will need to write our own serializer for
ObjectNode to fix this issue, or whether there is some setting we are missing.

Thanks

java.lang.IndexOutOfBoundsException: Index: 49, Size: 0
 at java.util.ArrayList.rangeCheck(ArrayList.java:657)
 at java.util.ArrayList.get(ArrayList.java:433)
 at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
 at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
 at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:315)
 at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
 at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
 at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:207)
 at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
 at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
 at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
 at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
 at java.lang.Thread.run(Thread.java:748)





Re: Backpressure? for Batches

2018-08-29 Thread Zhijiang(wangzhijiang999)
Back pressure arises when downstream and upstream are running concurrently
and the downstream is slower than the upstream.
In a streaming job, the scheduler schedules both sides concurrently, so back
pressure may exist.
For a batch job, the default schedule mode is LAZY_FROM_SOURCE, as far as I
remember. That means the downstream is scheduled after the upstream finishes,
so a slower downstream will not block the upstream, and back pressure may not
exist in this case.

Best,
Zhijiang
--
From: Darshan Singh
Sent: Wednesday, August 29, 2018 16:20
To: user
Subject: Backpressure? for Batches

I faced the issue with back pressure in streams. I was wondering if we could 
face the same with the batches as well.

In theory it should be possible. But in Web UI for backpressure tab for batches 
I was seeing that it was just showing the tasks status and no status like "OK" 
etc.

So I was wondering if backpressure is a thing for batches. If yes, how do we 
reduce this especially if I am reading from hdfs.

Thanks



Re: Backpressure? for Batches

2018-08-29 Thread Zhijiang(wangzhijiang999)
Chesnay is right. For a batch job there are two ways of notifying consumers
that a result is consumable. One is when the upstream emits its first record,
and the other is when the upstream has finished all its records (blocking
mode).

In your case, the slow groupby node will trigger back pressure and block the
upstream nodes all the way to the source. But caching in-flight buffers will
normally not cause OOM, because they are managed by the framework and cannot
grow without limit. The only exception I have experienced is the netty buffer
pool, which may cause it.

One possible way to avoid back pressure is to increase the parallelism of the
slow node (groupby in your case) or decrease the parallelism of the fast node
(source in your case).

Best,
Zhijiang
--
From: Darshan Singh
Sent: Wednesday, August 29, 2018 18:16
To: chesnay
Cc: wangzhijiang999; user
Subject: Re: Backpressure? for Batches

Thanks. Now back to my question again. How can I read from hdfs at a lower
speed than my, say, map or group by can consume? Is there some sort of
configuration which says read only 1 rows and then stop and then reread,
etc.? Otherwise the source will keep on sending the data or keeping it in some
sort of buffers and will go OOM. Or is setting a different parallelism for
reading the way to handle this?

Thanks
On Wed, Aug 29, 2018 at 12:11 PM Chesnay Schepler  wrote:
The semantics for LAZY_FROM_SOURCE are that tasks are scheduled when there is
data to be consumed, i.e. once the first record was emitted by the previous
operator. As such, back pressure exists in batch just like in streaming.

 On 29.08.2018 11:39, Darshan Singh wrote:
Thanks, 

My job is simple. I am using table Api
1. Read from hdfs
2. Deserialize json to pojo and convert to table.
3. Group by some columns.
4. Convert back to dataset and write back to hdfs.

In the WebUI I can see at least the first 3 running concurrently, which sort
of makes sense. From your answer I understood that Flink will first do number
1; once that is completed it will do the map (or grouping as well), then the
grouping, and finally the write. Thus, there should be 1 task running at a
time. This doesn't seem right to me, or I misunderstood what you said.

So here, if my group by is slow, I expect some sort of back pressure on the
deserialize part, or maybe on reading from hdfs itself?

Thanks  
On Wed, Aug 29, 2018 at 11:03 AM Zhijiang(wangzhijiang999) 
 wrote:
Back pressure arises when downstream and upstream are running concurrently
and the downstream is slower than the upstream.
In a streaming job, the scheduler schedules both sides concurrently, so back
pressure may exist.
For a batch job, the default schedule mode is LAZY_FROM_SOURCE, as far as I
remember. That means the downstream is scheduled after the upstream finishes,
so a slower downstream will not block the upstream, and back pressure may not
exist in this case.

Best,
Zhijiang
--
From: Darshan Singh
Sent: Wednesday, August 29, 2018 16:20
To: user
Subject: Backpressure? for Batches

I faced the issue with back pressure in streams. I was wondering if we could 
face the same with the batches as well. 

In theory it should be possible. But in Web UI for backpressure tab for batches 
I was seeing that it was just showing the tasks status and no status like "OK" 
etc.

So I was wondering if backpressure is a thing for batches. If yes, how do we 
reduce this especially if I am reading from hdfs.

Thanks  





Re: Backpressure? for Batches

2018-08-29 Thread Zhijiang(wangzhijiang999)
You can check the log for the stack trace related to the OOM; maybe that lets
us confirm the cause.
Or you can dump the heap to analyze the memory usage after the OOM.

Best,
Zhijiang
--
From: Darshan Singh
Sent: Wednesday, August 29, 2018 19:22
To: wangzhijiang999
Cc: chesnay; user
Subject: Re: Backpressure? for Batches

Thanks,

I thought the group by might be causing the OOM, but it is mentioned that the 
sort will be spilled to disk, so there is no way for that to cause the OOM. So 
I was wondering whether, due to back pressure, some of the data read from hdfs 
is kept in memory because it is not consumed, and that is causing the OOM. It 
seems this is not possible either, so I need to look again at what could be 
causing the OOM.

Thanks
On Wed, Aug 29, 2018 at 12:41 PM Zhijiang(wangzhijiang999) 
 wrote:
chesnay is right. For a batch job there are two ways of notifying that a result 
is consumable. One is when the first record is emitted by the upstream, and the 
other is when the upstream has finished all its records (blocking mode).

In your case, the slow groupby node will trigger back-pressure and block the 
upstream tasks all the way back to the source node. But normally this will not 
cause an OOM from caching in-flight buffers, because they are managed by the 
framework and cannot grow without limit. The only exception I have experienced 
before is the netty buffer pool, which may cause that.

One possible way to avoid backpressure is to increase the parallelism of the 
slow node (groupby in your case) or decrease the parallelism of the fast node 
(source in your case).
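
To illustrate why bounded buffers lead to blocking rather than unbounded 
memory growth, here is a minimal sketch in plain Java, not Flink code. It 
assumes an ArrayBlockingQueue as a stand-in for the limited pool of in-flight 
buffers; the class name BackpressureSketch, the run method, and the 1 ms sleep 
that simulates a slow groupby are all hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: one fast producer, N slow consumers, bounded buffer.
public class BackpressureSketch {
    // Sentinel telling a consumer that the source is exhausted.
    private static final Integer END = Integer.MIN_VALUE;

    /**
     * Returns {largest buffer occupancy observed by the producer,
     *          number of records consumed}.
     */
    public static int[] run(int records, int capacity, int consumers)
            throws InterruptedException {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        AtomicInteger maxInFlight = new AtomicInteger();
        AtomicInteger consumed = new AtomicInteger();

        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        Integer record = buffer.take();
                        if (record.equals(END)) {
                            return;
                        }
                        Thread.sleep(1); // simulate the slow downstream operator
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }

        for (int i = 0; i < records; i++) {
            buffer.put(i); // blocks while the buffer is full: this is the backpressure
            maxInFlight.accumulateAndGet(buffer.size(), Math::max);
        }
        for (int i = 0; i < consumers; i++) {
            buffer.put(END); // one sentinel per consumer
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return new int[] {maxInFlight.get(), consumed.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run(200, 8, 2);
        System.out.println("max in flight: " + result[0] + ", consumed: " + result[1]);
    }
}
```

However fast the producer is, the number of buffered records never exceeds 
the capacity. Raising the consumers argument corresponds to increasing the 
parallelism of the slow node: the producer spends less time blocked, while the 
memory bound stays the same.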

Best,
Zhijiang
--
From: Darshan Singh
Sent: Wednesday, August 29, 2018 18:16
To: chesnay
Cc: wangzhijiang999; user
Subject: Re: Backpressure? for Batches

Thanks. Now back to my question again: how can I tell Flink to read from hdfs 
no faster than my map or group by can consume? Is there some sort of 
configuration which says read only 1 rows and then stop and then reread 
etc.? Otherwise the source will keep on sending data, or keep it in some sort 
of buffers, and will go OOM. Or is setting a different parallelism for the read 
the way to handle this?

Thanks
On Wed, Aug 29, 2018 at 12:11 PM Chesnay Schepler  wrote:
The semantics for LAZY_FROM_SOURCE are that tasks are scheduled when there is 
data to be consumed, i.e. once the first record was emitted by the previous 
operator. As such, back-pressure exists in batch just like in streaming.

 On 29.08.2018 11:39, Darshan Singh wrote:
Thanks, 

My job is simple. I am using the Table API:
1. Read from hdfs
2. Deserialize json to pojo and convert to table.
3. Group by some columns.
4. Convert back to dataset and write back to hdfs.

In the WebUI I can see at least the first 3 running concurrently, which sort of 
makes sense. From your answer I understood that Flink will first do step 1, and 
once that is completed it will do the map (or the grouping as well), then the 
grouping, and finally the write. Thus, there should be only 1 task running at a 
time. This doesn't seem right to me, or I misunderstood what you said.

So here, if my group by is slow, should I expect some sort of back pressure on 
the deserialise part, or maybe on the read from hdfs itself?

Thanks  
On Wed, Aug 29, 2018 at 11:03 AM Zhijiang(wangzhijiang999) 
 wrote:
Backpressure arises when downstream and upstream are running concurrently and 
the downstream is slower than the upstream.
In a streaming job, the schedule mode schedules both sides concurrently, so 
backpressure may exist.
As for a batch job, the default schedule mode is LAZY_FROM_SOURCE as far as I 
remember, which means the downstream is scheduled after the upstream finishes. 
A slower downstream therefore does not block the running upstream, so 
backpressure may not exist in this case.

Best,
Zhijiang
--
From: Darshan Singh
Sent: Wednesday, August 29, 2018 16:20
To: user
Subject: Backpressure? for Batches

I faced an issue with back pressure in streams. I was wondering if we could 
face the same with batches as well.

In theory it should be possible. But in the Web UI, the backpressure tab for 
batch jobs only showed the task status and no status like "OK" etc.

So I was wondering if backpressure is a thing for batches. If yes, how do we 
reduce it, especially if I am reading from hdfs?

Thanks  






Re: Flink 1.6 Job fails with IllegalStateException: Buffer pool is destroyed.

2018-09-07 Thread Zhijiang(wangzhijiang999)
Hi,

I think the problem in the attached image is not the root cause of your job 
failure. There must be some other task or TaskManager failure; all the related 
tasks are then cancelled by the job manager, and the problem in the attached 
image is just caused by the task being cancelled.

You can review the job manager log to check whether there are any failures 
that caused the whole job to fail.
FYI, the task manager may have been killed by yarn for exceeding its memory 
limit. You mentioned the job fails about half an hour after it starts, so I 
guess it is possible that the task manager was killed by yarn.

Best,
Zhijiang
--
From: 杨力
Sent: Friday, September 7, 2018 13:09
To: user
Subject: Flink 1.6 Job fails with IllegalStateException: Buffer pool is destroyed.

Hi all,
I am encountering a weird problem when running flink 1.6 in yarn per-job 
clusters.
The job fails about half an hour after it starts. The related log is attached 
as an image.

This piece of log comes from one of the taskmanagers. There are not any other 
related log lines.
No ERROR-level logs. The job just runs for tens of minutes without printing any 
logs
and suddenly throws this exception.

It is reproducible in my production environment, but not in my test environment.
The 'Buffer pool is destroyed' exception is always thrown while emitting a 
latency marker.


