Re: ClassNotFound when submitting job from command line

2016-04-20 Thread Robert Metzger
Hi Flavio,

in which class are you calling Class.forName()? Is the class where the
Class.forName() call is loaded from the user jar or is it a class from the
Flink distribution?
I'm asking because Class.forName() is using the classloader of the class
where the call is located. So if the class has been loaded from the system
class loader, it can not access classes from the user jar.
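
For illustration, a minimal sketch of the difference (the driver name is taken from the original report; it assumes the call runs inside a user function, where the thread context classloader is normally the user-code classloader):

    // Uses the classloader of the class that contains the call; if that class was
    // loaded by the system classloader, classes that exist only in the user jar
    // are not visible and a ClassNotFoundException is thrown.
    Class<?> viaCallerClassLoader = Class.forName("oracle.jdbc.OracleDriver");

    // Passing a classloader explicitly; inside user functions the context
    // classloader is usually the user-code classloader, so this variant can
    // see classes packaged in the submitted (shaded) jar.
    Class<?> viaContextClassLoader = Class.forName(
            "oracle.jdbc.OracleDriver",
            true,
            Thread.currentThread().getContextClassLoader());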



On Tue, Apr 19, 2016 at 6:13 PM, Flavio Pompermaier 
wrote:

> I use maven to generate the shaded jar (and the classes are inside it) but
> when the job starts it can't load those classes using Class.forName()
> (required to instantiate the JDBC connections).
> I think it's probably a problem related to class loading of Flink
>
> On Tue, Apr 19, 2016 at 6:02 PM, Balaji Rajagopalan <
> balaji.rajagopa...@olacabs.com> wrote:
>
>> In your pom.xml, add the Maven plugin like this; you will have to add
>> all the dependent artifacts. This works for me: if you run mvn clean
>> compile package, the created jar is a fat jar.
>>
>> <plugin>
>>   <groupId>org.apache.maven.plugins</groupId>
>>   <artifactId>maven-dependency-plugin</artifactId>
>>   <version>2.9</version>
>>   <executions>
>>     <execution>
>>       <id>unpack</id>
>>       <phase>prepare-package</phase>
>>       <goals>
>>         <goal>unpack</goal>
>>       </goals>
>>       <configuration>
>>         <artifactItems>
>>           <artifactItem>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-connector-kafka-0.8_${scala.version}</artifactId>
>>             <version>${flink.version}</version>
>>             <type>jar</type>
>>             <overWrite>false</overWrite>
>>             <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>             <includes>org/apache/flink/**</includes>
>>           </artifactItem>
>>           ...
>>           ...
>>         </artifactItems>
>>       </configuration>
>>     </execution>
>>   </executions>
>> </plugin>
>>
>> On Tue, Apr 19, 2016 at 9:10 PM, Flavio Pompermaier > > wrote:
>>
>>> Hi to all,
>>>
>>> I just tried to submit my application to the Flink cluster (1.0.1) but I
>>> get ClassNotFound exceptions for classes inside my shaded jar (like
>>> oracle.jdbc.OracleDriver or org.apache.commons.pool2.PooledObjectFactory).
>>> Those classes are in the shaded jar but aren't found.
>>> If I put the jars in Flink's lib dir (for every node of the cluster)
>>> things work.
>>> How can I solve that?
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>
>


Re: Leader not found

2016-04-20 Thread Robert Metzger
Hi,
I just tried it with Kafka 0.8.2.0 and 0.8.2.1 and for both versions
everything worked fine.
How many partitions does your topic have?

Can you send me the full logs of the Kafka consumer?

On Tue, Apr 19, 2016 at 6:05 PM, Balaji Rajagopalan <
balaji.rajagopa...@olacabs.com> wrote:

> Flink version : 1.0.0
> Kafka version : 0.8.2.1
>
> Try using a topic that has no messages posted to it at the time Flink
> starts.
>
> On Tue, Apr 19, 2016 at 5:41 PM, Robert Metzger 
> wrote:
>
>> Can you provide me with the exact Flink and Kafka versions you are using
>> and the steps to reproduce the issue?
>>
>> On Tue, Apr 19, 2016 at 2:06 PM, Balaji Rajagopalan <
>> balaji.rajagopa...@olacabs.com> wrote:
>>
>>> It does not seem to fully work if there is no data in the kafka stream:
>>> the flink application emits this error and bails. Could this be a missed
>>> use case in the fix?
>>>
>>> On Tue, Apr 19, 2016 at 3:58 PM, Robert Metzger 
>>> wrote:
>>>
 Hi,

 I'm sorry, the documentation in the JIRA issue is a bit incorrect. The
 issue has been fixed in all versions including and after 1.0.0. Earlier
 releases (0.10, 0.9) will fail when the leader changes.
 However, you don't necessarily need to upgrade to Flink 1.0.0 to
 resolve the issue: With checkpointing enabled, your job will fail on a
 leader change, then Flink will restart the Kafka consumers and they'll find
 the new leaders.
 Starting from Flink 1.0.0 the Kafka consumer will handle leader changes
 without failing.
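
 For reference, a minimal sketch of enabling checkpointing (the interval is an
 arbitrary example value; restart behaviour also depends on the configured
 execution retries):

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.enableCheckpointing(5000); // checkpoint every 5 s (example value); on failure the
                                    // job restarts from the last checkpoint and the Kafka
                                    // consumers re-discover the partition leaders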

 Regards,
 Robert

 On Tue, Apr 19, 2016 at 12:17 PM, Balaji Rajagopalan <
 balaji.rajagopa...@olacabs.com> wrote:

> I am facing this exception repeatedly while trying to consume from
> kafka topic. It seems it was reported in 1.0.0 and fixed in 1.0.0, how can
>  I be sure that is fixed in the version of flink that I am using, does it
> require me to install patch updates ?
>
> Caused by: java.lang.RuntimeException: Unable to find a leader for
> partitions: [FetchPartition {topic=capi, partition=0, 
> offset=-915623761776}]
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.findLeaderForPartitions(LegacyFetcher.java:323)
> at
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:162)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
>
> https://issues.apache.org/jira/browse/FLINK-3368
>


>>>
>>
>


Custom time window in Flink

2016-04-20 Thread Piyush Shrivastava
Hello,
I wanted to enquire how a job I am trying to do with Flink can be done.
I have also posted a question on StackOverflow. PFB the link:
http://stackoverflow.com/questions/36720192/custom-windows-charging-in-flink

I am using Flink's TimeWindow functionality to perform some computations. I am
creating a 5 minute Window. However I want to create a one hour Window for only
the first time. The next Windows I need are of 5 minutes. Such that for the
first hour, data is collected and my operation is performed on it. Once this is
done, every five minutes the same operation is performed.
Can you kindly help me with this? How can such a functionality be implemented?

Thanks and Regards,
Piyush Shrivastava
http://webograffiti.com

Re: ClassNotFound when submitting job from command line

2016-04-20 Thread Flavio Pompermaier
At the moment my 2 PRs about the JDBC batch connector and the input format fix
are pending, so I have my customized implementation of the InputFormat in my jar,
and as JDBC backend I need ojdbc6.jar as a dependency (which I shade into the
jar).
Thus Class.forName() is called in the InputFormat's open().
On 20 Apr 2016 09:32, "Robert Metzger"  wrote:

Hi Flavio,

in which class are you calling Class.forName()? Is the class where the
Class.forName() call is loaded from the user jar or is it a class from the
Flink distribution?
I'm asking because Class.forName() is using the classloader of the class
where the call is located. So if the class has been loaded from the system
class loader, it can not access classes from the user jar.



On Tue, Apr 19, 2016 at 6:13 PM, Flavio Pompermaier 
wrote:

> I use maven to generate the shaded jar (and the classes are inside it) but
> when the job starts it can't load those classes using Class.forName()
> (required to instantiate the JDBC connections).
> I think it's probably a problem related to class loading of Flink
>
> On Tue, Apr 19, 2016 at 6:02 PM, Balaji Rajagopalan <
> balaji.rajagopa...@olacabs.com> wrote:
>
>> In your pom.xml, add the Maven plugin like this; you will have to add
>> all the dependent artifacts. This works for me: if you run mvn clean
>> compile package, the created jar is a fat jar.
>>
>> <plugin>
>>   <groupId>org.apache.maven.plugins</groupId>
>>   <artifactId>maven-dependency-plugin</artifactId>
>>   <version>2.9</version>
>>   <executions>
>>     <execution>
>>       <id>unpack</id>
>>       <phase>prepare-package</phase>
>>       <goals>
>>         <goal>unpack</goal>
>>       </goals>
>>       <configuration>
>>         <artifactItems>
>>           <artifactItem>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-connector-kafka-0.8_${scala.version}</artifactId>
>>             <version>${flink.version}</version>
>>             <type>jar</type>
>>             <overWrite>false</overWrite>
>>             <outputDirectory>${project.build.directory}/classes</outputDirectory>
>>             <includes>org/apache/flink/**</includes>
>>           </artifactItem>
>>           ...
>>           ...
>>         </artifactItems>
>>       </configuration>
>>     </execution>
>>   </executions>
>> </plugin>
>>
>> On Tue, Apr 19, 2016 at 9:10 PM, Flavio Pompermaier > > wrote:
>>
>>> Hi to all,
>>>
>>> I just tried to submit my application to the Flink cluster (1.0.1) but I
>>> get ClassNotFound exceptions for classes inside my shaded jar (like
>>> oracle.jdbc.OracleDriver or org.apache.commons.pool2.PooledObjectFactory).
>>> Those classes are in the shaded jar but aren't found.
>>> If I put the jars in Flink's lib dir (for every node of the cluster)
>>> things work.
>>> How can I solve that?
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>
>


Re: Compression - AvroOutputFormat and over network ?

2016-04-20 Thread Tarandeep Singh
Avro changes look easy. I think I can make those changes.
To make changes to network data, I need some directions.

@Ufuk please point me to corresponding code.

thanks,
Tarandeep

On Mon, Apr 18, 2016 at 11:05 AM, Robert Metzger 
wrote:

> Hi Tarandeep,
>
> I think for that you would need to set a codec factory on the
> DataFileWriter. Sadly we don't expose that method to the user.
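
For context, this is roughly what setting a codec factory on an Avro DataFileWriter looks like in plain Avro (a sketch of the Avro API only; AvroOutputFormat would have to expose such a call internally):

    import org.apache.avro.Schema;
    import org.apache.avro.file.CodecFactory;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import java.io.File;
    import java.io.IOException;

    public class AvroCompressionSketch {
        static void writeCompressed(Schema schema, File target) throws IOException {
            DataFileWriter<GenericRecord> writer =
                    new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
            writer.setCodec(CodecFactory.snappyCodec()); // must be set before create()
            writer.create(schema, target);
            // ... append records here ...
            writer.close();
        }
    }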
>
> If you want, you can contribute this change to Flink. Otherwise, I can
> quickly fix it.
>
> Regards,
> Robert
>
>
> On Mon, Apr 18, 2016 at 2:36 PM, Ufuk Celebi  wrote:
>
>> Hey Tarandeep,
>>
>> regarding the network part: not possible at the moment. It's pretty
>> straight forward to add support for it, but no one ever got around to
>> actually implementing it. If you would like to contribute, I am happy
>> to give some hints about which parts of the system would need to be
>> modified.
>>
>> – Ufuk
>>
>>
>> On Mon, Apr 18, 2016 at 12:56 PM, Tarandeep Singh 
>> wrote:
>> > Hi,
>> >
>> > How can I set compression for AvroOutputFormat when writing files on
>> HDFS?
>> > Also, can we set compression for intermediate data that is sent over
>> network
>> > (from map to reduce phase) ?
>> >
>> > Thanks,
>> > Tarandeep
>>
>
>


RE: Custom time window in Flink

2016-04-20 Thread Radu Tudoran
Hi,

The way to do this is to create your own evictor. In the evictor you can then
decide when the events are removed. I would suggest creating a symmetric
trigger as well, because I would assume that you also need to fire the
computation first after 1 hour and then every 5 minutes.
The logic would be that you have a field that marks whether a window was
created or not (e.g., a Boolean field in the evictor class); once a window is
created you can set it to false, and from there on you operate on 5-minute
windows.


Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

From: Piyush Shrivastava [mailto:piyush...@yahoo.co.in]
Sent: Wednesday, April 20, 2016 9:59 AM
To: user@flink.apache.org
Subject: Custom time window in Flink

Hello,
I wanted to enquire how a job I am trying to do with Flink can be done.
I have also posted a question on StackOverflow. PFB the link:
http://stackoverflow.com/questions/36720192/custom-windows-charging-in-flink

I am using Flink's TimeWindow functionality to perform some computations. I am 
creating a 5 minute Window. However I want to create a one hour Window for only 
the first time. The next Windows I need are of 5 minutes.
Such that for the first hour, data is collected and my operation is performed 
on it. Once this is done, every five minutes the same operation is performed.

Can you kindly help me with this? How can such a functionality be implemented?



Thanks and Regards,
Piyush Shrivastava
http://webograffiti.com


Re: Custom time window in Flink

2016-04-20 Thread Piyush Shrivastava
Hello,
Thanks a lot for your reply. Can you share a sample code or example which I can
refer to while creating a custom evictor and trigger?

Thanks and Regards,
Piyush Shrivastava
http://webograffiti.com
 

On Wednesday, 20 April 2016 2:50 PM, Radu Tudoran  
wrote:
 

Hi,

The way to do this is to create your own evictor. In the evictor you can then
decide when the events are removed. I would suggest creating a symmetric
trigger as well, because I would assume that you also need to fire the
computation first after 1 hour and then every 5 minutes.
The logic would be that you have a field that marks whether a window was
created or not (e.g., a Boolean field in the evictor class); once a window is
created you can set it to false, and from there on you operate on 5-minute
windows.

Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI,
which is intended only for the person or entity whose address is listed above.
Any use of the information contained herein in any way (including, but not
limited to, total or partial disclosure, reproduction, or dissemination) by
persons other than the intended recipient(s) is prohibited. If you receive this
e-mail in error, please notify the sender by phone or email immediately and
delete it!

From: Piyush Shrivastava [mailto:piyush...@yahoo.co.in]
Sent: Wednesday, April 20, 2016 9:59 AM
To: user@flink.apache.org
Subject: Custom time window in Flink

Hello,
I wanted to enquire how a job I am trying to do with Flink can be done.
I have also posted a question on StackOverflow. PFB the link:
http://stackoverflow.com/questions/36720192/custom-windows-charging-in-flink

I am using Flink's TimeWindow functionality to perform some computations. I am
creating a 5 minute Window. However I want to create a one hour Window for only
the first time. The next Windows I need are of 5 minutes. Such that for the
first hour, data is collected and my operation is performed on it. Once this is
done, every five minutes the same operation is performed.

Can you kindly help me with this? How can such a functionality be implemented?

Thanks and Regards,
Piyush Shrivastava
http://webograffiti.com

Re: Master (1.1-SNAPSHOT) Can't run on YARN

2016-04-20 Thread Stefano Baghino
Not exactly, I just wanted to let you know about it and know if someone
else experienced this issue; perhaps it's more of a dev mailing list
discussion, sorry for posting this here. Feel free to continue the
discussion on the other list if you feel it's more appropriate.

On Tue, Apr 19, 2016 at 6:53 PM, Ufuk Celebi  wrote:

> Hey Stefano,
>
> Flink's resource management has been refactored for 1.1 recently. This
> could be a regression introduced by this. Max can probably help you
> with more details. Is this currently a blocker for you?
>
> – Ufuk
>
> On Tue, Apr 19, 2016 at 6:31 PM, Stefano Baghino
>  wrote:
> > Hi everyone,
> >
> > I'm currently experiencing a weird situation, I hope you can help me out
> > with this.
> >
> > I've cloned and built from the master, then I've edited the default
> config
> > file by adding my Hadoop config path, exported the HADOOP_CONF_DIR env var
> > and ran bin/yarn-session.sh -n 1 -s 2 -jm 2048 -tm 2048
> >
> > The first thing I noticed is that I had to put "-s 2" or the task
> managers
> > get created with -1 slots (!) by default.
> >
> > After putting "-s 2" the YARN session startup hangs when trying to
> register
> > the task managers. I've stopped the session and aggregated the logs and
> read
> > a lot (several thousand) of the messages I attach at the bottom; any
> idea
> > of what this may be?
> >
> > Thank you a lot in advance!
> >
> > 2016-04-19 12:15:59,507 INFO  org.apache.flink.yarn.YarnTaskManager
> > - Trying to register at JobManager
> > akka.tcp://flink@172.31.20.101:57379/user/jobmanager (attempt 1,
> timeout:
> > 500 milliseconds)
> >
> > 2016-04-19 12:15:59,649 ERROR org.apache.flink.yarn.YarnTaskManager
> > - The registration at JobManager
> > Some(akka.tcp://flink@172.31.20.101:57379/user/jobmanager) was refused,
> > because: java.lang.IllegalStateException: Resource
> > ResourceID{resourceId='container_e02_1461077293721_0016_01_02'} not
> > registered with resource manager.. Retrying later...
> >
> > 2016-04-19 12:16:00,025 INFO  org.apache.flink.yarn.YarnTaskManager
> > - Trying to register at JobManager
> > akka.tcp://flink@172.31.20.101:57379/user/jobmanager (attempt 2,
> timeout:
> > 1000 milliseconds)
> >
> > 2016-04-19 12:16:00,033 ERROR org.apache.flink.yarn.YarnTaskManager
> > - The registration at JobManager
> > Some(akka.tcp://flink@172.31.20.101:57379/user/jobmanager) was refused,
> > because: java.lang.IllegalStateException: Resource
> > ResourceID{resourceId='container_e02_1461077293721_0016_01_02'} not
> > registered with resource manager.. Retrying later...
> >
> > 2016-04-19 12:16:01,045 INFO  org.apache.flink.yarn.YarnTaskManager
> > - Trying to register at JobManager
> > akka.tcp://flink@172.31.20.101:57379/user/jobmanager (attempt 3,
> timeout:
> > 2000 milliseconds)
> >
> > 2016-04-19 12:16:01,053 ERROR org.apache.flink.yarn.YarnTaskManager
> > - The registration at JobManager
> > Some(akka.tcp://flink@172.31.20.101:57379/user/jobmanager) was refused,
> > because: java.lang.IllegalStateException: Resource
> > ResourceID{resourceId='container_e02_1461077293721_0016_01_02'} not
> > registered with resource manager.. Retrying later...
> >
> > 2016-04-19 12:16:03,064 INFO  org.apache.flink.yarn.YarnTaskManager
> > - Trying to register at JobManager
> > akka.tcp://flink@172.31.20.101:57379/user/jobmanager (attempt 4,
> timeout:
> > 4000 milliseconds)
> >
> > 2016-04-19 12:16:03,072 ERROR org.apache.flink.yarn.YarnTaskManager
> > - The registration at JobManager
> > Some(akka.tcp://flink@172.31.20.101:57379/user/jobmanager) was refused,
> > because: java.lang.IllegalStateException: Resource
> > ResourceID{resourceId='container_e02_1461077293721_0016_01_02'} not
> > registered with resource manager.. Retrying later...
> >
> > 2016-04-19 12:16:07,085 INFO  org.apache.flink.yarn.YarnTaskManager
> > - Trying to register at JobManager
> > akka.tcp://flink@172.31.20.101:57379/user/jobmanager (attempt 5,
> timeout:
> > 8000 milliseconds)
> >
> > 2016-04-19 12:16:07,092 ERROR org.apache.flink.yarn.YarnTaskManager
> > - The registration at JobManager
> > Some(akka.tcp://flink@172.31.20.101:57379/user/jobmanager) was refused,
> > because: java.lang.IllegalStateException: Resource
> > ResourceID{resourceId='container_e02_1461077293721_0016_01_02'} not
> > registered with resource manager.. Retrying later...
> >
> > 2016-04-19 12:16:09,664 INFO  org.apache.flink.yarn.YarnTaskManager
> > - Trying to register at JobManager
> > akka.tcp://flink@172.31.20.101:57379/user/jobmanager (attempt 1,
> timeout:
> > 500 milliseconds)
> >
> >
> > --
> > BR,
> > Stefano Baghino
> >
> > Software Engineer @ Radicalbit
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Re: Checkpoint and restore states

2016-04-20 Thread Aljoscha Krettek
Hi,
what seems to be the problem?

Cheers,
Aljoscha

On Wed, 20 Apr 2016 at 03:52 Jack Huang  wrote:

> Hi all,
>
> I am doing a simple word count example and want to checkpoint the
> accumulated word counts. I am not having any luck getting the counts saved
> and restored. Can someone help?
>
> env.enableCheckpointing(1000)
>
> env.setStateBackend(new MemoryStateBackend())
>
>
>>  ...
>
>
>
> inStream
>   .keyBy({ s => s })
>   .mapWithState((in: String, count: Option[Int]) => {
>     val newCount = count.getOrElse(0) + 1
>     ((in, newCount), Some(newCount))
>   })
>   .print()
>
>
>
> Thanks,
>
> Jack Huang
>


RE: Custom time window in Flink

2016-04-20 Thread Radu Tudoran
Hi,

Easiest way is to just start from the code of an existing one

https://github.com/apache/flink/tree/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors

If you take the example of TimeEvictor, you would just need to use the same code 
and modify the public int evict method.

Same story with the triggers

https://github.com/apache/flink/tree/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers

Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

From: Piyush Shrivastava [mailto:piyush...@yahoo.co.in]
Sent: Wednesday, April 20, 2016 11:24 AM
To: user@flink.apache.org
Subject: Re: Custom time window in Flink

Hello,
Thanks a lot for your reply. Can you share a sample code or example which I can 
refer while creating a custom evictor and trigger?

Thanks and Regards,
Piyush Shrivastava
http://webograffiti.com

On Wednesday, 20 April 2016 2:50 PM, Radu Tudoran 
mailto:radu.tudo...@huawei.com>> wrote:

Hi,

The way to do this is to create your own evictor. In the evictor you can then 
decide when the events are removed. I would suggest creating a symmetric 
trigger as well, because I would assume that you also need to fire the 
computation first after 1 hour and then every 5 minutes.
The logic would be that you have a field that marks whether a window was 
created or not (e.g., a Boolean field in the evictor class); once a window is 
created you can set it to false, and from there on you operate on 5-minute 
windows.


Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, 
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

From: Piyush Shrivastava [mailto:piyush...@yahoo.co.in]
Sent: Wednesday, April 20, 2016 9:59 AM
To: user@flink.apache.org
Subject: Custom time window in Flink

Hello,
I wanted to enquire how a job I am trying to do with Flink can be done.
I have also posted a question on StackOverflow. PFB the link:
http://stackoverflow.com/questions/36720192/custom-windows-charging-in-flink

I am using Flink's TimeWindow functionality to perform some computations. I am 
creating a 5 minute Window. However I want to create a one hour Window for only 
the first time. The next Windows I need are of 5 minutes.
Such that for the first hour, data is collected and my operation is performed 
on it. Once this is done, every five minutes the same operation is performed.

Can you kindly help me with this? How can such a functionality be implemented?


Thanks and Regards,
Piyush Shrivastava
http://webograffiti.com



Re: Sink Parallelism

2016-04-20 Thread Fabian Hueske
Hi Ravinder,

your drawing is pretty much correct (Flink will inject a combiner between
flat map and reduce which locally combines records with the same key).
The partitioning between flat map and reduce is done with hash partitioning
by default. However, you can also define a custom partitioner to control
how records are distributed.
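
As a sketch of the last point (names and types assumed from the word-count example discussed in this thread; it only illustrates the custom-partitioner API, not a recommended rule):

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class CustomPartitioningSketch {
        // Distributes the (word, count) tuples with a custom rule instead of the
        // default hash partitioning applied between flat map and reduce.
        static DataSet<Tuple2<String, Integer>> repartition(DataSet<Tuple2<String, Integer>> words) {
            return words.partitionCustom(new Partitioner<String>() {
                @Override
                public int partition(String key, int numPartitions) {
                    // example rule: route by the first character of the word
                    return Math.abs(Character.toLowerCase(key.charAt(0))) % numPartitions;
                }
            }, 0); // field 0 (the word) is the partitioning key
        }
    }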

Best, Fabian

2016-04-19 17:04 GMT+02:00 Ravinder Kaur :

> Hello Chesnay,
>
> Thank you for the reply. According to this
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
> if I set -p = 2 then sink will also have 2 Sink subtaks and the final
> result will end up in 2 stream partitions or say 2 chunks and combining
> them will be the global result of the WordCount of input Dataset. And when
> I say I have 2 taskmanagers with one taskslot each these 2 chunks are saved
> on 2 machines in the end.
>
> I have attached an image of my understanding by working out an example
> WordCount with -p = 4. ​​Could you also explain how the communication among
> taskmanagers happen while redistributing streams and how tuples with same
> key end up in one taskmanager? Basically the implementation of groupBy on
> multiple taskmanagers.
>
> Thanks,
> Ravinder Kaur
>
> On Tue, Apr 19, 2016 at 4:01 PM, Chesnay Schepler 
> wrote:
>
>> The picture you reference does not really show how dataflows are
>> connected.
>> For a better picture, visit this link:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>
>> Let me know if this doesn't answer your question.
>>
>>
>> On 19.04.2016 14:22, Ravinder Kaur wrote:
>>
>>> Hello All,
>>>
>>> Considering the following streaming dataflow of the example WordCount, I
>>> want to understand how Sink is parallelised.
>>>
>>>
>>> Source --> flatMap --> groupBy(), sum() --> Sink
>>>
>>> If I set the paralellism at runtime using -p, as shown here
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#configuring-taskmanager-processing-slots
>>>
>>> I want to understand how Sink is done parallelly and how the global
>>> result is distributed.
>>>
>>> As far as I understood groupBy(0) is applied to the tuples>> Integer> emitted from the flatMap funtion, which groupes by the String
>>> value and sum(1) aggregates the Integer value getting the count.
>>>
>>> That means streams will be redistributed so that tuples grouped by the
>>> same String value be sent to one taskmanager and the Sink step should be
>>> writing the results to the specified path. When Sink step is also
>>> parallelised then each taskmanager should emit a chunk. These chunks put
>>> together must be the global result.
>>>
>>> But when I see the pictorial representation it seems that each task slot
>>> will run a copy of the streaming dataflow and will be performing the
>>> operations on the chunk of data it gets and outputs the result. But if this
>>> is the case the global result would have duplicates of strings and would be
>>> wrong.
>>>
>>> Could one of you kindly clarify what exactly happens?
>>>
>>> Kind Regards,
>>> Ravinder Kaur
>>>
>>>
>>>
>>>
>>
>


Re: Flink + S3

2016-04-20 Thread Ufuk Celebi
On Wed, Apr 20, 2016 at 1:35 AM, Michael-Keith Bernard
 wrote:
> We're running on self-managed EC2 instances (and we'll eventually have a 
> mirror cluster in our colo). The provided documentation notes that for Hadoop 
> 2.6, we'd need such-and-such version of hadoop-aws and guice on the CP. If I 
> wanted to instead use Hadoop 2.7, which versions of those dependencies should 
> I get? And how can I look that up myself? The pom file for hadoop-aws[1] 
> doesn't mention a specific dependency on Guice, so I'm curious how the author 
> of that documentation knew exactly the dependencies and versions required.

Hey Michael-Keith,

I think you meant Guava and not Guice.

How to determine, which dependencies you need is quite a mess at the
moment. It depends on a combination of 3 things:
1) the dependencies of hadoop-aws [1],
2) which S3 file system you use (in case of the docs
org.apache.hadoop.fs.s3native.NativeS3FileSystem) [2],
3) what Flink shades away in its Hadoop dependencies [3]

1) hadoop-aws depends on hadoop-common (and other packages).
hadoop-common is already part of Flink (including the fs.FileSystem
classes etc.)
2) NativeS3FileSystem uses dependencies from hadoop-common like
FileSystem and from hadoop-aws like Jets3tNativeFileSystemStore. The
hadoop-common stuff is part of Flink and Jets3tNativeFileSystemStore
is part of hadoop-aws. The big issue here is that other S3 FS
implementations might work with the aws-java-sdk packages of
hadoop-aws.
3) Flink shades Hadoop's Guava dependency away and that's why you need
to add it manually to the CP.

So, if you go for the suggested NativeS3FileSystem, you end up needing
hadoop-aws and Guava. Of course, this might change in future versions
of Flink and/or Hadoop. I will update the docs for the different
versions of Flink and Hadoop for now and hope that this will help. :-(

The easiest solution in the future would be that Flink comes with
hadoop-aws, but I don't think that this is going to happen.

– Ufuk

[1] http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/2.6.0
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html#provide-s3-filesystem-dependency
[3] https://github.com/apache/flink/blob/master/flink-shaded-hadoop/pom.xml


Re: Checkpoint and restore states

2016-04-20 Thread Stefano Baghino
Hi Jack,

it seems you correctly enabled the checkpointing by calling
`env.enableCheckpointing`. However, your UDFs have to either implement the
Checkpointed interface or use the Key/Value State interface to make sure
the state of the computation is snapshotted.

The documentation explains how to define your functions so that they
checkpoint the state far better than I could in this post:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html
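
For example, a function that snapshots its state via the Checkpointed interface could look roughly like this (a sketch following the linked docs, not tested; the count here is simply the number of elements seen by one parallel subtask):

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    import org.apache.flink.util.Collector;

    public class CountingFlatMap
            extends RichFlatMapFunction<String, Tuple2<String, Integer>>
            implements Checkpointed<Integer> {

        private int count = 0; // elements seen by this parallel subtask

        @Override
        public void flatMap(String word, Collector<Tuple2<String, Integer>> out) {
            count++;
            out.collect(Tuple2.of(word, count)); // word plus running element count (illustrative)
        }

        @Override
        public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
            return count; // written with every checkpoint
        }

        @Override
        public void restoreState(Integer state) {
            count = state; // handed back when the job is restored
        }
    }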

I hope I've been of some help, I'll gladly help you further if you need it.

On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek 
wrote:

> Hi,
> what seems to be the problem?
>
> Cheers,
> Aljoscha
>
> On Wed, 20 Apr 2016 at 03:52 Jack Huang  wrote:
>
>> Hi all,
>>
>> I am doing a simple word count example and want to checkpoint the
>> accumulated word counts. I am not having any luck getting the counts saved
>> and restored. Can someone help?
>>
>> env.enableCheckpointing(1000)
>>
>> env.setStateBackend(new MemoryStateBackend())
>>
>>
>>>  ...
>>
>>
>>
>> inStream
>>   .keyBy({ s => s })
>>   .mapWithState((in: String, count: Option[Int]) => {
>>     val newCount = count.getOrElse(0) + 1
>>     ((in, newCount), Some(newCount))
>>   })
>>   .print()
>>
>>
>>
>> Thanks,
>>
>> Jack Huang
>>
>


-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Trying to detect changes

2016-04-20 Thread toletum
Hi! I'm a beginner in Flink.
I'm reading from a Kafka topic. In this topic, I receive a character each 
event, like that:

Event.: 1 2 3 4 5 6 7 8 9...
Data..: A A A B B B B C C...

I would like to do a "trigger" when the character is different from the one before. For 
example:
Event #1 fires because A is different from "null"
Event #4 fires because B is different from A
Event #8 fires because C is different from B

Could it be possible?


Operation of Windows and Triggers

2016-04-20 Thread Piyush Shrivastava
I wanted to know how Windows and Triggers work in Flink. I am creating a time 
window of 20 seconds and a count trigger of 100.
stream.keyBy(0)
 .timeWindow(Time.seconds(20))
 .trigger(CountTrigger.of(100))
In this case, when will my window get triggered? When 20 seconds have passed, or
when 100 messages have been received?

Thanks and Regards,
Piyush Shrivastava
http://webograffiti.com


Re: Operation of Windows and Triggers

2016-04-20 Thread Fabian Hueske
Hi Piyush,

if you explicitly set a trigger, the default trigger of the window is
replaced.
In your example, the time trigger is replaced by the count trigger, i.e.,
the window is only evaluated after the 100th element was received.
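
In other words (a sketch, with the stream and types as in the question):

    // Default trigger: the window fires when the 20-second window ends.
    stream.keyBy(0)
          .timeWindow(Time.seconds(20))
          .sum(1);

    // Explicit CountTrigger: replaces the time trigger, so the window is
    // evaluated only when the 100th element for that key/window has arrived.
    stream.keyBy(0)
          .timeWindow(Time.seconds(20))
          .trigger(CountTrigger.of(100))
          .sum(1);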

This blog post discusses windows and triggers [1].

Best, Fabian

[1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html

2016-04-20 13:20 GMT+02:00 Piyush Shrivastava :

> I wanted to know how Windows and Triggers work in Flink. I am creating a
> time window of 20 seconds and a count trigger of 100.
>
> stream.keyBy(0)
>  .timeWindow(Time.seconds(20))
>  .trigger(CountTrigger.of(100))
>
> In this case, when will my window get triggered? When 20 seconds has
> passed, 100 messages are passed?
>
> Thanks and Regards,
> Piyush Shrivastava 
> http://webograffiti.com
>


Re: Checkpoint and restore states

2016-04-20 Thread Aljoscha Krettek
Hi,
the *withState() family of functions use the Key/Value state interface
internally, so that should work.

On Wed, 20 Apr 2016 at 12:33 Stefano Baghino 
wrote:

> Hi Jack,
>
> it seems you correctly enabled the checkpointing by calling
> `env.enableCheckpointing`. However, your UDFs have to either implement the
> Checkpointed interface or use the Key/Value State interface to make sure
> the state of the computation is snapshotted.
>
> The documentation explains how to define your functions so that they
> checkpoint the state far better than I could in this post:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html
>
> I hope I've been of some help, I'll gladly help you further if you need it.
>
> On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> what seems to be the problem?
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 20 Apr 2016 at 03:52 Jack Huang 
>> wrote:
>>
>>> Hi all,
>>>
>>> I am doing a simple word count example and want to checkpoint the
>>> accumulated word counts. I am not having any luck getting the counts saved
>>> and restored. Can someone help?
>>>
>>> env.enableCheckpointing(1000)
>>>
>>> env.setStateBackend(new MemoryStateBackend())
>>>
>>>
  ...
>>>
>>>
>>>
>>> inStream
>>>   .keyBy({ s => s })
>>>   .mapWithState((in: String, count: Option[Int]) => {
>>>     val newCount = count.getOrElse(0) + 1
>>>     ((in, newCount), Some(newCount))
>>>   })
>>>   .print()
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Jack Huang
>>>
>>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>


Need to sleep the thread to let my Flink Zookeeper datasource with NodeDataChanged work

2016-04-20 Thread Sendoh
Hi,

I'd like to implement a custom ZooKeeper data source which reads ZooKeeper when
the node data changes (NodeDataChanged). Right now it's not working perfectly,
because the thread needs to sleep, otherwise it doesn't work.

public static class ZKSource implements SourceFunction<String> {
    private static final long serialVersionUID = 1L;
    private static String zkData;
    private static CuratorFramework client;
    private static boolean isChanged = false;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // first collect
        ctx.collect(getZKData());

        while (true) {
            Thread.sleep(1); // without this doesn't work
            if (isChanged) {
                ctx.collect(getZKData());
                isChanged = false;
            }
        }
    }

    @Override
    public void cancel() {
        client.close();
    }

    private String getZKData() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);

        client.getCuratorListenable().addListener(new ZKListener());

        client.start();
        zkData = new String(client.getData().watched().forPath(node), StandardCharsets.UTF_8);

        return zkData;
    }

    static class ZKListener implements CuratorListener {

        @Override
        public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
            EventType changeEvent = curatorEvent.getWatchedEvent().getType();
            try {
                switch (changeEvent) {
                    case NodeDataChanged:
                        isChanged = true;
                        break;
                    default:
                        break;
                }
            } catch (Exception e) {
                LOG.warn("Exception", e);
                client.close();
            }
        }
    }
}

I was thinking that ctx.collect() needs some time to complete, and that without
Thread.sleep(), ctx.collect() doesn't start and isChanged becomes false quicker
than ctx.collect() can run. Besides, the time needed to sleep may vary based on
how busy the Flink cluster is, which sounds not very robust.
I would be glad to know about any better implementation and about mistakes I have made.

Best,

Sendoh



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Need-to-sleep-the-thread-to-let-my-Flink-Zookeeper-datasource-with-NodeDataChanged-work-tp6249.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Trying to detect changes

2016-04-20 Thread Aljoscha Krettek
Hi,
this could be done by implementing a user function that keeps state or by
using windows with a custom Trigger. This only works, however, if you only
have one Kafka partition and if your Flink job is executing with
parallelism=1. Otherwise we don't have any ordering guarantees on streams.
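
A minimal sketch of the first option (a stateful user function; it assumes parallelism 1 as noted above and keeps the previous value in a plain field, i.e. without fault tolerance):

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.util.Collector;

    public class ChangeDetector extends RichFlatMapFunction<String, String> {

        private String previous; // last value seen; null before the first element

        @Override
        public void flatMap(String value, Collector<String> out) {
            if (previous == null || !previous.equals(value)) {
                out.collect(value); // "fires" only when the value differs from the previous one
            }
            previous = value;
        }
    }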

Cheers,
Aljoscha

On Wed, 20 Apr 2016 at 12:50  wrote:

> Hi! I'm a beginner in Flink.
> I'm reading from a Kafka topic. In this topic, I receive a character each
> event, like that:
>
> Event.: 1 2 3 4 5 6 7 8 9...
> Data..: A A A B B B B C C...
>
> I would like to do a "trigger" when the character is different than
> before. For example:
> Event º1 fire because of A is different to "null"
> Event º4 fire because of B is different to A
> Event º8 fire because of C is different to B
>
> Could it be possible?
>


Re: Operation of Windows and Triggers

2016-04-20 Thread Piyush Shrivastava
Hi Fabian,
Thanks for the information. I also quickly want to ask: if I implement a
custom trigger that fires after one hour the first time and then every five
minutes, which functions do I need to implement? I am considering creating my own
trigger referring to the code here:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
What changes do I need to make? Is it even possible to do this?

Thanks and Regards,
Piyush Shrivastava
http://webograffiti.com
 

On Wednesday, 20 April 2016 4:59 PM, Fabian Hueske  
wrote:
 

 Hi Piyush,

if you explicitly set a trigger, the default trigger of the window is replaced. 
In your example, the time trigger is replaced by the count trigger, i.e., the 
window is only evaluated after the 100th element was received.

This blog post discusses windows and triggers [1].

Best, Fabian

[1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html

2016-04-20 13:20 GMT+02:00 Piyush Shrivastava :

I wanted to know how Windows and Triggers work in Flink. I am creating a time 
window of 20 seconds and a count trigger of 100.
stream.keyBy(0)
 .timeWindow(Time.seconds(20))
 .trigger(CountTrigger.of(100))
In this case, when will my window get triggered? When 20 seconds has passed, 
100 messages are passed? 

Thanks and Regards,Piyush Shrivastava
http://webograffiti.com




  

Re: Checkpoint and restore states

2016-04-20 Thread Stefano Baghino
My bad, thanks for pointing that out.

On Wed, Apr 20, 2016 at 1:49 PM, Aljoscha Krettek 
wrote:

> Hi,
> the *withState() family of functions use the Key/Value state interface
> internally, so that should work.
>
> On Wed, 20 Apr 2016 at 12:33 Stefano Baghino <
> stefano.bagh...@radicalbit.io> wrote:
>
>> Hi Jack,
>>
>> it seems you correctly enabled the checkpointing by calling
>> `env.enableCheckpointing`. However, your UDFs have to either implement the
>> Checkpointed interface or use the Key/Value State interface to make sure
>> the state of the computation is snapshotted.
>>
>> The documentation explains how to define your functions so that they
>> checkpoint the state far better than I could in this post:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html
>>
>> I hope I've been of some help, I'll gladly help you further if you need
>> it.
>>
>> On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>> what seems to be the problem?
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Wed, 20 Apr 2016 at 03:52 Jack Huang 
>>> wrote:
>>>
 Hi all,

 I am doing a simple word count example and want to checkpoint the
 accumulated word counts. I am not having any luck getting the counts saved
 and restored. Can someone help?

 env.enableCheckpointing(1000)

 env.setStateBackend(new MemoryStateBackend())


>  ...



inStream
  .keyBy({ s => s })
  .mapWithState((in: String, count: Option[Int]) => {
    val newCount = count.getOrElse(0) + 1
    ((in, newCount), Some(newCount))
  })
  .print()



 Thanks,

 Jack Huang

>>>
>>
>>
>> --
>> BR,
>> Stefano Baghino
>>
>> Software Engineer @ Radicalbit
>>
>


-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Re: Trying to detect changes

2016-04-20 Thread Stefano Baghino
Can the CEP library be used for this use case?

On Wed, Apr 20, 2016 at 2:02 PM, Aljoscha Krettek 
wrote:

> Hi,
> this could be done by implementing a user function that keeps state or by
> using windows with a custom Trigger. On only works, however, if you only
> have one Kafka partition and if your Flink job is executing with
> parallelism=1. Otherwise we don't have any ordering guarantees on streams.
>
> Cheers,
> Aljoscha
>
> On Wed, 20 Apr 2016 at 12:50  wrote:
>
>> Hi! I'm a beginner in Flink.
>> I'm reading from a Kafka topic. In this topic, I receive a character each
>> event, like that:
>>
>> Event.: 1 2 3 4 5 6 7 8 9...
>> Data..: A A A B B B B C C...
>>
>> I would like to do a "trigger" when the character is different than
>> before. For example:
>> Event º1 fire because of A is different to "null"
>> Event º4 fire because of B is different to A
>> Event º8 fire because of C is different to B
>>
>> Could it be possible?
>>
>


-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Re: Sink Parallelism

2016-04-20 Thread Ravinder Kaur
Hi Fabian,

Thank you for the explanation. Could you also explain how keyBy() would
work? I assume it should work the same as groupBy(), but in streaming mode,
since the data is unbounded, all elements that arrive in the first window
are grouped/partitioned by key and aggregated, and so on until no more
streams are left. The global result then has the aggregated key/value pairs.

Kind Regards,
Ravinder Kaur



On Wed, Apr 20, 2016 at 12:12 PM, Fabian Hueske  wrote:

> Hi Ravinder,
>
> your drawing is pretty much correct (Flink will inject a combiner between
> flat map and reduce which locally combines records with the same key).
> The partitioning between flat map and reduce is done with hash
> partitioning by default. However, you can also define a custom partitioner
> to control how records are distributed.
>
> Best, Fabian
>
> 2016-04-19 17:04 GMT+02:00 Ravinder Kaur :
>
>> Hello Chesnay,
>>
>> Thank you for the reply. According to this
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>> if I set -p = 2 then sink will also have 2 Sink subtaks and the final
>> result will end up in 2 stream partitions or say 2 chunks and combining
>> them will be the global result of the WordCount of input Dataset. And when
>> I say I have 2 taskmanagers with one taskslot each these 2 chunks are saved
>> on 2 machines in the end.
>>
>> I have attached an image of my understanding by working out an example
>> WordCount with -p = 4. ​​Could you also explain how the communication among
>> taskmanagers happen while redistributing streams and how tuples with same
>> key end up in one taskmanager? Basically the implementation of groupBy on
>> multiple taskmanagers.
>>
>> Thanks,
>> Ravinder Kaur
>>
>> On Tue, Apr 19, 2016 at 4:01 PM, Chesnay Schepler 
>> wrote:
>>
>>> The picture you reference does not really show how dataflows are
>>> connected.
>>> For a better picture, visit this link:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>>
>>> Let me know if this doesn't answer your question.
>>>
>>>
>>> On 19.04.2016 14:22, Ravinder Kaur wrote:
>>>
 Hello All,

 Considering the following streaming dataflow of the example WordCount,
 I want to understand how Sink is parallelised.


 Source --> flatMap --> groupBy(), sum() --> Sink

 If I set the paralellism at runtime using -p, as shown here
 https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#configuring-taskmanager-processing-slots

 I want to understand how Sink is done parallelly and how the global
 result is distributed.

 As far as I understood groupBy(0) is applied to the tuples>>> Integer> emitted from the flatMap funtion, which groupes by the String
 value and sum(1) aggregates the Integer value getting the count.

 That means streams will be redistributed so that tuples grouped by the
 same String value be sent to one taskmanager and the Sink step should be
 writing the results to the specified path. When Sink step is also
 parallelised then each taskmanager should emit a chunk. These chunks put
 together must be the global result.

 But when I see the pictorial representation it seems that each task
 slot will run a copy of the streaming dataflow and will be performing the
 operations on the chunk of data it gets and outputs the result. But if this
 is the case the global result would have duplicates of strings and would be
 wrong.

 Could one of you kindly clarify what exactly happens?

 Kind Regards,
 Ravinder Kaur




>>>
>>
>


Re: Flink + S3

2016-04-20 Thread Robert Metzger
Hi Michael-Keith,

Welcome to the Flink community!

Let me try answer your question regarding the "best" deployment options:
From what I see from the mailing list, most of our users are using one of
the big hadoop distributions (including Amazon EMR) with YARN installed.
Having YARN makes things quite comfortable because its taking care of
restarting JVMs in failure cases, deployment of the flink jars, security
(if used), ...

Flink might look tightly coupled to Hadoop, because we ship it with
different Hadoop versions, but that's mostly for convenience reasons. You
don't need to install anything from the Hadoop project to run Flink.
It's just that in the past, almost all users were using Hadoop, so it was
not an issue.

I would not install YARN on the cluster just for running Flink.

How are you running Kafka in your cluster?
I think you can run Flink in a very similar way. The only difficult part is
probably the failure recovery: When a Flink JVM crashes, you want it to be
restarted by some service on your server.
I've seen users which were using OS tools like upstart to ensure the Flink
TaskManagers are always running.

Regarding the puppet module: The Apache Bigtop project (basically an open
source hadoop distro) is currently adding support for Flink:
https://issues.apache.org/jira/browse/BIGTOP-1927. They'll create deb and
rpm packages, puppet scripts and testing for Flink.

Maybe they can use your puppet code as a reference.

Independent of their effort, I think it would be great if you publish your
puppet module.

Regarding Mesos: There are plans to integrate Flink with Mesos, I don't
think it'll make it into 1.1 but 1.2 seems realistic.


Regards,
Robert



On Wed, Apr 20, 2016 at 1:35 AM, Michael-Keith Bernard <
mkbern...@opentable.com> wrote:

> Hey Till & Ufuk,
>
> We're running on self-managed EC2 instances (and we'll eventually have a
> mirror cluster in our colo). The provided documentation notes that for
> Hadoop 2.6, we'd need such-and-such version of hadoop-aws and guice on the
> CP. If I wanted to instead use Hadoop 2.7, which versions of those
> dependencies should I get? And how can I look that up myself? The pom file
> for hadoop-aws[1] doesn't mention a specific dependency on Guice, so I'm
> curious how the author of that documentation knew exactly the dependencies
> and versions required.
>
> Let me switch my questioning slightly:
>
> What is the best (most widely supported, most common, easiest to use,
> easiest to scale, etc) way to deploy Flink today? I've been operating under
> the assumption that, since we have no existing Hadoop infrastructure, the
> path of least resistance is a stand-alone cluster. However it seems like
> Flink is still relatively tightly coupled to the Hadoop platform, so maybe
> I would be better off switching to Hadoop + YARN? Our requirements are
> simple (for now):
>
> Kafka (consumer & producer), S3 (read & write), streaming- and batch-mode
> computation
>
> If the answer turns out to be that YARN is the best path forward for us,
> do you have any recommendations on how to get started building a minimal,
> but production ready Hadoop cluster suitable for Flink? Ambari looks
> amazing, so barring feedback to the contrary I'll probably be investing
> time looking at that first.
>
> Finally, any relevant book recommendations? :) I'm extremely excited about
> this project, so all the feedback I can get is highly welcome and highly
> appreciated!
>
> Cheers,
> Michael-Keith
>
> P.S. Is there planned support for Mesos as an alternative scheduler to
> YARN?
>
> [1]:
> http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.2/hadoop-aws-2.7.2.pom
>
> 
> From: Ufuk Celebi 
> Sent: Tuesday, April 19, 2016 2:30 AM
> To: user@flink.apache.org
> Subject: Re: Flink + S3
>
> Hey Michael-Keith,
>
> are you running self-managed EC2 instances or EMR?
>
> In addition to what Till said:
>
> We tried to document this here as well:
>
> https://ci.apache.org/projects/flink/flink-docs-master/setup/aws.html#provide-s3-filesystem-dependency
>
> Does this help? You don't need to really install Hadoop, but only
> provide the configuration and the S3 FileSystem code on your
> classpath.
>
> If you use EMR + Flink on YARN, it should work out of the box.
>
> – Ufuk
>
> On Tue, Apr 19, 2016 at 10:23 AM, Till Rohrmann 
> wrote:
> > Hi Michael-Keith,
> >
> > you can use S3 as the checkpoint directory for the filesystem state
> backend.
> > This means that whenever a checkpoint is performed the state data will be
> > written to this directory.
> >
> > The same holds true for the zookeeper recovery storage directory. This
> > directory will contain the submitted and not yet finished jobs as well as
> > some meta data for the checkpoints. With this information it is possible
> to
> > restore running jobs if the job manager dies.
> >
> > As far as I know, Flink relies on Hadoop's file system wrapper classes to
> > support S3. Flink has built 

Access Flink UI for jobs submitted using Eclipse

2016-04-20 Thread Ritesh Kumar Singh
Hi,

Just a basic question: I am using Flink via Eclipse, but when I execute my
jobs, I can't access them via the web dashboard. It's basically a Maven
project, so I've added all the Flink jars in my pom file, and I'm
executing my code by getting the execution environment as follows:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);
***code***
env.execute();

Although the job runs successfully, I can't access the info via the web
dashboard. What part am I doing wrong?
I also tried installing Flink and starting the start-local.sh script. This
starts the web dashboard, but my Flink jobs run from Eclipse are not
registered with this web interface. Is there any workaround?

Thanks,
Ritesh


Threads waiting on LocalBufferPool

2016-04-20 Thread Maciek Próchniak

Hi,
I'm running my flink job on one rather large machine (20 cores with 
hyperthreading, 120GB RAM). Task manager has 20GB heap allocated.

It does more or less:
read csv from kafka -> keyBy one of the fields -> some custom state 
processing.

Kafka topic has 24 partitions, so my parallelism is also 24

After some tweaks and upgrading to 1.0.2-rc3 (as I use the RocksDB state 
backend) I reached a point where throughput is ~120-150k/s.
On the same Kafka and machine I reached > 500k/s with a simple filtering 
job, so I wanted to see what's the bottleneck.


It turns out that quite often all of the Kafka threads are stuck waiting for 
a buffer from the pool:
"Thread-6695" #7466 daemon prio=5 os_prio=0 tid=0x7f77fd80d000 
nid=0x8118 in Object.wait() [0x7f7ad54d9000]

   java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)

- locked <0x0002eade3890> (a java.util.ArrayDeque)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
- locked <0x0002eb73cbd0> (a 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at 
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
at 
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)

at scala.collection.immutable.List.foreach(List.scala:381)
at 
org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
at 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
at 
org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)

- locked <0x0002eaf3eb50> (a java.lang.Object)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)

- locked <0x0002eaf3eb50> (a java.lang.Object)

This seems a bit weird to me, as most of the state-processing threads are idle:

"My custom function -> (Sink: Unnamed, Map) (19/24)" #7353 daemon prio=5 
os_prio=0 tid=0x7f7a7400e000 nid=0x80a7 waiting on condition 
[0x7f7bee8ed000]

   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0002eb840c38> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at 
java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)

Re: Threads waiting on LocalBufferPool

2016-04-20 Thread Ufuk Celebi
Could be different things actually, including the parts of the network
you mentioned.

1)

Regarding the TM config:
- It can help to increase the number of network buffers (you can go
ahead and give it 4 GB, e.g. 131072 buffers à 32 KB)
- In general, you have way more memory available than you actually
give to Flink. I would increase the 20 GB heap size.

As a first step you could address these two points and re-run your job.
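
For reference, the corresponding flink-conf.yaml entries would look roughly
like this (the heap value is just an example, pick what fits your machine):

taskmanager.network.numberOfBuffers: 131072   # ~4 GB at the default 32 KB buffer size
taskmanager.heap.mb: 61440                    # e.g. 60 GB instead of the current 20 GB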

2)

As a follow-up you could also work with the FileSystemStateBackend,
which keeps state in memory (on-heap) and writes checkpoints to files.
This would help in checking how much RocksDB is slowing things down.
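
Switching is a one-liner, for example (the checkpoint path is just an example):

// org.apache.flink.runtime.state.filesystem.FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));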


I'm curious about the results. Do you think you will have time to try this?

– Ufuk


On Wed, Apr 20, 2016 at 3:45 PM, Maciek Próchniak  wrote:
> Hi,
> I'm running my flink job on one rather large machine (20 cores with
> hyperthreading, 120GB RAM). Task manager has 20GB heap allocated.
> It does more or less:
> read csv from kafka -> keyBy one of the fields -> some custom state
> processing.
> Kafka topic has 24 partitions, so my parallelism is also 24
>
> After some tweaks and upgrading to 1.0.2-rc3 (as I use RocksDB state
> backend) I reached a point when throughput is ~120-150k/s.
> One the same kafka and machine I reached > 500k/s with simple filtering job,
> so I wanted to see what's the bottleneck.
>
> It turns out that quite often all of kafka threads are stuck waiting for
> buffer from pool:
> "Thread-6695" #7466 daemon prio=5 os_prio=0 tid=0x7f77fd80d000
> nid=0x8118 in Object.wait() [0x7f7ad54d9000]
>java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
> - locked <0x0002eade3890> (a java.util.ArrayDeque)
> at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
> - locked <0x0002eb73cbd0> (a
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
> at
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
> at
> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at
> org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
> at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
> at
> org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
> - locked <0x0002eaf3eb50> (a java.lang.Object)
> at
> org.apache.flink.streaming.connectors.kafka.Fli

Re: Master (1.1-SNAPSHOT) Can't run on YARN

2016-04-20 Thread Ufuk Celebi
The user list is OK since you are reporting a bug here ;-) I'm
confident that this will be fixed soon! :-)

On Wed, Apr 20, 2016 at 11:28 AM, Stefano Baghino
 wrote:
> Not exactly, I just wanted to let you know about it and see if someone else
> has experienced this issue; perhaps it's more of a dev mailing list discussion,
> sorry for posting this here. Feel free to continue the discussion on the
> other list if you feel it's more appropriate.
>
> On Tue, Apr 19, 2016 at 6:53 PM, Ufuk Celebi  wrote:
>>
>> Hey Stefano,
>>
>> Flink's resource management has been refactored for 1.1 recently. This
>> could be a regression introduced by this. Max can probably help you
>> with more details. Is this currently a blocker for you?
>>
>> – Ufuk
>>
>> On Tue, Apr 19, 2016 at 6:31 PM, Stefano Baghino
>>  wrote:
>> > Hi everyone,
>> >
>> > I'm currently experiencing a weird situation, I hope you can help me out
>> > with this.
>> >
>> > I've cloned and built from the master, then I've edited the default
>> > config
>> > file by adding my Hadoop config path, exported the HADOOP_CONF_DIR env
>> > var
>> > and ran bin/yarn-session.sh -n 1 -s 2 -jm 2048 -tm 2048
>> >
>> > The first thing I noticed is that I had to put "-s 2" or the task
>> > managers
>> > gets created with -1 slots (!) by default.
>> >
>> > After putting "-s 2" the YARN session startup hangs when trying to
>> > register
>> > the task managers. I've stopped the session and aggregated the logs and
>> > read
>> > a lot (several thousands) of the messages I attach at the bottom; any
>> > idea
>> > of what this may be?
>> >
>> > Thank you a lot in advance!
>> >
>> > 2016-04-19 12:15:59,507 INFO  org.apache.flink.yarn.YarnTaskManager
>> > - Trying to register at JobManager
>> > akka.tcp://flink@172.31.20.101:57379/user/jobmanager (attempt 1,
>> > timeout:
>> > 500 milliseconds)
>> >
>> > 2016-04-19 12:15:59,649 ERROR org.apache.flink.yarn.YarnTaskManager
>> > - The registration at JobManager
>> > Some(akka.tcp://flink@172.31.20.101:57379/user/jobmanager) was refused,
>> > because: java.lang.IllegalStateException: Resource
>> > ResourceID{resourceId='container_e02_1461077293721_0016_01_02'} not
>> > registered with resource manager.. Retrying later...
>> >
>> > 2016-04-19 12:16:00,025 INFO  org.apache.flink.yarn.YarnTaskManager
>> > - Trying to register at JobManager
>> > akka.tcp://flink@172.31.20.101:57379/user/jobmanager (attempt 2,
>> > timeout:
>> > 1000 milliseconds)
>> >
>> > 2016-04-19 12:16:00,033 ERROR org.apache.flink.yarn.YarnTaskManager
>> > - The registration at JobManager
>> > Some(akka.tcp://flink@172.31.20.101:57379/user/jobmanager) was refused,
>> > because: java.lang.IllegalStateException: Resource
>> > ResourceID{resourceId='container_e02_1461077293721_0016_01_02'} not
>> > registered with resource manager.. Retrying later...
>> >
>> > 2016-04-19 12:16:01,045 INFO  org.apache.flink.yarn.YarnTaskManager
>> > - Trying to register at JobManager
>> > akka.tcp://flink@172.31.20.101:57379/user/jobmanager (attempt 3,
>> > timeout:
>> > 2000 milliseconds)
>> >
>> > 2016-04-19 12:16:01,053 ERROR org.apache.flink.yarn.YarnTaskManager
>> > - The registration at JobManager
>> > Some(akka.tcp://flink@172.31.20.101:57379/user/jobmanager) was refused,
>> > because: java.lang.IllegalStateException: Resource
>> > ResourceID{resourceId='container_e02_1461077293721_0016_01_02'} not
>> > registered with resource manager.. Retrying later...
>> >
>> > 2016-04-19 12:16:03,064 INFO  org.apache.flink.yarn.YarnTaskManager
>> > - Trying to register at JobManager
>> > akka.tcp://flink@172.31.20.101:57379/user/jobmanager (attempt 4,
>> > timeout:
>> > 4000 milliseconds)
>> >
>> > 2016-04-19 12:16:03,072 ERROR org.apache.flink.yarn.YarnTaskManager
>> > - The registration at JobManager
>> > Some(akka.tcp://flink@172.31.20.101:57379/user/jobmanager) was refused,
>> > because: java.lang.IllegalStateException: Resource
>> > ResourceID{resourceId='container_e02_1461077293721_0016_01_02'} not
>> > registered with resource manager.. Retrying later...
>> >
>> > 2016-04-19 12:16:07,085 INFO  org.apache.flink.yarn.YarnTaskManager
>> > - Trying to register at JobManager
>> > akka.tcp://flink@172.31.20.101:57379/user/jobmanager (attempt 5,
>> > timeout:
>> > 8000 milliseconds)
>> >
>> > 2016-04-19 12:16:07,092 ERROR org.apache.flink.yarn.YarnTaskManager
>> > - The registration at JobManager
>> > Some(akka.tcp://flink@172.31.20.101:57379/user/jobmanager) was refused,
>> > because: java.lang.IllegalStateException: Resource
>> > ResourceID{resourceId='container_e02_1461077293721_0016_01_02'} not
>> > registered with resource manager.. Retrying later...
>> >
>> > 2016-04-19 12:16:09,664 INFO  org.apache.flink.yarn.YarnTaskManager
>> > - Trying to register at JobManager
>> > akka.tcp://flink@172.31.20.101:57379/user/jobmanager (attempt 1,
>> > timeout:
>> > 500 milliseconds)
>> >
>> >
>> > --
>> > BR,
>> > Stefano Baghino
>> >
>> > Software Engineer @ Radicalbit
>
>
>
>
> 

Re: Trying to detect changes

2016-04-20 Thread Till Rohrmann
You could use CEP for that. First you would create a pattern of two states
which matches everything. In the select function you could then check
whether both elements are different.

However, this would be a little bit of an overkill for this simple use
case. You could for example simply use a flat map operation which stores
the last seen element. Then whenever you see a different element you can
emit a change event.

Cheers,
Till
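
A minimal sketch of that flat map approach (Java; it ignores fault tolerance,
for a checkpointed version you would keep the field in Flink state or
implement Checkpointed instead):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public class ChangeDetector implements FlatMapFunction<String, String> {
  // last seen element; null until the first element arrives,
  // so the very first element always counts as a change
  private String last;

  @Override
  public void flatMap(String value, Collector<String> out) {
    if (!value.equals(last)) {
      out.collect(value); // emit a change event
    }
    last = value;
  }
}

// usage (parallelism 1, as Aljoscha pointed out):
// stream.flatMap(new ChangeDetector()).setParallelism(1);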

On Wed, Apr 20, 2016 at 2:43 PM, Stefano Baghino <
stefano.bagh...@radicalbit.io> wrote:

> Can the CEP library be used for this use case?
>
> On Wed, Apr 20, 2016 at 2:02 PM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> this could be done by implementing a user function that keeps state or by
>> using windows with a custom Trigger. This only works, however, if you only
>> have one Kafka partition and if your Flink job is executing with
>> parallelism=1. Otherwise we don't have any ordering guarantees on streams.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 20 Apr 2016 at 12:50  wrote:
>>
>>> Hi! I'm a beginner in Flink.
>>> I'm reading from a Kafka topic. In this topic, I receive a character
>>> each event, like that:
>>>
>>> Event.: 1 2 3 4 5 6 7 8 9...
>>> Data..: A A A B B B B C C...
>>>
>>> I would like to fire a "trigger" when the character is different from the
>>> one before. For example:
>>> Event º1 fires because A is different from "null"
>>> Event º4 fires because B is different from A
>>> Event º8 fires because C is different from B
>>>
>>> Could it be possible?
>>>
>>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>


Re: Access Flink UI for jobs submitted using Eclipse

2016-04-20 Thread Till Rohrmann
Have you created a RemoteExecutionEnvironment to submit your job from
within the IDE to the running cluster? See here [1] for more information.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html

Cheers,
Till
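
For example (host, port and jar path are placeholders for your setup):

ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
    "jobmanager-host", 6123, "/path/to/your-job-jar-with-dependencies.jar");
env.setParallelism(8);
// ***code***
env.execute();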
​

On Wed, Apr 20, 2016 at 3:41 PM, Ritesh Kumar Singh <
riteshoneinamill...@gmail.com> wrote:

> Hi,
>
> Just a basic question, I am using flink via eclipse but when I execute my
> jobs, I can't access it via the web dashboard. It's basically a maven
> project and so I've added all the flink jars in my pom file and I'm
> executing my code by getting the execution environment as follows:
>
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(8);
> ***code***
> env.execute();
>
> Although the job runs successfully, I can't access the info via the web
> dashboard. What part am I doing wrong?
> I also tried installing flink and starting the start-local.sh script. This
> starts the web dashboard but my flink jobs using eclipse are not getting
> registered to this web interface. Is there any workaround?
>
> Thanks,
> Ritesh
>


Re: Trying to detect changes

2016-04-20 Thread Stefano Baghino
Ok, thanks for the clarification Till.

On Wed, Apr 20, 2016 at 4:46 PM, Till Rohrmann  wrote:

> You could use CEP for that. First you would create a pattern of two states
> which matches everything. In the select function you could then check
> whether both elements are different.
>
> However, this would be a little bit of an overkill for this simple use
> case. You could for example simply use a flat map operation which stores
> the last seen element. Then whenever you see a different element you can
> emit a change event.
>
> Cheers,
> Till
>
> On Wed, Apr 20, 2016 at 2:43 PM, Stefano Baghino <
> stefano.bagh...@radicalbit.io> wrote:
>
>> Can the CEP library be used for this use case?
>>
>> On Wed, Apr 20, 2016 at 2:02 PM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>> this could be done by implementing a user function that keeps state or
>>> by using windows with a custom Trigger. On only works, however, if you only
>>> have one Kafka partition and if your Flink job is executing with
>>> parallelism=1. Otherwise we don't have any ordering guarantees on streams.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Wed, 20 Apr 2016 at 12:50  wrote:
>>>
 Hi! I'm a beginner in Flink.
 I'm reading from a Kafka topic. In this topic, I receive a character
 each event, like that:

 Event.: 1 2 3 4 5 6 7 8 9...
 Data..: A A A B B B B C C...

 I would like to do a "trigger" when the character is different than
 before. For example:
 Event º1 fire because of A is different to "null"
 Event º4 fire because of B is different to A
 Event º8 fire because of C is different to B

 Could it be possible?

>>>
>>
>>
>> --
>> BR,
>> Stefano Baghino
>>
>> Software Engineer @ Radicalbit
>>
>
>


-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Re: Sink Parallelism

2016-04-20 Thread Fabian Hueske
In batch / DataSet programs, groupBy() is executed by partitioning the data
(usually hash partitioning) and sorting each partition to group all
elements with the same key.
keyBy() in DataStream programs also partitions the data and results in a
KeyedStream. The KeyedStream has information about the partitioning which
is used for subsequent operations that need to hold state, such as
windows or other operators that use partitioned state. So keyBy() by itself
is not grouping or aggregating data. It only partitions and preserves
information about the partitioning, which is used by the following operators.
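
For example, in a streaming WordCount the partitioning and the aggregation are
two separate steps (assuming words is the DataStream<Tuple2<String, Integer>>
produced by the flat map):

// keyBy(0) only partitions by the word; the per-key counting happens in the
// stateful sum(1) operator that follows
DataStream<Tuple2<String, Integer>> counts = words
    .keyBy(0)
    .sum(1);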

Best, Fabian

2016-04-20 14:56 GMT+02:00 Ravinder Kaur :

> Hi Fabian,
>
> Thank you for the explanation. Could you also explain how keyBy() would
> work? I assume it should work same as groupBy(), but in streaming mode
> since the data is unbounded all elements that arrive in the first window
> are grouped/partitioned by keys and aggregated and so on until no more
> streams left. The global result then has the aggregated key/value pairs.
>
> Kind Regards,
> Ravinder Kaur
>
>
>
> On Wed, Apr 20, 2016 at 12:12 PM, Fabian Hueske  wrote:
>
>> Hi Ravinder,
>>
>> your drawing is pretty much correct (Flink will inject a combiner between
>> flat map and reduce which locally combines records with the same key).
>> The partitioning between flat map and reduce is done with hash
>> partitioning by default. However, you can also define a custom partitioner
>> to control how records are distributed.
>>
>> Best, Fabian
>>
>> 2016-04-19 17:04 GMT+02:00 Ravinder Kaur :
>>
>>> Hello Chesnay,
>>>
>>> Thank you for the reply. According to this
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>> if I set -p = 2 then the sink will also have 2 Sink subtasks and the final
>>> result will end up in 2 stream partitions or say 2 chunks and combining
>>> them will be the global result of the WordCount of input Dataset. And when
>>> I say I have 2 taskmanagers with one taskslot each these 2 chunks are saved
>>> on 2 machines in the end.
>>>
>>> I have attached an image of my understanding by working out an example
>>> WordCount with -p = 4. ​​Could you also explain how the communication among
>>> taskmanagers happen while redistributing streams and how tuples with same
>>> key end up in one taskmanager? Basically the implementation of groupBy on
>>> multiple taskmanagers.
>>>
>>> Thanks,
>>> Ravinder Kaur
>>>
>>> On Tue, Apr 19, 2016 at 4:01 PM, Chesnay Schepler 
>>> wrote:
>>>
 The picture you reference does not really show how dataflows are
 connected.
 For a better picture, visit this link:
 https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows

 Let me know if this doesn't answer your question.


 On 19.04.2016 14:22, Ravinder Kaur wrote:

> Hello All,
>
> Considering the following streaming dataflow of the example WordCount,
> I want to understand how Sink is parallelised.
>
>
> Source --> flatMap --> groupBy(), sum() --> Sink
>
> If I set the paralellism at runtime using -p, as shown here
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#configuring-taskmanager-processing-slots
>
> I want to understand how Sink is done parallelly and how the global
> result is distributed.
>
> As far as I understood, groupBy(0) is applied to the (String, Integer) tuples emitted from the flatMap function, which groups by the String
> value, and sum(1) aggregates the Integer value to get the count.
>
> That means streams will be redistributed so that tuples grouped by the
> same String value are sent to one taskmanager, and the Sink step should be
> writing the results to the specified path. When the Sink step is also
> parallelised then each taskmanager should emit a chunk. These chunks put
> together must be the global result.
>
> But when I see the pictorial representation it seems that each task
> slot will run a copy of the streaming dataflow and will be performing the
> operations on the chunk of data it gets and outputs the result. But if 
> this
> is the case the global result would have duplicates of strings and would 
> be
> wrong.
>
> Could one of you kindly clarify what exactly happens?
>
> Kind Regards,
> Ravinder Kaur
>
>
>
>

>>>
>>
>


Re: Operation of Windows and Triggers

2016-04-20 Thread Fabian Hueske
Hi Piyush,

that's not trivial to implement. You can only do that with a so-called
GlobalWindow, i.e., a window which receives all elements of a partition,
and a custom trigger which has state to decide whether it has triggered the
first window or not. It won't work with a CountTrigger.

Best, Fabian

2016-04-20 14:20 GMT+02:00 Piyush Shrivastava :

> Hi Fabian,
>
> Thanks for the information. I also quickly want to ask that if I implement
> a custom trigger that fires in one hour for the first time and then every
> five minutes, what all functions do I need to use?
> I am considering creating my own trigger referring the code here:
>
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
>
> What changes do I need to make? Is it even possible to do this?
>
> Thanks and Regards,
> Piyush Shrivastava 
> [image: WeboGraffiti]
> http://webograffiti.com
>
>
> On Wednesday, 20 April 2016 4:59 PM, Fabian Hueske 
> wrote:
>
>
> Hi Piyush,
>
> if you explicitly set a trigger, the default trigger of the window is
> replaced.
> In your example, the time trigger is replaced by the count trigger, i.e.,
> the window is only evaluated after the 100th element was received.
>
> This blog post discusses windows and triggers [1].
>
> Best, Fabian
>
> [1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html
>
> 2016-04-20 13:20 GMT+02:00 Piyush Shrivastava :
>
> I wanted to know how Windows and Triggers work in Flink. I am creating a
> time window of 20 seconds and a count trigger of 100.
>
> stream.keyBy(0)
>  .timeWindow(Time.seconds(20))
>  .trigger(CountTrigger.of(100))
>
> In this case, when will my window get triggered? When 20 seconds has
> passed, 100 messages are passed?
>
> Thanks and Regards,
> Piyush Shrivastava 
> [image: WeboGraffiti]
> http://webograffiti.com
>
>
>
>
>


Re: Operation of Windows and Triggers

2016-04-20 Thread Aljoscha Krettek
Just to clarify, the state of a Trigger on GlobalWindows is still local to
the key of the element that is in the window(s).

On Wed, 20 Apr 2016 at 18:11 Fabian Hueske  wrote:

> Hi Piyush,
>
> that's not trivial to implement. You can only do that with a so-called
> GlobalWindow, i.e., a window which receives all elements of a partition,
> and a custom trigger which has state to decide whether it has triggered the
> first window or not. It won't work with a CountTrigger.
>
> Best, Fabian
>
> 2016-04-20 14:20 GMT+02:00 Piyush Shrivastava :
>
>> Hi Fabian,
>>
>> Thanks for the information. I also quickly want to ask that if I
>> implement a custom trigger that fires in one hour for the first time and
>> then every five minutes, what all functions do I need to use?
>> I am considering creating my own trigger referring the code here:
>>
>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
>>
>> What changes do I need to make? Is it even possible to do this?
>>
>> Thanks and Regards,
>> Piyush Shrivastava 
>> [image: WeboGraffiti]
>> http://webograffiti.com
>>
>>
>> On Wednesday, 20 April 2016 4:59 PM, Fabian Hueske 
>> wrote:
>>
>>
>> Hi Piyush,
>>
>> if you explicitly set a trigger, the default trigger of the window is
>> replaced.
>> In your example, the time trigger is replaced by the count trigger, i.e.,
>> the window is only evaluated after the 100th element was received.
>>
>> This blog post discusses windows and triggers [1].
>>
>> Best, Fabian
>>
>> [1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html
>>
>> 2016-04-20 13:20 GMT+02:00 Piyush Shrivastava :
>>
>> I wanted to know how Windows and Triggers work in Flink. I am creating a
>> time window of 20 seconds and a count trigger of 100.
>>
>> stream.keyBy(0)
>>  .timeWindow(Time.seconds(20))
>>  .trigger(CountTrigger.of(100))
>>
>> In this case, when will my window get triggered? When 20 seconds has
>> passed, 100 messages are passed?
>>
>> Thanks and Regards,
>> Piyush Shrivastava 
>> [image: WeboGraffiti]
>> http://webograffiti.com
>>
>>
>>
>>
>>
>


Re: Leader not found

2016-04-20 Thread Balaji Rajagopalan
/usr/share/kafka_2.11-0.8.2.1/bin$ ./kafka-topics.sh --describe --topic
capi --zookeeper (someserver)

Topic:capi PartitionCount:1 ReplicationFactor:1 Configs:

Topic: capi Partition: 0 Leader: 0 Replicas: 0 Isr: 0


There are no events to consume from this topic, this I confirm by running
the console consumer.

./kafka-console-consumer.sh --topic topicname --zookeeper (some server)


The Flink connector is the other consumer. This is happening on our
pre-production machines consistently; I will also try to reproduce this
locally.

java.lang.RuntimeException: Unable to find a leader for partitions:
[FetchPartition {topic=location, partition=58, offset=-915623761776},
FetchPartition {topic=location, partition=60, offset=-915623761776},
FetchPartition {topic=location, partition=54, offset=-915623761776},
FetchPartition {topic=location, partition=56, offset=-915623761776},
FetchPartition {topic=location, partition=66, offset=-915623761776},
FetchPartition {topic=location, partition=68, offset=-915623761776},
FetchPartition {topic=location, partition=62, offset=-915623761776},
FetchPartition {topic=location, partition=64, offset=-915623761776},
FetchPartition {topic=location, partition=74, offset=-915623761776},
FetchPartition {topic=location, partition=76, offset=-915623761776},
FetchPartition {topic=location, partition=70, offset=-915623761776},
FetchPartition {topic=location, partition=72, offset=-915623761776},
FetchPartition {topic=location, partition=82, offset=-915623761776},
FetchPartition {topic=location, partition=84, offset=-915623761776},
FetchPartition {topic=location, partition=78, offset=-915623761776},
FetchPartition {topic=location, partition=80, offset=-915623761776},
FetchPartition {topic=location, partition=26, offset=-915623761776},
FetchPartition {topic=location, partition=28, offset=-915623761776},
FetchPartition {topic=location, partition=22, offset=-915623761776},
FetchPartition {topic=location, partition=24, offset=-915623761776},
FetchPartition {topic=location, partition=34, offset=-915623761776},
FetchPartition {topic=location, partition=36, offset=-915623761776},
FetchPartition {topic=location, partition=30, offset=-915623761776},
FetchPartition {topic=location, partition=32, offset=-915623761776},
FetchPartition {topic=location, partition=42, offset=-915623761776},
FetchPartition {topic=location, partition=44, offset=-915623761776},
FetchPartition {topic=location, partition=38, offset=-915623761776},
FetchPartition {topic=location, partition=40, offset=-915623761776},
FetchPartition {topic=location, partition=50, offset=-915623761776},
FetchPartition {topic=location, partition=52, offset=-915623761776},
FetchPartition {topic=location, partition=46, offset=-915623761776},
FetchPartition {topic=location, partition=48, offset=-915623761776},
FetchPartition {topic=location, partition=122, offset=-915623761776},
FetchPartition {topic=location, partition=124, offset=-915623761776},
FetchPartition {topic=location, partition=118, offset=-915623761776},
FetchPartition {topic=location, partition=120, offset=-915623761776},
FetchPartition {topic=location, partition=2, offset=-915623761776},
FetchPartition {topic=location, partition=130, offset=-915623761776},
FetchPartition {topic=location, partition=4, offset=-915623761776},
FetchPartition {topic=location, partition=132, offset=-915623761776},
FetchPartition {topic=location, partition=126, offset=-915623761776},
FetchPartition {topic=location, partition=0, offset=-915623761776},
FetchPartition {topic=location, partition=128, offset=-915623761776},
FetchPartition {topic=location, partition=10, offset=-915623761776},
FetchPartition {topic=location, partition=138, offset=-915623761776},
FetchPartition {topic=location, partition=12, offset=-915623761776},
FetchPartition {topic=location, partition=140, offset=-915623761776},
FetchPartition {topic=location, partition=6, offset=-915623761776},
FetchPartition {topic=location, partition=134, offset=-915623761776},
FetchPartition {topic=location, partition=8, offset=-915623761776},
FetchPartition {topic=location, partition=136, offset=-915623761776},
FetchPartition {topic=location, partition=18, offset=-915623761776},
FetchPartition {topic=location, partition=146, offset=-915623761776},
FetchPartition {topic=location, partition=20, offset=-915623761776},
FetchPartition {topic=location, partition=148, offset=-915623761776},
FetchPartition {topic=location, partition=14, offset=-915623761776},
FetchPartition {topic=location, partition=142, offset=-915623761776},
FetchPartition {topic=location, partition=16, offset=-915623761776},
FetchPartition {topic=location, partition=144, offset=-915623761776},
FetchPartition {topic=location, partition=90, offset=-915623761776},
FetchPartition {topic=location, partition=92, offset=-915623761776},
FetchPartition {topic=location, partition=86, offset=-915623761776},
FetchPartition {topic=location, partition=88, offset=-915623761776},
FetchPartition {topic=location, partit

Re: Leader not found

2016-04-20 Thread Balaji Rajagopalan
Robert,
Sorry I gave the information about wrong topic. Here is the right one.

balajirajagopalan@megatron-server02:/usr/share/kafka_2.11-0.8.2.1/bin$
./kafka-topics.sh --describe --topic location --zookeeper  (someserver)

Topic:location PartitionCount:150 ReplicationFactor:1 Configs:

Topic: location Partition: 0 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 1 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 2 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 3 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 4 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 5 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 6 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 7 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 8 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 9 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 10 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 11 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 12 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 13 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 14 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 15 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 16 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 17 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 18 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 19 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 20 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 21 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 22 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 23 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 24 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 25 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 26 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 27 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 28 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 29 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 30 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 31 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 32 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 33 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 34 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 35 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 36 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 37 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 38 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 39 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 40 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 41 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 42 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 43 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 44 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 45 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 46 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 47 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 48 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 49 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 50 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 51 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 52 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 53 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 54 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 55 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 56 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 57 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 58 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 59 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 60 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 61 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 62 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 63 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 64 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 65 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 66 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 67 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 68 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 69 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 70 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 71 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 72 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 73 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 74 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 75 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 76 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 77 Leader: 0 Replicas: 0 Isr: 0

Topic: location Partition: 78 Leader: 0 R

Re: How to perform this join operation?

2016-04-20 Thread Till Rohrmann
Hi Elias,

sorry for the late reply. You're right that with the windowed join you
would have to deal with pairs where the timestamp of (x,y) is not
necessarily earlier than the timestamp of z. Moreover, by using sliding
windows you would receive duplicates as you've described. Using tumbling
windows would mean that you lose join matches if (x,y) lives in an earlier
window. Thus, in order to solve your problem you would have to write a
custom stream operator.

The stream operator would do the following: Collecting the inputs from
(x,y) and z which are already keyed. Thus, we know that x=z holds true.
Using a priority queue we order the elements because we don't know how the
arrive at the operator. Whenever we receive a watermark indicating that no
earlier events can arrive anymore, we can go through the two priority
queues to join the elements. The queues are part of the operators state so
that we don't lose information in case of a recovery.

I've sketched such an operator here [1]. I hope this helps you to get
started.

[1] https://github.com/tillrohrmann/custom-join

Cheers,
Till

On Thu, Apr 14, 2016 at 5:12 PM, Elias Levy 
wrote:

> Anyone from Data Artisans have some idea of how to go about this?
>
> On Wed, Apr 13, 2016 at 5:32 PM, Maxim  wrote:
>
>> You could simulate the Samza approach by having a RichFlatMapFunction
>> over cogrouped streams that maintains the sliding window in its ListState.
>> As I understand the drawback is that the list state is not maintained in
>> the managed memory.
>> I'm interested to hear about the right way to implement this.
>>
>> On Wed, Apr 13, 2016 at 3:53 PM, Elias Levy 
>> wrote:
>>
>>> I am wondering how you would implement the following function in Flink.
>>> The function takes as input two streams.  One stream can be viewed a a
>>> tuple with two value *(x, y)*, the second stream is a stream of
>>> individual values *z*.  The function keeps a time based window on the
>>> first input (say, 24 hours).  Whenever it receives an element from the
>>> second stream, it compares the value *z* against the *x* element of
>>> each tuple in the window, and for each match it emits *(x, y)*.  You
>>> are basically doing a join on *x=z*.  Note that values from the second
>>> stream are not windowed and they are only matched to values from the first
>>> stream with an earlier timestamps.
>>>
>>> This was relatively easy to implement in Samza.  Consume off two topics,
>>> the first keyed by *x* and the second by *z*.  Consume both topics in a
>>> job.  Messages with the same key would be consumed by the same task.  The
>>> task could maintain a window of messages from the first stream in its local
>>> state,  Whenever a message came in via the second stream, it could look up
>>> in the local state for matching messages, and if it found any, send them to
>>> the output stream.  Obviously, with Samza you don't have the luxury of the
>>> system handling event time for you, but this work well and it is easy to
>>> implement.
>>>
>>> I am not clear how this would be implemented in Flink.
>>>
>>> It is easy enough to partition by key either stream, and to window the
>>> first stream using a sliding window, but from there out things get
>>> complicated.
>>>
>>> You can join the two streams by key, but you must do so using the same
>>> window for both streams.  That means events from the first stream may be
>>> matched to older events of the second stream, which is not what we want.  I
>>> suppose if both included a timestamp, you could later add a filter to
>>> remove such events from the merged stream.  But you would also have to deal
>>> with duplicates, as the window is a sliding window and the same two
>>> elements may match across all window panes that contain the matching
>>> elements.  So you need to dedup as well.
>>>
>>> coGroup seems like it would suffer from the same issues.
>>>
>>> Maybe the answer is connected streams, but there is scant documentation
>>> on the semantics of ConnectedStreams.  There isn't even an example that I
>>> could find that makes use of them.
>>>
>>> Thoughts?
>>>
>>>
>>>
>>>
>>>
>>
>


Re: Checkpoint and restore states

2016-04-20 Thread Jack Huang
@Aljoscha:
For this word count example I am using a kafka topic as the input stream.
The problem is that when I cancel the task and restart it, the task loses
the accumulated word counts so far and starts counting from 1 again. Am I
missing something basic here?

@Stefano:
I also tried to implements the Checkpointed interface but had no luck
either. Canceling and restarting the task did not restore the states. Here
is my class:

inStream.flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })
>   .keyBy({s => s})
>   .map(new StatefulCounter)


class StatefulCounter extends RichMapFunction[String, (String,Int)] with
> Checkpointed[Integer] {
>   private var count: Integer = 0
>
>   def map(in: String): (String,Int) = {
> count += 1
> return (in, count)
>   }
>   def snapshotState(l: Long, l1: Long): Integer = {
> count
>   }
>   def restoreState(state: Integer) {
> count = state
>   }
> }



Thanks,


Jack Huang

On Wed, Apr 20, 2016 at 5:36 AM, Stefano Baghino <
stefano.bagh...@radicalbit.io> wrote:

> My bad, thanks for pointing that out.
>
> On Wed, Apr 20, 2016 at 1:49 PM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> the *withState() family of functions use the Key/Value state interface
>> internally, so that should work.
>>
>> On Wed, 20 Apr 2016 at 12:33 Stefano Baghino <
>> stefano.bagh...@radicalbit.io> wrote:
>>
>>> Hi Jack,
>>>
>>> it seems you correctly enabled the checkpointing by calling
>>> `env.enableCheckpointing`. However, your UDFs have to either implement the
>>> Checkpointed interface or use the Key/Value State interface to make sure
>>> the state of the computation is snapshotted.
>>>
>>> The documentation explains how to define your functions so that they
>>> checkpoint the state far better than I could in this post:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html
>>>
>>> I hope I've been of some help, I'll gladly help you further if you need
>>> it.
>>>
>>> On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek 
>>> wrote:
>>>
 Hi,
 what seems to be the problem?

 Cheers,
 Aljoscha

 On Wed, 20 Apr 2016 at 03:52 Jack Huang 
 wrote:

> Hi all,
>
> I am doing a simple word count example and want to checkpoint the
> accumulated word counts. I am not having any luck getting the counts saved
> and restored. Can someone help?
>
> env.enableCheckpointing(1000)
>
> env.setStateBackend(new MemoryStateBackend())
>
>
>>  ...
>
>
>
> inStream
>   .keyBy({s => s})
>   .mapWithState((in: String, count: Option[Int]) => {
>     val newCount = count.getOrElse(0) + 1
>     ((in, newCount), Some(newCount))
>   })
>   .print()
>
>
>
> Thanks,
>
> Jack Huang
>

>>>
>>>
>>> --
>>> BR,
>>> Stefano Baghino
>>>
>>> Software Engineer @ Radicalbit
>>>
>>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>


Re: Checkpoint and restore states

2016-04-20 Thread Stefano Baghino
Hello again,

thanks for giving a shot at my advice anyway but Aljoscha is far more
knowledgeable then me regarding Flink. :)

I hope I'm not getting mixed up again but I think gracefully canceling your
job means you lose your job state. Am I right in saying that the state is
preserved in case of abnormal termination (e.g.: the JobManager crashes) or
if you explicitly create a savepoint?
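
If I remember correctly, the CLI flow would be something like this (job id,
paths and the jar name are placeholders):

bin/flink savepoint <jobID>                     # take a savepoint of the running job
bin/flink cancel <jobID>                        # then cancel the job
bin/flink run -s <savepointPath> your-job.jar   # resume from the savepoint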

On Wed, Apr 20, 2016 at 10:13 PM, Jack Huang 
wrote:

> @Aljoscha:
> For this word count example I am using a kafka topic as the input stream.
> The problem is that when I cancel the task and restart it, the task loses
> the accumulated word counts so far and start counting from 1 again. Am I
> missing something basic here?
>
> @Stefano:
> I also tried to implements the Checkpointed interface but had no luck
> either. Canceling and restarting the task did not restore the states. Here
> is my class:
>
> inStream.flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })
>>   .keyBy({s => s})
>>   .map(new StatefulCounter)
>
>
> class StatefulCounter extends RichMapFunction[String, (String,Int)] with
>> Checkpointed[Integer] {
>>   private var count: Integer = 0
>>
>>   def map(in: String): (String,Int) = {
>> count += 1
>> return (in, count)
>>   }
>>   def snapshotState(l: Long, l1: Long): Integer = {
>> count
>>   }
>>   def restoreState(state: Integer) {
>> count = state
>>   }
>> }
>
>
>
> Thanks,
>
>
> Jack Huang
>
> On Wed, Apr 20, 2016 at 5:36 AM, Stefano Baghino <
> stefano.bagh...@radicalbit.io> wrote:
>
>> My bad, thanks for pointing that out.
>>
>> On Wed, Apr 20, 2016 at 1:49 PM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>> the *withState() family of functions use the Key/Value state interface
>>> internally, so that should work.
>>>
>>> On Wed, 20 Apr 2016 at 12:33 Stefano Baghino <
>>> stefano.bagh...@radicalbit.io> wrote:
>>>
 Hi Jack,

 it seems you correctly enabled the checkpointing by calling
 `env.enableCheckpointing`. However, your UDFs have to either implement the
 Checkpointed interface or use the Key/Value State interface to make sure
 the state of the computation is snapshotted.

 The documentation explains how to define your functions so that they
 checkpoint the state far better than I could in this post:
 https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html

 I hope I've been of some help, I'll gladly help you further if you need
 it.

 On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek >>> > wrote:

> Hi,
> what seems to be the problem?
>
> Cheers,
> Aljoscha
>
> On Wed, 20 Apr 2016 at 03:52 Jack Huang 
> wrote:
>
>> Hi all,
>>
>> I am doing a simple word count example and want to checkpoint the
>> accumulated word counts. I am not having any luck getting the counts 
>> saved
>> and restored. Can someone help?
>>
>> env.enableCheckpointing(1000)
>>
>> env.setStateBackend(new MemoryStateBackend())
>>
>>
>>>  ...
>>
>>
>>
>> inStream
>>> .keyBy({s => s})
>>>
>>>
>>>
>>> *.mapWithState((in:String, count:Option[Int]) => {val
>>> newCount = count.getOrElse(0) + 1((in, newCount), 
>>> Some(newCount))
>>>   })*
>>> .print()
>>
>>
>>
>> Thanks,
>>
>> Jack Huang
>>
>


 --
 BR,
 Stefano Baghino

 Software Engineer @ Radicalbit

>>>
>>
>>
>> --
>> BR,
>> Stefano Baghino
>>
>> Software Engineer @ Radicalbit
>>
>
>


-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Count windows missing last elements?

2016-04-20 Thread Kostya Kulagin
I have a pretty big but finite stream and I need to be able to window it by
number of elements.
In this case, from my observations, Flink can 'skip' the last chunk of data
if it has fewer elements than the window size:

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {

  @Override
  public void run(SourceContext<Long> ctx) throws Exception {
LongStream.range(0, 35).forEach(ctx::collect);
  }

  @Override
  public void cancel() {

  }
});

source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
  @Override
  public void apply(GlobalWindow window, Iterable<Long> values,
Collector<Long> out) throws Exception {
System.out.println(Joiner.on(',').join(values));
  }
}).print();

env.execute("yoyoyo");


Output:
0,1,2,3,4,5,6,7,8,9
10,11,12,13,14,15,16,17,18,19
20,21,22,23,24,25,26,27,28,29

I.e. elements from 30 to 34 are not being processed.

Does it make sense to have a count-OR-timeout window, which evaluates the
window when the number of elements reaches a threshold OR a collection timeout
occurs?


Re: asm IllegalArgumentException with 1.0.0

2016-04-20 Thread David Kim
Hi Stephan!

Following up on this issue, it seems the issue doesn't show itself when
using version 1.0.1. I'm able to run our unit tests in IntelliJ now.

Thanks!
David

On Wed, Apr 13, 2016 at 1:59 PM Stephan Ewen  wrote:

> Does this problem persist? (It might have been caused by maven caches with
> bad artifacts).
>
> The many transitive dependencies you see often come from the connectors -
> that is also why we do not put the connectors into the lib folder directly,
> so that these libraries are not always part of every Flink program.
>
>
> On Mon, Mar 14, 2016 at 9:16 PM, Zach Cox  wrote:
>
>> Yes pretty much - we use sbt to run the job in a local environment, not
>> Intellij, but should be the same thing. We were also seeing that exception
>> running unit tests locally. We did not see the exception when assembling a
>> fat jar and submitting to a remote Flink cluster.
>>
>> It seems like the flink-connector-elasticsearch jar should not have
>> shaded classes in it. Maybe that jar in maven central was built incorrectly?
>>
>> We worked around this by just not depending on that elasticsearch
>> connector at all, since we wrote our own connector for Elasticsearch 2.x.
>>
>> -Zach
>>
>>
>> On Mon, Mar 14, 2016 at 2:03 PM Andrew Whitaker <
>> andrew.whita...@braintreepayments.com> wrote:
>>
>>> We're having the same issue (we also have a dependency on
>>> flink-connector-elasticsearch). It's only happening to us in IntelliJ
>>> though. Is this the case for you as well?
>>>
>>> On Thu, Mar 10, 2016 at 3:20 PM, Zach Cox  wrote:
>>>
 After some poking around I noticed
 that flink-connector-elasticsearch_2.10-1.0.0.jar contains shaded asm
 classes. If I remove that dependency from my project then I do not get the
 IllegalArgumentException.


 On Thu, Mar 10, 2016 at 11:51 AM Zach Cox  wrote:

> Here are the jars on the classpath when I try to run our Flink job in
> a local environment (via `sbt run`):
>
>
> https://gist.githubusercontent.com/zcox/0992aba1c517b51dc879/raw/7136ec034c2beef04bd65de9f125ce3796db511f/gistfile1.txt
>
> There are many transitive dependencies pulled in from internal library
> projects that probably need to be cleaned out. Maybe we are including
> something that conflicts? Or maybe something important is being excluded?
>
> Are the asm classes included in Flink jars in some shaded form?
>
> Thanks,
> Zach
>
>
> On Thu, Mar 10, 2016 at 5:06 AM Stephan Ewen  wrote:
>
>> Dependency shading changed a bit between RC4 and RC5 - maybe a
>> different minor ASM version is now included in the "test" scope.
>>
>> Can you share the dependencies of the problematic project?
>>
>> On Thu, Mar 10, 2016 at 12:26 AM, Zach Cox  wrote:
>>
>>> I also noticed when I try to run this application in a local
>>> environment, I get the same IllegalArgumentException.
>>>
>>> When I assemble this application into a fat jar and run it on a
>>> Flink cluster using the CLI tools, it seems to run fine.
>>>
>>> Maybe my local classpath is missing something that is provided on
>>> the Flink task managers?
>>>
>>> -Zach
>>>
>>>
>>> On Wed, Mar 9, 2016 at 5:16 PM Zach Cox  wrote:
>>>
 Hi - after upgrading to 1.0.0, I'm getting this exception now in a
 unit test:

IllegalArgumentException:   (null:-1)
 org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown Source)
 org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown Source)
 org.apache.flink.api.scala.InnerClosureFinder.<init>(ClosureCleaner.scala:279)
 org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95)
 org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115)
 org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)
 org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)

 The line that causes that exception is just adding
 a FlinkKafkaConsumer08 source.

 ClassVisitor [1] seems to throw that IllegalArgumentException when
 it is not given a valid api version number, but InnerClosureFinder [2]
 looks fine to me.

 Any idea what might be causing this? This unit test worked fine
 with 1.0.0-rc0 jars.

 Thanks,
 Zach

 [1]
 http://websvn.ow2.org/filedetails.php?repname=asm&path=%2Ftrunk%2Fasm%2Fsrc%2Forg%2Fobjectweb%2Fasm%2FClassVisitor.java
 [2]
 https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala#L279



>>
>>>
>>>
>>> --
>>> A

Values are missing, probably due to parallelism?

2016-04-20 Thread Kostya Kulagin
I think it has something to do with parallelism and I probably do not have a clear
understanding of how parallelism works in Flink, but in this example:

StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {

  @Override
  public void run(SourceContext<Long> ctx) throws Exception {
LongStream.range(0, 29).forEach(ctx::collect);
  }

  @Override
  public void cancel() {

  }
});

source.countWindowAll(10).apply(new AllWindowFunction<Long, Long, GlobalWindow>() {
  @Override
  public void apply(GlobalWindow window, Iterable<Long> values,
Collector<Long> out) throws Exception {
for (Long value : values) {
  if (value % 3 == 0) {
out.collect(value);
  }
}
  }
}).print();

env.execute("yoyoyo");

Why is my output like this:

4> 9
1> 0
1> 12
3> 6
3> 18
2> 3
2> 15

? I.e. where is the value 24, for example? I expect to see it. What am I
doing wrong?


Set up Flink Cluster on Windows machines

2016-04-20 Thread Yifei Li
Hi,

Is it possible to set up a cluster on Windows machines? I have git installed,
and when I use start-cluster.sh in the git cmd, it asks me to type in the password
for the other machines. After I did that, it showed "nohup: command not found".


I am just wondering if it is possible to set up clusters on Windows
machines? If yes, can anyone point me to any guide?

Thanks,

Yifei


Re: Set up Flink Cluster on Windows machines

2016-04-20 Thread Fabian Hueske
Hi Yifei,

I think this has not been done before. At least I am not aware of anybody
running Flink in cluster mode on Windows.
In principle this should work. It is possible to start a local instance on
Windows (start-local.bat) and to locally execute Flink programs on this
instance using the flink.bat script.
However, there are no scripts or further tooling support to start Flink
processes on remote Windows servers as for Linux using ssh.

I think the only way to do this right now is to manually start the Flink
Java processes on each Windows machine. You should check the bash scripts
for how to read certain configuration parameters from flink-conf.yaml and
how to construct the right classpath.
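
Very roughly, something along these lines on each machine (untested sketch;
paths, memory settings and the Flink version are just examples):

java -Xmx2048m -cp "C:\flink-1.0.1\lib\*" org.apache.flink.runtime.jobmanager.JobManager --configDir "C:\flink-1.0.1\conf" --executionMode cluster

java -Xmx2048m -cp "C:\flink-1.0.1\lib\*" org.apache.flink.runtime.taskmanager.TaskManager --configDir "C:\flink-1.0.1\conf"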

Best, Fabian

2016-04-21 0:39 GMT+02:00 Yifei Li :

> Hi,
>
> Is it possible to set up cluster on Windows machines? I have git installed
> and when I use start-cluster.sh in git cmd, it asks me to type in password
> for other machines. After I did it, it showed nohup:command not found.
>
>
> I am just wondering if it is possible to set up clusters on Windows
> machines? If yes, can anyone point me to any guide?
>
> Thanks,
>
> Yifei
>


Control triggering on empty window

2016-04-20 Thread Maxim
I have the following use case:

Input stream of timestamped "on" and "off" events received out of order.
I need to produce an event with the time that the system was "on" every 15 minutes.
Events should be produced only for intervals during which the system was "on".

When a 15-minute window has at least one record it is triggered and the
required aggregate is created, but when no event is received within a
15-minute period the window is not triggered and nothing is produced.

I understand that it is not feasible to trigger on empty windows when the
set of keys is unbounded. But it would be nice to give control over such
triggering to the window function. In my case the window function could
enable empty-window triggering for the current key when the last event in the
evaluated window is "on" and disable it if it is "off".
The strawman API for such feature:

public void apply(String key, TimeWindow window, Iterable
input, Collector out) throws Exception {

...

RuntimeContext context = this.getRuntimeContext();

if (lastEvent.isOn()) {

   context.enableEmptyWindowTriggering();

} else {

   context.disableEmptyWindowTriggering();

}

}

I could implement the same logic using a global window and a custom trigger and
evictor, but that looks like an ugly workaround to me.

Is there any better way to solve this use case?

Thanks,

Maxim.


Join DataStream with dimension tables?

2016-04-20 Thread Srikanth
Hello,

I have a fairly typical streaming use case but am not able to figure out how to
implement it best in Flink.
I want to join records read from a Kafka stream with one (or more) dimension
tables which are saved as flat files.

As per this jira  it's not
possible to join a DataStream with a DataSet.
These tables are too big to do a collect() and join.

It would be good to read these files during startup, do a partitionByHash
and keep them cached.
On the DataStream, maybe do a keyBy and join.
Is something like this possible?
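
Roughly, what I have in mind looks like the sketch below (Event, Dimension,
EnrichedEvent and the file path are made up; in this naive version each
parallel task loads the whole file in open(), so it only works if the table
fits in memory):

public class DimensionJoin extends RichFlatMapFunction<Event, EnrichedEvent> {
  private Map<String, Dimension> table;

  @Override
  public void open(Configuration parameters) throws Exception {
    // load the dimension table once per task at startup
    table = new HashMap<>();
    for (String line : Files.readAllLines(Paths.get("/data/dim_table.csv"))) {
      Dimension d = Dimension.parse(line);
      table.put(d.key, d);
    }
  }

  @Override
  public void flatMap(Event e, Collector<EnrichedEvent> out) {
    Dimension d = table.get(e.dimKey);
    if (d != null) {
      out.collect(new EnrichedEvent(e, d)); // emit the joined record
    }
  }
}

// kafkaStream.keyBy("dimKey").flatMap(new DimensionJoin());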

Srikanth


Re: Set up Flink Cluster on Windows machines

2016-04-20 Thread Jamie Grier
I should also think that using Cygwin may work quite well and allow you to
just use the current Unix-oriented shell scripts with maybe slight
modifications.

-Jamie


On Wed, Apr 20, 2016 at 4:11 PM, Fabian Hueske  wrote:

> Hi Yifei,
>
> I think this has not been done before. At least I am not aware of anybody
> running Flink in cluster mode on Windows.
> In principle this should work. It is possible to start a local instance on
> Windows (start-local.bat) and to locally execute Flink programs on this
> instance using the flink.bat script.
> However, there are no scripts or further tooling support to start Flink
> processes on remote Windows servers as for Linux using ssh.
>
> I think the only way to do this right now is to manually start the Flink
> Java processes on each Windows machine. You should check the bash scripts
> for how to read certain configuration parameters from flink-conf.yaml and
> how to construct the right classpath.
>
> Best, Fabian
>
> 2016-04-21 0:39 GMT+02:00 Yifei Li :
>
>> Hi,
>>
>> Is it possible to set up a cluster on Windows machines? I have git
>> installed and when I use start-cluster.sh in git cmd, it asks me to type in
>> the password for other machines. After I did that, it showed "nohup: command
>> not found".
>>
>>
>> I am just wondering if it is possible to set up clusters on Windows
>> machines? If yes, can anyone point me to any guide?
>>
>> Thanks,
>>
>> Yifei
>>
>
>


-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier 
ja...@data-artisans.com


Re: Threads waiting on LocalBufferPool

2016-04-20 Thread Maciek Próchniak

Hi Ufuk,

thanks for the quick reply.
Actually I had a little time to try both things.
1) helped only temporarily - it just took a bit longer to saturate the
pool. After a few minutes, all Kafka threads were again periodically waiting
for the buffer pool.
2) This seemed to help. I also reduced the checkpoint interval - with RocksDB
we had 5 min, now I tried 30 s.


I attach throughput metrics - the former (around 18:00) is with increased
heap & buffers, the latter (around 22:00) is with the FileSystemStateBackend.
My state is a few GB large - during the test it reached around 2-3 GB. I
must admit I was quite impressed that checkpointing to HDFS using the
filesystem backend took only about 6-7 s (with occasional spikes to 12-13 s,
which can be seen in the metrics - I didn't check whether that was caused by
HDFS or something else).


Now I looked at the logs from around 18:00 and it seems that checkpointing
RocksDB took around 2-3 minutes:
2016-04-20 17:47:33,439 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1461167253439
2016-04-20 17:49:54,196 [flink-akka.actor.default-dispatcher-147] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 6 (in 140588 ms)

- however, I don't see any threads dumping state in threadStacks...

I guess I'll have to add some metrics around state invocations to see
where the problem with RocksDB is... I'll write if I find anything, but
that won't be today, I think...
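
(A small, hedged sketch of the kind of measurement mentioned above: wrapping a
ValueState so that every access is timed. The wrapper and its log-based
reporting are assumptions, not existing Flink tooling:)

import org.apache.flink.api.common.state.ValueState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

// Hypothetical decorator: delegates to the real state (e.g. backed by RocksDB)
// and logs the average access latency every 100k calls.
public class TimedValueState<T> implements ValueState<T> {

    private static final Logger LOG = LoggerFactory.getLogger(TimedValueState.class);

    private final ValueState<T> delegate;
    private long calls;
    private long nanos;

    public TimedValueState(ValueState<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T value() throws IOException {
        long start = System.nanoTime();
        try {
            return delegate.value();
        } finally {
            record(System.nanoTime() - start);
        }
    }

    @Override
    public void update(T value) throws IOException {
        long start = System.nanoTime();
        try {
            delegate.update(value);
        } finally {
            record(System.nanoTime() - start);
        }
    }

    @Override
    public void clear() {
        delegate.clear();
    }

    private void record(long elapsedNanos) {
        calls++;
        nanos += elapsedNanos;
        if (calls % 100_000 == 0) {
            LOG.info("{} state accesses, avg {} us", calls, (nanos / calls) / 1_000);
        }
    }
}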


Btw - I was looking at the FS state and I wonder whether it would be feasible
to make a variant of this state that uses an immutable map (probably a Scala
one) in order to do asynchronous checkpoints.
Then the synchronous part would be essentially free - just taking the state
map and materializing it asynchronously.
Of course, that would work only for immutable state - but this is often the
case when writing in Scala. WDYT?


thanks,
maciek




On 20/04/2016 16:28, Ufuk Celebi wrote:

Could be different things actually, including the parts of the network
you mentioned.

1)

Regarding the TM config:
- It can help to increase the number of network buffers (you can go
ahead and give it 4 GB, i.e. 131072 buffers of 32 KB each)
- In general, you have way more memory available than you actually
give to Flink. I would increase the 20 GB heap size.

As a first step you could address these two points and re-run your job.
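
(For reference, both settings live in flink-conf.yaml; the numbers below are
only an example for this particular machine, not a recommendation:)

# example flink-conf.yaml entries
taskmanager.heap.mb: 61440                    # give the TM more of the 120 GB RAM
taskmanager.network.numberOfBuffers: 131072   # 131072 * 32 KB = 4 GB of network buffers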

2)

As a follow-up you could also work with the FileSystemStateBackend,
which keeps state in memory (on-heap) and writes checkpoints to files.
This would help in checking how much RocksDB is slowing things down.
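
(Switching the backend is a one-liner in the job; in code the class is
FsStateBackend, and the HDFS path below is just a placeholder:)

// The checkpoint directory must be reachable by the JobManager and all TaskManagers.
env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));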


I'm curious about the results. Do you think you will have time to try this?

– Ufuk


On Wed, Apr 20, 2016 at 3:45 PM, Maciek Próchniak  wrote:

Hi,
I'm running my Flink job on one rather large machine (20 cores with
hyperthreading, 120 GB RAM). The task manager has 20 GB of heap allocated.
It does more or less:
read CSV from Kafka -> keyBy one of the fields -> some custom state
processing.
The Kafka topic has 24 partitions, so my parallelism is also 24.

After some tweaks and upgrading to 1.0.2-rc3 (as I use the RocksDB state
backend) I reached a point where throughput is ~120-150k/s.
On the same Kafka and machine I reached > 500k/s with a simple filtering job,
so I wanted to see what the bottleneck is.

It turns out that quite often all of the Kafka threads are stuck waiting for
a buffer from the pool:
"Thread-6695" #7466 daemon prio=5 os_prio=0 tid=0x7f77fd80d000
nid=0x8118 in Object.wait() [0x7f7ad54d9000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
 at java.lang.Object.wait(Native Method)
 at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
 - locked <0x0002eade3890> (a java.util.ArrayDeque)
 at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
 at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
 - locked <0x0002eb73cbd0> (a
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
 at
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
 at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
 at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
 at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
 at
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
 at
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
 at scala.collection.immutable.List.foreach(List.scala:381)
 at
org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
 at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMa