why same Sliding ProcessTime TimeWindow triggered twice

2018-09-16 Thread 远远
hi,everyone:
today I tested a sliding processing-time TimeWindow and printed some of its
properties. I found that the same sliding window is printed twice, as follows:

now   ===> 2018-09-16 15:11:44
start ===> 2018-09-16 15:10:45
end   ===> 2018-09-16 15:11:45
max   ===> 2018-09-16 15:11:44
TimeWindow{start=1537081845000, end=1537081905000}
aggreation
now   ===> 2018-09-16 15:11:45
start ===> 2018-09-16 15:10:45
end   ===> 2018-09-16 15:11:45
max   ===> 2018-09-16 15:11:44
TimeWindow{start=1537081845000, end=1537081905000}
aggreation

but when I apply a sum operator instead, it does not happen. I want to know why?
thanks.

my test code is:

object SlidingProcessTimeWindowTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource((context: SourceContext[String]) => {
        while (true) context.collect(new Random().nextInt(100) + ":FRI")
      })
      .keyBy(s => s.endsWith("FRI"))
      .timeWindow(Time.minutes(1), Time.seconds(5))
      .apply((e, w, iter, coll: Collector[String]) => {
        println("now   ===> " + convert(DateTime.now().getMillis))
        println("start ===> " + convert(w.getStart))
        println("end   ===> " + convert(w.getEnd))
        println("max   ===> " + convert(w.maxTimestamp()))
        println(w)
        //var reduce: Long = 0
        //for (e <- iter) {
        //  reduce += e.substring(0, e.length - 4).toInt
        //}
        //println("reduce ==> " + reduce)
        coll.collect("aggreation")
      }).setParallelism(1).print().setParallelism(1)

    env.execute()
  }

  def convert(time: Long): String = {
    new DateTime(time).toString("yyyy-MM-dd HH:mm:ss")
  }
}
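As a side note on the mechanics: with size = 1 minute and slide = 5 seconds, every element belongs to size/slide = 12 overlapping windows, and each window should fire exactly once when processing time passes its end. The class below is a standalone sketch of the window-start arithmetic (modeled on, but not copied from, Flink's TimeWindow.getWindowStartWithOffset; offset assumed 0, times in abstract units rather than milliseconds):

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindows {
    // start of the latest sliding window containing the timestamp (offset 0)
    public static long lastStart(long ts, long slide) {
        return ts - (ts % slide);
    }

    // all [start, start + size) windows that contain the timestamp
    public static List<long[]> windowsFor(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = lastStart(ts, slide); start > ts - size; start -= slide) {
            windows.add(new long[]{start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // an element at t = 61 with size = 60, slide = 5 belongs to 12 windows
        for (long[] w : windowsFor(61, 60, 5)) {
            System.out.println("TimeWindow{start=" + w[0] + ", end=" + w[1] + "}");
        }
    }
}
```

So adjacent windows legitimately overlap in content, but the output above shows the *same* start/end pair emitted twice, which suggests the same window's trigger fired twice.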


Re: why same Sliding ProcessTime TimeWindow triggered twice

2018-09-16 Thread Xingcan Cui
Hi,

I’ve tested your code in my local environment and everything worked fine. It’s 
a little weird to see output like that. I wonder if you could give more 
information about your environment, e.g., your Flink version and execution 
settings.

Thanks,
Xingcan

> On Sep 16, 2018, at 3:19 PM, 远远  wrote:
> [...]



Fwd: why same Sliding ProcessTime TimeWindow triggered twice

2018-09-16 Thread 远远
---------- Forwarded message ---------
From: 远远 
Date: Sun, Sep 16, 2018 at 4:08 PM
Subject: Re: why same Sliding ProcessTime TimeWindow triggered twice
To: 


hi, the Flink version I tested with is 1.4.2, and I just ran the test code with
a local env in IDEA; all the settings are in the test code.
my os is deepin (linux, based on debian) 15.7...

and I tried again; the output is as follows:
now   ===> 2018-09-16 16:06:09
start ===> 2018-09-16 16:05:10
end   ===> 2018-09-16 16:06:10
max   ===> 2018-09-16 16:06:09
TimeWindow{start=153708511, end=153708517}
aggreation
now   ===> 2018-09-16 16:06:09
start ===> 2018-09-16 16:05:10
end   ===> 2018-09-16 16:06:10
max   ===> 2018-09-16 16:06:09
TimeWindow{start=153708511, end=153708517}
aggreation
now   ===> 2018-09-16 16:06:16
start ===> 2018-09-16 16:05:15
end   ===> 2018-09-16 16:06:15
max   ===> 2018-09-16 16:06:14
TimeWindow{start=1537085115000, end=1537085175000}
aggreation
now   ===> 2018-09-16 16:06:19
start ===> 2018-09-16 16:05:20
end   ===> 2018-09-16 16:06:20
max   ===> 2018-09-16 16:06:19
TimeWindow{start=153708512, end=153708518}
aggreation
now   ===> 2018-09-16 16:06:20
start ===> 2018-09-16 16:05:20
end   ===> 2018-09-16 16:06:20
max   ===> 2018-09-16 16:06:19
TimeWindow{start=153708512, end=153708518}
aggreation
now   ===> 2018-09-16 16:06:24
start ===> 2018-09-16 16:05:25
end   ===> 2018-09-16 16:06:25
max   ===> 2018-09-16 16:06:24
TimeWindow{start=1537085125000, end=1537085185000}
aggreation
now   ===> 2018-09-16 16:06:24
start ===> 2018-09-16 16:05:25
end   ===> 2018-09-16 16:06:25
max   ===> 2018-09-16 16:06:24
TimeWindow{start=1537085125000, end=1537085185000}
aggreation
now   ===> 2018-09-16 16:06:25
start ===> 2018-09-16 16:05:25
end   ===> 2018-09-16 16:06:25
max   ===> 2018-09-16 16:06:24
TimeWindow{start=1537085125000, end=1537085185000}
aggreation
now   ===> 2018-09-16 16:06:29
start ===> 2018-09-16 16:05:30
end   ===> 2018-09-16 16:06:30
max   ===> 2018-09-16 16:06:29
TimeWindow{start=153708513, end=153708519}
aggreation
now   ===> 2018-09-16 16:06:29
start ===> 2018-09-16 16:05:30
end   ===> 2018-09-16 16:06:30
max   ===> 2018-09-16 16:06:29
TimeWindow{start=153708513, end=153708519}
aggreation
now   ===> 2018-09-16 16:06:30
start ===> 2018-09-16 16:05:30
end   ===> 2018-09-16 16:06:30
max   ===> 2018-09-16 16:06:29
TimeWindow{start=153708513, end=153708519}
aggreation
now   ===> 2018-09-16 16:06:36
start ===> 2018-09-16 16:05:35
end   ===> 2018-09-16 16:06:35
max   ===> 2018-09-16 16:06:34
TimeWindow{start=1537085135000, end=1537085195000}


Xingcan Cui  wrote on Sun, Sep 16, 2018 at 3:55 PM:

> [...]

twitter source is shutting down after successful connect

2018-09-16 Thread Björn Ebbinghaus
Hey,

I updated to flink-connector-twitter_2.11 version 1.6.0
and now I am getting this error on startup.
Used the 1.2.0 version before where everything worked.
Any idea what is causing this?

17:39:07,866 INFO  com.twitter.hbc.httpclient.BasicClient
 - New connection executed: flink-twitter-source, endpoint:
/1.1/statuses/sample.json
17:39:07,866 INFO
 org.apache.flink.streaming.connectors.twitter.TwitterSource   - Twitter
Streaming API connection established successfully
17:39:07,901 WARN  com.twitter.hbc.httpclient.ClientBase
  - flink-twitter-source Uncaught exception
java.lang.NullPointerException
at
org.apache.http.client.utils.URLEncodedUtils.parse(URLEncodedUtils.java:235)
at com.twitter.hbc.httpclient.auth.OAuth1.signRequest(OAuth1.java:71)
at
com.twitter.hbc.core.HttpConstants.constructRequest(HttpConstants.java:75)
at com.twitter.hbc.httpclient.ClientBase.run(ClientBase.java:133)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
17:39:07,906 INFO  com.twitter.hbc.httpclient.ClientBase
  - flink-twitter-source exit event - null
17:39:07,906 INFO  com.twitter.hbc.httpclient.ClientBase
  - flink-twitter-source Shutting down httpclient connection manager

regards,

Björn


Re: Task managers run on separate nodes in a cluster

2018-09-16 Thread Martin Eden
Hi Till,

I was able to use mesos.constraints.hard.hostattribute to run all task
managers on a particular host in my cluster.

However, after looking a bit at the code, I'm not sure we can use
mesos.constraints.hard.hostattribute for load balancing Flink task managers
evenly across hosts in a Mesos cluster.

This is because under the hood it uses the fenzo host attribute value
constraint while we would need the fenzo balanced host attribute constraint.

The LaunchableMesosWorker sets the constraints via
the com.netflix.fenzo.TaskRequest and all of these hard constraints must be
satisfied by a host for the task scheduler to assign this task to that
host. Since the current implementation always returns the static constraint
value configured, i.e. what is after ":", see
org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters#addHostAttrValueConstraint,
I don't see how we can use it to load balance unless the constraint value
were dynamic, based on some property of the mesos task request.
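For concreteness, the hard-constraint setting is a comma-separated list of attribute:value pairs; the sketch below (an assumption about the configured format, not Flink's actual parser) makes the point visible: the value after ":" is fixed at configuration time, which is why placement cannot balance per task.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HostAttrConstraints {
    // parse "attr:value,attr:value" into attribute -> fixed value pairs;
    // the value is the static "what is after ':'" handed to fenzo
    public static Map<String, String> parse(String config) {
        Map<String, String> constraints = new LinkedHashMap<>();
        for (String pair : config.split(",")) {
            String[] kv = pair.split(":", 2);
            constraints.put(kv[0].trim(), kv[1].trim());
        }
        return constraints;
    }

    public static void main(String[] args) {
        System.out.println(parse("az:eu-west-1a,series:t2"));
    }
}
```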

Am I correct in my assumptions?

Any other way of load balancing?
Maybe by not even using the DCOS Flink package (mesos flink framework) at
all?
Any plans to add support for the fenzo balanced host attribute constraint?

Thanks,
M




On Fri, Sep 14, 2018 at 5:46 PM Till Rohrmann  wrote:

> Hi Martin,
>
> Flink supports the mesos.constraints.hard.hostattribute to specify task
> constraints based on agent attributes [1]. I think you could use them to
> control the task placement.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#mesos-constraints-hard-hostattribute
>
> Cheers,
> Till
>
> On Fri, Sep 14, 2018 at 3:08 PM Martin Eden 
> wrote:
>
>> Thanks Vino!
>>
>> On Fri, Sep 14, 2018 at 3:37 AM vino yang  wrote:
>>
>>> Hi Martin,
>>>
>>> Till has done most of the work of Flink on Mesos. Ping Till for you.
>>>
>>> Thanks, vino.
>>>
>>> Martin Eden  wrote on Wed, Sep 12, 2018 at 11:21 PM:
>>>
 Hi all,

 We're using Flink 1.3.2 with DCOS / Mesos.

 We have a 3 node cluster and are running the Flink DCOS package (Flink
 Mesos framework) configured with 3 Task Managers.

 Our goal is to run each of them on separate hosts for better load
 balancing but it seems the task managers end up running on the same host.

 Looked around the docs and DCOS Flink package but could not find any
 placement policy or anything of the sorts.

 Is there anything like that?

 We are also planning to upgrade to the latest Flink version. Is
 something like that supported in this newer version?

 Thanks,
 M

>>>


Re: Question regarding state cleaning using timer

2018-09-16 Thread Hequn Cheng
Hi bhaskar,

You don't need to change anything to handle multiple keys; Flink will do it
for you. The ValueState is a keyed state. You can think of Keyed State as
Operator State that has been partitioned, or sharded, with exactly one
state-partition per key.
TTL can be used in the same way.
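The "one state-partition per key" idea can be sketched without Flink at all; the class below is a toy model (not Flink's implementation) of how a single ValueState declaration behaves as an independent value per current key:

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedValueState<K, V> {
    // behind the scenes: one partition per key
    private final Map<K, V> partitions = new HashMap<>();
    private K currentKey; // the runtime sets this before each element is processed

    public void setCurrentKey(K key) { currentKey = key; }

    // like ValueState.value(): reads only the current key's partition
    public V value() { return partitions.get(currentKey); }

    // like ValueState.update(): writes only the current key's partition
    public void update(V v) { partitions.put(currentKey, v); }

    public static void main(String[] args) {
        KeyedValueState<Long, Long> avg = new KeyedValueState<>();
        avg.setCurrentKey(1L);
        avg.update(3L);
        avg.setCurrentKey(2L);           // switching key switches the partition
        System.out.println(avg.value()); // key 2 has its own, still-empty partition
    }
}
```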

Best, Hequn


On Fri, Sep 14, 2018 at 10:29 PM bhaskar.eba...@gmail.com <
bhaskar.eba...@gmail.com> wrote:

> Hi
> In the following example given in flink:
> object ExampleCountWindowAverage extends App {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>   env.fromCollection(List(
> (1L, 3L),
> (1L, 5L),
> (1L, 7L),
> (1L, 4L),
> (1L, 2L)
>   )).keyBy(_._1)
> .flatMap(new CountWindowAverage())
> .print()
>   // the printed output will be (1,4) and (1,5)
>
>   env.execute("ExampleManagedState")
> }
>
> There is only one state because there is one key. In the CountWindowAverage
> method there is one state descriptor:  new ValueStateDescriptor[(Long,
> Long)]("average", createTypeInformation[(Long, Long)])
> The name given is "average". In order to implement this in a generic way,
> should I modify the method to
>
> CountWindowAverage(keyName: String), so that  new
> ValueStateDescriptor[(Long, Long)](keyName, createTypeInformation[(Long,
> Long)]) is created? But how do I configure TTL for this? Inside this method?
> In the example:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
> ,   you have given a standalone ValueStateDescriptor.  How can I use the
> TTL inside CountWindowAverage() per key?
>
> Regards
> Bhaskar
>


Re: twitter source is shutting down after successful connect

2018-09-16 Thread vino yang
Hi Björn,

From the exception stack trace, the NPE comes from the auth module of
the twitter client.
Did you fill in the relevant authentication information correctly?
Specific instructions can be found in the official guidance document. [1]

Thanks, vino.

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/twitter.html
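As a hedged illustration (the property names below are taken from the Twitter connector documentation; adjust if your version differs), a small pre-flight check like this can catch a missing credential before it surfaces as an NPE deep inside hbc's OAuth1.signRequest:

```java
import java.util.Properties;

public class TwitterAuthCheck {
    // the four credentials the connector expects (assumed names from the docs)
    public static final String[] REQUIRED = {
        "twitter-source.consumerKey",
        "twitter-source.consumerSecret",
        "twitter-source.token",
        "twitter-source.tokenSecret"
    };

    // true only if every required credential is present and non-empty
    public static boolean isComplete(Properties props) {
        for (String key : REQUIRED) {
            String v = props.getProperty(key);
            if (v == null || v.isEmpty()) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        Properties props = new Properties(); // normally loaded from config
        System.out.println("credentials complete: " + isComplete(props));
    }
}
```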

Björn Ebbinghaus  wrote on Sun, Sep 16, 2018 at 11:43 PM:

> [...]


Re: LocalEnvironment and Python streaming

2018-09-16 Thread vino yang
Hi Joe,

Maybe Chesnay is better suited to answer this question; pinging him for you.

Thanks, vino.

Joe Malt  wrote on Sat, Sep 15, 2018 at 1:51 AM:

> Hi,
>
> Is there any way to execute a job using the LocalEnvironment when using
> the Python streaming API? This would make it much easier to debug jobs.
>
> At the moment I'm not aware of any way of running them except firing up a
> local cluster and submitting the job with pyflink-stream.sh.
>
> Thanks,
>
> Joe Malt
> Engineering Intern, Stream Processing
> Yelp
>


Utilising EMR's master node

2018-09-16 Thread Averell
Hello everyone,

I'm trying to run Flink on AWS EMR following the guides from the Flink docs
and from AWS, and it looks like the EMR master node is never used, neither for
the JM nor the TMs. "bin/yarn-session.sh -q" only shows the core nodes. We are
only running Flink on that EMR cluster, so this is a waste of resources.

So, is there any way to use the master node for the job, at least for the JM
only?

If that is not possible, should I use different hardware configurations for
the master node and the core nodes (a smaller server for the master)?

Thanks and best regards,
Averell




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Trying to figure out why a slot takes a long time to checkpoint

2018-09-16 Thread Renjie Liu
Hi, Julio:
Does this happen frequently? What state backend do you use? The async and
sync checkpoint durations seem normal compared to the others; it looks like
most of the time is spent acking the checkpoint.

On Sun, Sep 16, 2018 at 9:24 AM vino yang  wrote:

> Hi Julio,
>
> Yes, fifty-five minutes does seem really long.
> However, it is in line with the time and size of the adjacent previous task
> in the diagram.
> I think the real question for your application is why Flink accesses HDFS
> so slowly.
> You can enable DEBUG logging to see if you can find any clues, or post the
> log to the mailing list so others can help analyze the problem.
>
> Thanks, vino.
>
> Julio Biason  于2018年9月15日周六 上午7:03写道:
>
>> (Just an addendum: Although it's not a huge problem -- we can always
>> increase the checkpoint timeout time -- this anomalous situation makes me
>> think there is something wrong in our pipeline or in our cluster, and that
>> is what is making the checkpoint creation go crazy.)
>>
>> On Fri, Sep 14, 2018 at 8:00 PM, Julio Biason 
>> wrote:
>>
>>> Hey guys,
>>>
>>> On our pipeline, we have a single slot that it's taking longer to create
>>> the checkpoint compared to other slots and we are wondering what could be
>>> causing it.
>>>
>>> The operator in question is the window metric -- the only element in the
>>> pipeline that actually uses the state. While the other slots take 7 mins to
>>> create the checkpoint, this one -- and only this one -- takes 55mins.
>>>
>>> Is there something I should look at to understand what's going on?
>>>
>>> (We are storing all checkpoints in HDFS, in case that helps.)
>>>
>>> --
>>> *Julio Biason*, Software Engineer
>>> *AZION*  |  Deliver. Accelerate. Protect.
>>> Office: +55 51 3083 8101   |  Mobile: +55 51
>>> *99907 0554*
>>>
>>
>>
>>
>> --
>> *Julio Biason*, Software Engineer
>> *AZION*  |  Deliver. Accelerate. Protect.
>> Office: +55 51 3083 8101   |  Mobile: +55 51
>> *99907 0554*
>>
> --
Liu, Renjie
Software Engineer, MVAD


Re: Task managers run on separate nodes in a cluster

2018-09-16 Thread Renjie Liu
Hi, Martin:
I think a better solution would be to set the number of cores of each
container equal to that of a physical server, if this mesos cluster is
dedicated to your flink cluster.

On Mon, Sep 17, 2018 at 5:28 AM Martin Eden  wrote:

> [...]
 --
Liu, Renjie
Software Engineer, MVAD


Re: Task managers run on separate nodes in a cluster

2018-09-16 Thread Till Rohrmann
Hi Martin,

I'm not aware that the community is actively working on enabling the
balanced host attribute constraint. If you wanna give it a try, then I'm
happy to review your contribution.

Cheers,
Till

On Mon, Sep 17, 2018 at 5:28 AM Renjie Liu  wrote:

> [...]
>


Re: Task managers run on separate nodes in a cluster

2018-09-16 Thread Martin Eden
Thanks for the feedback, Liu and Till.
@Liu Yeah, that would work, but unfortunately we run other services on the
cluster, so it's not really an option.
@Till Will have a look and see how much time I can dedicate to this.
M

On Mon, Sep 17, 2018 at 7:21 AM Till Rohrmann  wrote:

> [...]


Re: Question regarding state cleaning using timer

2018-09-16 Thread Vijay Bhaskar
Thanks Hequn. But I want to give a random TTL to each partitioned key. How
can I achieve that?

Regards
Bhaskar
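One way to picture a per-key TTL, sketched in plain Java with no Flink dependency (in a real job this could be a keyed function pairing state with a per-key timer; all names here are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class PerKeyTtlState<K, V> {
    // each key stores its value together with its own deadline
    private static class Entry<V> {
        final V value;
        final long expiresAt;
        Entry(V value, long expiresAt) { this.value = value; this.expiresAt = expiresAt; }
    }

    private final Map<K, Entry<V>> state = new HashMap<>();

    // each update may carry its own TTL, so different keys expire differently
    public void update(K key, V value, long nowMs, long ttlMs) {
        state.put(key, new Entry<>(value, nowMs + ttlMs));
    }

    // expired entries read as absent
    public V value(K key, long nowMs) {
        Entry<V> e = state.get(key);
        if (e == null || nowMs >= e.expiresAt) return null;
        return e.value;
    }

    public static void main(String[] args) {
        PerKeyTtlState<Long, Long> s = new PerKeyTtlState<>();
        s.update(1L, 10L, 0L, 100L); // key 1 lives 100 ms
        s.update(2L, 20L, 0L, 5L);   // key 2 lives only 5 ms
        System.out.println(s.value(1L, 10L)); // still alive
        System.out.println(s.value(2L, 10L)); // already expired
    }
}
```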

On Mon, Sep 17, 2018 at 7:30 AM Hequn Cheng  wrote:

> [...]
>