Re: TimeWindow overload?

2016-05-03 Thread Stephan Ewen
Hi Elias!

There is a feature pending that uses an optimized version for aligned time
windows. In that case, elements would go into a single window pane, and the
full window would be composed of all panes it spans (in the case of sliding
windows). That should help a lot in those cases.

The default window mechanism does it that way because it supports
unaligned windows (where each key has a different window start and
endpoint) and completely custom window assigners.
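
To make the pane idea concrete, here is a minimal plain-Java sketch. It is
not Flink's implementation; the class PaneCounter and its methods are
hypothetical names chosen for illustration. It assumes aligned sliding
windows, non-negative timestamps, and a window size that is a multiple of
the slide, so that the pane length equals the slide. Each element updates
exactly one pane, and a window's result is composed from the panes it spans:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of pane-based counting for aligned sliding windows.
// Assumes non-negative timestamps and sizeMs being a multiple of slideMs.
final class PaneCounter {
    private final long sizeMs;
    private final long slideMs;
    // pane start timestamp -> number of elements that fell into that pane
    private final Map<Long, Long> panes = new HashMap<>();

    PaneCounter(long sizeMs, long slideMs) {
        if (slideMs <= 0 || sizeMs <= 0 || sizeMs % slideMs != 0) {
            throw new IllegalArgumentException("size must be a positive multiple of slide");
        }
        this.sizeMs = sizeMs;
        this.slideMs = slideMs;
    }

    // Each element touches exactly one pane, no matter how many windows overlap it.
    void add(long timestampMs) {
        long paneStart = timestampMs - (timestampMs % slideMs);
        panes.merge(paneStart, 1L, Long::sum);
    }

    // The window [windowEndMs - sizeMs, windowEndMs) is the sum of the panes it spans.
    // windowEndMs is expected to be aligned to the slide.
    long countForWindowEndingAt(long windowEndMs) {
        long total = 0;
        for (long start = windowEndMs - sizeMs; start < windowEndMs; start += slideMs) {
            total += panes.getOrDefault(start, 0L);
        }
        return total;
    }

    // Number of panes currently held in memory.
    int paneCount() {
        return panes.size();
    }

    public static void main(String[] args) {
        PaneCounter counter = new PaneCounter(3_000L, 1_000L); // 3 s window, 1 s slide
        counter.add(500L);
        counter.add(1_200L);
        counter.add(1_900L);
        counter.add(3_100L);
        System.out.println(counter.countForWindowEndingAt(3_000L)); // window [0, 3000)
        System.out.println(counter.countForWindowEndingAt(4_000L)); // window [1000, 4000)
        System.out.println(counter.paneCount());
    }
}
```

The stored state grows with the number of panes rather than with the number
of overlapping windows per element, which is the saving described above.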

Greetings,
Stephan



On Tue, May 3, 2016 at 4:07 AM, Elias Levy 
wrote:

> Looking over the code, I see that Flink creates a TimeWindow object each
> time the WindowAssigner is created.  I have not yet tested this, but I am
> wondering if this can become problematic if you have a very long sliding
> window with a small slide, such as a 24 hour window with a 1 minute slide.
> It seems this would create 1,440 TimeWindow objects per event.  Even at low
> event rates this would seem to result in an explosion of TimeWindow
> objects: at 1,000 events per second, you'd be creating 1,440,000 TimeWindow
> objects per second.  After 24 hours you'd have nearly 125 billion TimeWindow
> objects that would just begin to be purged.
>
> Does this analysis seem right?
>
> I suppose that means you should not use long sliding windows with
> small slides.
>
>


Sink - writeAsText problem

2016-05-03 Thread Punit Naik
Hello

I executed my Flink code in Eclipse and it properly generated the output by
creating a folder (as specified in the string) and placing the output files
in it.

But when I exported the project as a JAR and ran the same code using ./flink
run, it generated the output, but instead of creating a folder with files
in it, it just created a single file (as specified in the string). The output
itself was correct, though.

Why does this happen? I want Flink to write its output to a folder.

-- 
Thank You

Regards

Punit Naik


Re: Sink - writeAsText problem

2016-05-03 Thread Fabian Hueske
Did you specify a parallelism? The default parallelism of a Flink instance
is 1 [1], and a sink running with parallelism 1 writes a single file rather
than a directory of files.
You can set a different default parallelism in ./conf/flink-conf.yaml or
pass a job-specific parallelism with ./bin/flink using the -p flag [2].
More options to define parallelism are in the docs [3].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#common-options
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/common/index.html#parallel-execution

2016-05-03 9:26 GMT+02:00 Punit Naik :

> Hello
>
> I executed my Flink code in eclipse and it properly generated the output
> by creating a folder (as specified in the string) and placing output files
> in them.
>
> But when I exported the project as JAR and ran the same code using ./flink
> run, it generated the output, but instead of creating a folder with files
> in it, it just created a single file (as specified in the string) (output
> was correct though).
>
> Why does this happen? I want Flink to write its output in folder.
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>


Re: Measuring latency in a DataStream

2016-05-03 Thread Robert Schmidtke
Hi Igor, thanks for your reply.

As for your first point I'm not sure I understand correctly. I'm ingesting
records at a rate of about 50k records per second, and those records are
fairly small. If I add a time stamp to each of them, I will have a lot more
data, which is not exactly what I want. Instead I wanted to add something
like a watermark once every second and only have a time stamp on this one,
and calculate the latency from it.

For your second point, the clocks are in fact up to 8s apart -.-" Not sure
how I missed this yesterday. As I'm not an admin of the machine, I will
request NTP to be set up.
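
For illustration, a small plain-Java sketch of why a clock offset of this
size produces the negative differences described earlier in the thread. The
class name, the 250 ms true latency, and the sample timestamp are
assumptions made up for the example; only the 8 s offset comes from the
message above:

```java
// Hypothetical sketch: how a clock offset between two machines distorts a
// latency measured as "receiver clock minus timestamp written by the sender".
final class MarkerLatency {
    // Latency as observed by the receiver, using its own local clock.
    static long observedLatencyMs(long markerTimestampMs, long receiverClockMs) {
        return receiverClockMs - markerTimestampMs;
    }

    public static void main(String[] args) {
        long trueLatencyMs = 250L;       // assumed real transport + processing delay
        long receiverOffsetMs = -8_000L; // receiver clock runs 8 s behind the sender

        long sentAtSenderClock = 1_462_262_400_000L; // marker emitted (sender clock)
        // The arrival instant, as read from the receiver's skewed clock.
        long receivedAtReceiverClock = sentAtSenderClock + trueLatencyMs + receiverOffsetMs;

        long observed = observedLatencyMs(sentAtSenderClock, receivedAtReceiverClock);
        System.out.println("observed latency:  " + observed + " ms");
        System.out.println("corrected latency: " + (observed - receiverOffsetMs) + " ms");
    }
}
```

With the receiver 8 s behind, any true latency below 8 s shows up as a
negative value, so the measurement only becomes meaningful once the clocks
are synchronized or the offset is known and subtracted.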

Thanks!
Robert



On Mon, May 2, 2016 at 10:19 PM, Igor Berman  wrote:

> 1. Why are you doing a join instead of something like
> System.currentTimeMillis()? In the end you have a tuple of your data with a
> timestamp anyway, so why not just wrap your data in a Tuple2 with
> the creation timestamp as additional info?
>
> 2. are you sure that consumer/producer machines' clocks are in sync?
> you can use ntp for this.
>
> On 2 May 2016 at 20:02, Robert Schmidtke  wrote:
>
>> Hi everyone,
>>
>> I have implemented a way to measure latency in a DataStream (I hope): I'm
>> consuming a Kafka topic and I'm union'ing the resulting stream with a
>> custom source that emits a (machine-local) timestamp every 1000ms (using
>> currentTimeMillis). On the consuming end I'm distinguishing between the
>> Kafka events and the timestamps. When encountering a timestamp, I take the
>> difference of the processing machine's local time and the timestamp found
>> in the stream, expecting a positive difference (with the processing
>> machine's timestamp being larger than the timestamp found in the stream).
>> However, the opposite is the case. Now I am wondering about when events are
>> actually processed.
>>
>> Union the Stream from Kafka+my custom source, batching them in 10s
>> windows (which is what I do), I expect 10 timestamps with ascending values
>> and a rough gap of 1000ms in the stream:
>>
>> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/RunBenchWithInit.scala#L68
>>
>> On the receiving end I again take the currentTimeMillis in my fold
>> function, expecting the resulting value to be larger (most of the time)
>> than the timestamps encountered in the stream:
>>
>> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/NumericCalcJob.scala#L53
>>
>> The system clocks are in sync up to 1ms.
>>
>> Maybe I am not clear about when certain timestamps are created (i.e. when
>> the UDFs are invoked) or how windows are processed. Any advice is greatly
>> appreciated, also alternative approaches to calculating latency.
>>
>> I'm on Flink 0.10.2 by the way.
>>
>> Thanks in advance for the help!
>>
>> Robert
>>
>> --
>> My GPG Key ID: 336E2680
>>
>
>


-- 
My GPG Key ID: 336E2680


Re: How to perform this join operation?

2016-05-03 Thread Aljoscha Krettek
Hi Elias,
thanks for the long write-up. It's interesting that it actually kinda works
right now.

You might be interested in a design doc that we're currently working on. I
posted it on the dev list but here it is:
https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit?usp=sharing
I'm trying to add support for side inputs. They are excellent for the use case
where you want to enrich (join) a main stream with one or several other
streams. This would also include support for different windows on the
different streams and a mechanism for mapping main windows to the correct
side-input windows.

Feedback/suggestions are very welcome on this!

Cheers,
Aljoscha



On Tue, 3 May 2016 at 03:06 Elias Levy  wrote:

> Thanks for the suggestion.  I ended up implementing it a different way.
>
> What is needed is a mechanism to give each stream a different window
> assigner, and then let Flink perform the join normally given the assigned
> windows.
>
> Specifically, for my use case what I need is a sliding window for one
> stream and a trailing window for the other stream.  A trailing window is
> just a TimeWindow where the window end time is the event time, rounded up
> or down by some amount, and the window start time is the end time minus
> some given parameter.
>
> For instance:
>
> class TrailingEventTimeWindows(asize: Long, around: Long)
>     extends WindowAssigner[Object, TimeWindow] {
>   val size  = asize
>   val round = around
>
>   override def assignWindows(element: Object, timestamp: Long): Collection[TimeWindow] = {
>     if (timestamp > java.lang.Long.MIN_VALUE) {
>       val end = (timestamp - (timestamp % round)) + round
>       Collections.singletonList(new TimeWindow(end - size, end))
>     } else {
>       // Long.MIN_VALUE is currently assigned when no timestamp is present
>       throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
>         "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
>         "'DataStream.assignTimestampsAndWatermarks(...)'?")
>     }
>   }
>
>   def getSize: Long = size
>
>   override def getDefaultTrigger(env: JStreamExecutionEnvironment): Trigger[Object, TimeWindow] =
>     EventTimeTrigger.create()
>
>   override def toString: String = s"TrailingEventTimeWindows($size)"
>
>   override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] =
>     new TimeWindow.Serializer()
> }
>
> object TrailingEventTimeWindows {
>   def of(size: Time, round: Time) =
>     new TrailingEventTimeWindows(size.toMilliseconds(), round.toMilliseconds())
> }
>
>
>
> If the Flink API were different, then I could do something like this to
> join the streams:
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> // (time, key,  id)
> val events: DataStream[(Int,Int,Int)] = env.fromElements( (1000, 100,
> 10), (2000, 200, 10), (3000, 100, 20), (4000, 300, 30), (7000, 100, 40) )
> // (time, file)
> val changes: DataStream[(Int,Int)] = env.fromElements( (2000, 300),
> (4000, 100) )
>
> val windowedKeyedChanges = changes
>   .assignAscendingTimestamps( _._1 )
>   .keyBy(1)
>   .window(TrailingEventTimeWindows.of(Time.seconds(5),Time.seconds(1)))
>
> val windowedKeyedEvents =
>   events.assignAscendingTimestamps( _._1 )
> .keyBy(2)
> .timeWindow(Time.seconds(5), Time.seconds(1))
>
> val results = windowedKeyedEvents.join(windowedKeyedChanges)
>   .apply { }
>
>
> Alas, the Flink API makes this more complicated.  Instead of allowing you
> to join two keyed windowed streams, you join two unkeyed, unwindowed
> streams and tell it how to key them and join them using
> join().where().equalTo().window().  Since that construct only takes a
> single WindowAssigner, I created a window assigner that uses a different
> assigner for each stream being joined:
>
> class DualWindowAssigner[T1 <: Object, T2 <: Object](
>     assigner1: WindowAssigner[Object, TimeWindow],
>     assigner2: WindowAssigner[Object, TimeWindow])
>     extends WindowAssigner[Object, TimeWindow] {
>   val windowAssigner1 = assigner1
>   val windowAssigner2 = assigner2
>
>   override def assignWindows(element: Object, timestamp: Long): Collection[TimeWindow] = {
>     val e = element.asInstanceOf[TaggedUnion[T1,T2]]
>     if (e.isOne) {
>       windowAssigner1.assignWindows(e.getOne, timestamp)
>     } else {
>       windowAssigner2.assignWindows(e.getTwo, timestamp)
>     }
>   }
>
>   override def getDefaultTrigger(env: JStreamExecutionEnvironment): Trigger[Object, TimeWindow] =
>     EventTimeTrigger.create()
>
>   override def toString: String = s"DualWindowAssigner"
>
>   override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] =
>     new TimeWindow.Serializer()
> }
>
>
> Then I can do:
>
> val env = StreamExecutionEnvironment.getExecutionEn

Re: Sink - writeAsText problem

2016-05-03 Thread Stephan Ewen
Hi!

There is the option to always create a directory:
"fs.output.always-create-directory"

See
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#file-systems

Greetings,
Stephan


On Tue, May 3, 2016 at 9:26 AM, Punit Naik  wrote:

> Hello
>
> I executed my Flink code in eclipse and it properly generated the output
> by creating a folder (as specified in the string) and placing output files
> in them.
>
> But when I exported the project as JAR and ran the same code using ./flink
> run, it generated the output, but instead of creating a folder with files
> in it, it just created a single file (as specified in the string) (output
> was correct though).
>
> Why does this happen? I want Flink to write its output in folder.
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>


Re: Scala compilation error

2016-05-03 Thread Aljoscha Krettek
There is a Scaladoc, but unfortunately it does not cover all packages. In
the Scala API you can call transform without specifying a TypeInformation;
it is picked up through implicits/context bounds.

On Tue, 3 May 2016 at 01:48 Srikanth  wrote:

> Sorry for the previous incomplete email. Didn't realize I hit send!
>
> I was facing a weird compilation error in Scala when I did
> val joinedStream = stream1.connect(stream2)
> .transform("funName", outTypeInfo, joinOperator)
>
> It turned out to be due to a difference in API signature between the Scala
> and Java APIs. I was referring to the Javadoc. Is there a Scaladoc?
>
> Java API has
> public <OUT> SingleOutputStreamOperator<OUT, ?> transform(
> String functionName,
> TypeInformation<OUT> outTypeInfo,
> TwoInputStreamOperator<IN1, IN2, OUT> operator)
>
> Scala API has
> def transform[R: TypeInformation](
>   functionName: String,
>   operator: TwoInputStreamOperator[IN1, IN2, R])
>
> Srikanth
>
> On Mon, May 2, 2016 at 7:18 PM, Srikanth  wrote:
>
>> Hello,
>>
>> I'm fac
>>
>> val stream = env.addSource(new FlinkKafkaConsumer09[String]("test-topic",
>> new SimpleStringSchema(), properties))
>> val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b =>
>> BidderRawLogs(b)).keyBy(b => b.strategyId)
>>
>> val metaStrategy: KeyedStream[(Int, String), Int] =
>> env.readTextFile("path").name("Strategy")
>>  .map((1, _) ).keyBy(_._1)
>>
>> val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo()
>> val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo()
>> val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String),
>> (Int, BidderRawLogs, (Int, String))] =
>>  new JoinOperator[Int, BidderRawLogs, (Int, String)] (dynamicTypeInfo,
>> staticTypeInfo)
>> val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]()
>> {}.getTypeInfo()
>>
>> val funName = "test"
>> val joinedStream = bidderStream.connect(metaStrategy)
>> .transform(funName, joinOperator, outTypeInfo)
>>
>>
>


Re: Sink - writeAsText problem

2016-05-03 Thread Punit Naik
Hi Stephan, Fabian

Setting "fs.output.always-create-directory" to true in flink-conf.yaml
worked!

On Tue, May 3, 2016 at 2:27 PM, Stephan Ewen  wrote:

> Hi!
>
> There is the option to always create a directory:
> "fs.output.always-create-directory"
>
> See
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#file-systems
>
> Greetings,
> Stephan
>
>
> On Tue, May 3, 2016 at 9:26 AM, Punit Naik  wrote:
>
>> Hello
>>
>> I executed my Flink code in eclipse and it properly generated the output
>> by creating a folder (as specified in the string) and placing output files
>> in them.
>>
>> But when I exported the project as JAR and ran the same code using
>> ./flink run, it generated the output, but instead of creating a folder with
>> files in it, it just created a single file (as specified in the string)
>> (output was correct though).
>>
>> Why does this happen? I want Flink to write its output in folder.
>>
>> --
>> Thank You
>>
>> Regards
>>
>> Punit Naik
>>
>
>


-- 
Thank You

Regards

Punit Naik


Re: Sink - writeAsText problem

2016-05-03 Thread Fabian Hueske
Yes, but be aware that your program runs with parallelism 1 if you do not
configure the parallelism.

2016-05-03 11:07 GMT+02:00 Punit Naik :

> Hi Stephen, Fabian
>
> setting "fs.output.always-create-directory" to true in flink-config.yml
> worked!
>
> On Tue, May 3, 2016 at 2:27 PM, Stephan Ewen  wrote:
>
>> Hi!
>>
>> There is the option to always create a directory:
>> "fs.output.always-create-directory"
>>
>> See
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#file-systems
>>
>> Greetings,
>> Stephan
>>
>>
>> On Tue, May 3, 2016 at 9:26 AM, Punit Naik 
>> wrote:
>>
>>> Hello
>>>
>>> I executed my Flink code in eclipse and it properly generated the output
>>> by creating a folder (as specified in the string) and placing output files
>>> in them.
>>>
>>> But when I exported the project as JAR and ran the same code using
>>> ./flink run, it generated the output, but instead of creating a folder with
>>> files in it, it just created a single file (as specified in the string)
>>> (output was correct though).
>>>
>>> Why does this happen? I want Flink to write its output in folder.
>>>
>>> --
>>> Thank You
>>>
>>> Regards
>>>
>>> Punit Naik
>>>
>>
>>
>
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>


Re: TimeWindow overload?

2016-05-03 Thread Aljoscha Krettek
Hi,
even with the optimized operator for aligned time windows I would advise
against using long sliding windows with a small slide. The system will
internally create a lot of "buckets", i.e. each sliding window is treated
separately and the element is put into 1,440 buckets in your case. With even
a moderate number of different keys this can very quickly lead to a lot of
window buckets. You can think of it in terms of write amplification: with
tumbling windows you have basically no amplification, while with sliding
windows you have window processing overhead for every slide.
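
As a back-of-the-envelope check of the numbers in this thread, the following
plain-Java sketch computes the amplification factor for the 24 hour window
with a 1 minute slide at 1,000 events per second. The class and method names
are hypothetical, and the calculation assumes the window size is a multiple
of the slide:

```java
// Back-of-the-envelope arithmetic for a 24 h sliding window with a 1 min slide.
final class SlidingWindowCost {
    // Number of sliding windows each element falls into
    // (window size assumed to be a multiple of the slide).
    static long windowsPerElement(long sizeMs, long slideMs) {
        return sizeMs / slideMs;
    }

    public static void main(String[] args) {
        long sizeMs = 24L * 60 * 60 * 1000; // 24 hours
        long slideMs = 60L * 1000;          // 1 minute
        long eventsPerSecond = 1_000L;

        long perElement = windowsPerElement(sizeMs, slideMs);
        long perSecond = perElement * eventsPerSecond;
        long perDay = perSecond * 24 * 60 * 60;

        System.out.println("windows per element:    " + perElement); // 1440
        System.out.println("assignments per second: " + perSecond);  // 1440000
        System.out.println("assignments per day:    " + perDay);     // 124416000000
    }
}
```

For a tumbling window the slide equals the size, so the same function
returns 1, which is the "no amplification" case.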

Cheers,
Aljoscha

On Tue, 3 May 2016 at 09:05 Stephan Ewen  wrote:

> Hi Elias!
>
> There is a feature pending that uses an optimized version for aligned time
> windows. In that case, elements would go into a single window pane, and the
> full window would be composed of all panes it spans (in the case of sliding
> windows). That should help a lot in those cases.
>
> The default window mechanism does it that way, because is supports
> unaligned windows (where each key has a different window start and
> endpoint) and it supports completely custom window assigners.
>
> Greetings,
> Stephan
>
>
>
> On Tue, May 3, 2016 at 4:07 AM, Elias Levy 
> wrote:
>
>> Looking over the code, I see that Flink creates a TimeWindow object each
>> time the WindowAssigner is created.  I have not yet tested this, but I am
>> wondering if this can become problematic if you have a very long sliding
>> window with a small slide, such as a 24 hour window with a 1 minute slide.
>> It seems this would create 1,440 TimeWindow objects per event.  Even at low
>> event rates this would seem to result in an explosion of TimeWindow
>> objects: at 1,000 events per second, you'd be creating 1,440,000 TimeWindow
>> objects per second.  After 24 hours you'd have nearly 125 billion TimeWindow
>> objects that would just begin to be purged.
>>
>> Does this analysis seem right?
>>
>> I suppose that means you should not use long sliding windows with
>> small slides.
>>
>>
>


Re: Insufficient number of network buffers

2016-05-03 Thread Ufuk Celebi
Hey Tarandeep,

I think the failures are unrelated. Regarding the number of network
buffers: 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#configuring-the-network-buffers
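
As a rough cross-check, the configuration page linked above gives a rule of
thumb of #slots-per-TM^2 * #TMs * 4 for the number of network buffers. The
plain-Java sketch below only evaluates that formula for the cluster
described in the question (5 TaskManagers with 4 slots each) and estimates
the memory taken by 7500 buffers, assuming the default buffer size of
32 KiB. The class and method names are hypothetical. Treat the result as a
lower bound: a job with a very large plan and many shuffles can need
considerably more.

```java
// Hypothetical helper that evaluates the documented rule of thumb for
// taskmanager.network.numberOfBuffers and the memory the buffers occupy.
final class NetworkBufferEstimate {
    // Rule of thumb: slotsPerTaskManager^2 * numTaskManagers * 4
    static long ruleOfThumb(int slotsPerTaskManager, int numTaskManagers) {
        return (long) slotsPerTaskManager * slotsPerTaskManager * numTaskManagers * 4L;
    }

    // Memory occupied by the given number of buffers, in bytes.
    static long memoryBytes(long numBuffers, int bufferSizeBytes) {
        return numBuffers * bufferSizeBytes;
    }

    public static void main(String[] args) {
        int slotsPerTaskManager = 4;  // 4 cores per machine, one slot per core
        int numTaskManagers = 5;
        int bufferSizeBytes = 32 * 1024; // assumed default buffer size of 32 KiB

        long estimate = ruleOfThumb(slotsPerTaskManager, numTaskManagers);
        System.out.println("rule-of-thumb buffers: " + estimate);

        long configured = 7_500L;
        long mib = memoryBytes(configured, bufferSizeBytes) / (1024L * 1024L);
        System.out.println(configured + " buffers take about " + mib + " MiB per TaskManager");
    }
}
```

That the job still runs out of buffers at 5000, far above the rule-of-thumb
value, is consistent with the plan being unusually large.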

The timeouts might occur because the task managers are pretty loaded.
I would suggest increasing the Akka ask timeout via
akka.ask.timeout: 100 s
(https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#distributed-coordination-via-akka)

– Ufuk

On Tue, May 3, 2016 at 6:40 AM, Tarandeep Singh  wrote:
> Hi,
>
> I have written ETL jobs in Flink (DataSet API). When I execute them in IDE,
> they run and finish fine. When I try to run them on my cluster, I get
> "Insufficient number of network buffers" error.
>
> I have 5 machines in my cluster with 4 cores each. TaskManager is given 3GB
> each. I increased the number of buffers to 5000, but got the same error.
> When I increased it further (say 7500), I got the exception listed below.
>
> The DAG or execution plan is pretty big. What is the recommended way to run
> your jobs when the DAG becomes huge? Should I break it into parts by calling
> execute on the execution environment in between jobs?
>
> Thanks,
> Tarandeep
>
> Exception I got after I tried to run with 7500 buffers:
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Update task on instance
> d4f3f517b33e5fa8a9932fc06a0aef3b @ dev-cluster-slave1 - 4 slots - URL:
> akka.tcp://flink@172.22.13.39:52046/user/taskmanager failed due to:
> at
> org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
> at akka.dispatch.OnFailure.internal(Future.scala:228)
> at akka.dispatch.OnFailure.internal(Future.scala:227)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at
> scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
> at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
> at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at
> scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> ... 2 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka.tcp://flink@172.22.13.39:52046/user/taskmanager#-1857397999]]
> after [1 ms]
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
> at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
>
>
>


How to perform Broadcast and groupBy in DataStream like DataSet

2016-05-03 Thread subash basnet
Hello all,

How could we perform *withBroadcastSet* and *groupBy* in DataStream like
that of DataSet in the below KMeans code:

DataSet newCentroids = points
.map(new SelectNearestCenter()).*withBroadcastSet*(loop, "centroids")
.map(new CountAppender()).*groupBy*(0).reduce(new CentroidAccumulator())
.map(new CentroidAverager());


DataStream newCentroids = points.map(new
SelectNearestCenter()).???


Best Regards,
Subash Basnet


Re: Creating a custom operator

2016-05-03 Thread Simone Robutti
Hello Fabian,

we dug deeper starting from the input you gave us, but a question arose. We
always assumed that runtime operators were open for extension without
modifying anything inside Flink, but it looks like this is not the case and
the documentation assumes that the developer is working on a contribution
to Flink. So I would like to know if our understanding is correct and
custom runtime operators are not supposed to be implemented outside of
Flink.

Thanks,

Simone

2016-04-29 21:32 GMT+02:00 Fabian Hueske :

> Hi Simone,
>
> the GraphCreatingVisitor transforms the common operator plan into a
> representation that is translated by the optimizer.
> You have to implement an OptimizerNode and OperatorDescriptor to describe
> the operator.
> Depending on the semantics of the operator, there are a few more places to
> make the integration working like driver strategies, cost model, etc.
>
> I would recommend to have a look at previous changes that added an
> operator such as PartitionOperator, SortPartitionOperator, OuterJoin, etc.
> The respective commits should give you an idea which parts of the code
> need to be touched. You should find the commit IDs in the JIRA issues for
> these extensions.
>
> Cheers, Fabian
>
>
>
>
>
> 2016-04-29 15:32 GMT+02:00 Simone Robutti :
>
>> Hello,
>>
>> I'm trying to create a custom operator to explore the internals of Flink.
>> Actually the one I'm working on is rather similar to Union and I'm trying
>> to mimick it for now. When I run my job though, this error arise:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: Unknown
>> operator type: MyOperator - My Operator
>> at
>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:237)
>> at
>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:82)
>> at
>> org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:279)
>> at
>> org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:223)
>> at org.apache.flink.api.common.Plan.accept(Plan.java:348)
>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:454)
>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>> at
>> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:213)
>> at
>> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:107)
>> at io.radicalbit.flinkh2o.Job$.main(Job.scala:50)
>> at io.radicalbit.flinkh2o.Job.main(Job.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>
>> I looked at the location of the error but it's not clear to me how to
>> make my operator recognizable from the optimizer.
>>
>> Thank,
>>
>> Simone
>>
>
>


Re: Sink - writeAsText problem

2016-05-03 Thread Punit Naik
Yeah thanks for letting me know.
On 03-May-2016 2:40 PM, "Fabian Hueske"  wrote:

> Yes, but be aware that your program runs with parallelism 1 if you do not
> configure the parallelism.
>
> 2016-05-03 11:07 GMT+02:00 Punit Naik :
>
>> Hi Stephen, Fabian
>>
>> setting "fs.output.always-create-directory" to true in flink-config.yml
>> worked!
>>
>> On Tue, May 3, 2016 at 2:27 PM, Stephan Ewen  wrote:
>>
>>> Hi!
>>>
>>> There is the option to always create a directory:
>>> "fs.output.always-create-directory"
>>>
>>> See
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#file-systems
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Tue, May 3, 2016 at 9:26 AM, Punit Naik 
>>> wrote:
>>>
>>>> Hello
>>>>
>>>> I executed my Flink code in eclipse and it properly generated the
>>>> output by creating a folder (as specified in the string) and placing output
>>>> files in them.
>>>>
>>>> But when I exported the project as JAR and ran the same code using
>>>> ./flink run, it generated the output, but instead of creating a folder with
>>>> files in it, it just created a single file (as specified in the string)
>>>> (output was correct though).
>>>>
>>>> Why does this happen? I want Flink to write its output in folder.
>>>>
>>>> --
>>>> Thank You
>>>>
>>>> Regards
>>>>
>>>> Punit Naik
>>>>
>>>
>>>
>>
>>
>> --
>> Thank You
>>
>> Regards
>>
>> Punit Naik
>>
>
>


Re: Creating a custom operator

2016-05-03 Thread Fabian Hueske
Hi Simone,

you are right, the interfaces you extend are not considered to be public,
user-facing API.
Adding custom operators to the DataSet API touches many parts of the system
and is not straightforward.
The DataStream API has better support for custom operators.

Can you explain what kind of operator you would like to add?
Maybe the functionality can be achieved with the existing operators.

Best, Fabian

2016-05-03 12:54 GMT+02:00 Simone Robutti :

> Hello Fabian,
>
> we dug deeper starting from the input you gave us, but a question arose. We
> always assumed that runtime operators were open for extension without
> modifying anything inside Flink, but it looks like this is not the case and
> the documentation assumes that the developer is working on a contribution
> to Flink. So I would like to know if our understanding is correct and
> custom runtime operators are not supposed to be implemented outside of
> Flink.
>
> Thanks,
>
> Simone
>
> 2016-04-29 21:32 GMT+02:00 Fabian Hueske :
>
>> Hi Simone,
>>
>> the GraphCreatingVisitor transforms the common operator plan into a
>> representation that is translated by the optimizer.
>> You have to implement an OptimizerNode and OperatorDescriptor to describe
>> the operator.
>> Depending on the semantics of the operator, there are a few more places
>> to make the integration working like driver strategies, cost model, etc.
>>
>> I would recommend to have a look at previous changes that added an
>> operator such as PartitionOperator, SortPartitionOperator, OuterJoin, etc.
>> The respective commits should give you an idea which parts of the code
>> need to be touched. You should find the commit IDs in the JIRA issues for
>> these extensions.
>>
>> Cheers, Fabian
>>
>>
>>
>>
>>
>> 2016-04-29 15:32 GMT+02:00 Simone Robutti :
>>
>>> Hello,
>>>
>>> I'm trying to create a custom operator to explore the internals of
>>> Flink. Actually the one I'm working on is rather similar to Union and I'm
>>> trying to mimick it for now. When I run my job though, this error arise:
>>>
>>> Exception in thread "main" java.lang.IllegalArgumentException: Unknown
>>> operator type: MyOperator - My Operator
>>> at
>>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:237)
>>> at
>>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:82)
>>> at
>>> org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:279)
>>> at
>>> org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:223)
>>> at org.apache.flink.api.common.Plan.accept(Plan.java:348)
>>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:454)
>>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>>> at
>>> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:213)
>>> at
>>> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:107)
>>> at io.radicalbit.flinkh2o.Job$.main(Job.scala:50)
>>> at io.radicalbit.flinkh2o.Job.main(Job.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>>
>>> I looked at the location of the error but it's not clear to me how to
>>> make my operator recognizable to the optimizer.
>>>
>>> Thanks,
>>>
>>> Simone
>>>
>>
>>
>


Re: Measuring latency in a DataStream

2016-05-03 Thread Robert Schmidtke
After fixing the clock issue on the application level, the latency is as
expected. Thanks again!

Robert

On Tue, May 3, 2016 at 9:54 AM, Robert Schmidtke 
wrote:

> Hi Igor, thanks for your reply.
>
> As for your first point I'm not sure I understand correctly. I'm ingesting
> records at a rate of about 50k records per second, and those records are
> fairly small. If I add a time stamp to each of them, I will have a lot more
> data, which is not exactly what I want. Instead I wanted to add something
> like a watermark once every second and only have a time stamp on this one,
> and calculate the latency from it.
>
> For your second point, in fact the clocks are up to 8s apart -.-" not sure
> how I missed this yesterday. As I'm not an admin of the machine, I will
> request that ntp be set up.
>
> Thanks!
> Robert
>
>
>
> On Mon, May 2, 2016 at 10:19 PM, Igor Berman 
> wrote:
>
>> 1. why are you doing a join instead of something like
>> System.currentTimeMillis()? at the end you have a tuple of your data with
>> a timestamp anyway... so why not just wrap your data in a Tuple2 with
>> the creation timestamp as additional info?
>>
>> 2. are you sure that consumer/producer machines' clocks are in sync?
>> you can use ntp for this.
>>
>> On 2 May 2016 at 20:02, Robert Schmidtke  wrote:
>>
>>> Hi everyone,
>>>
>>> I have implemented a way to measure latency in a DataStream (I hope):
>>> I'm consuming a Kafka topic and I'm union'ing the resulting stream with a
>>> custom source that emits a (machine-local) timestamp every 1000ms (using
>>> currentTimeMillis). On the consuming end I'm distinguishing between the
>>> Kafka events and the timestamps. When encountering a timestamp, I take the
>>> difference of the processing machine's local time and the timestamp found
>>> in the stream, expecting a positive difference (with the processing
>>> machine's timestamp being larger than the timestamp found in the stream).
>>> However, the opposite is the case. Now I am wondering about when events are
>>> actually processed.
>>>
>>> Unioning the stream from Kafka with my custom source and batching the
>>> result in 10s windows (which is what I do), I expect 10 timestamps with
>>> ascending values and a rough gap of 1000ms in the stream:
>>>
>>> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/RunBenchWithInit.scala#L68
>>>
>>> On the receiving end I again take the currentTimeMillis in my fold
>>> function, expecting the resulting value to be larger (most of the time)
>>> than the timestamps encountered in the stream:
>>>
>>> https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/NumericCalcJob.scala#L53
>>>
>>> The system clocks are in sync up to 1ms.
>>>
>>> Maybe I am not clear about when certain timestamps are created (i.e.
>>> when the UDFs are invoked) or how windows are processed. Any advice is
>>> greatly appreciated, also alternative approaches to calculating latency.
>>>
>>> I'm on Flink 0.10.2 by the way.
>>>
>>> Thanks in advance for the help!
>>>
>>> Robert
>>>
>>> --
>>> My GPG Key ID: 336E2680
>>>
>>
>>
>
>
> --
> My GPG Key ID: 336E2680
>



-- 
My GPG Key ID: 336E2680
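
The negative differences reported in this thread are what unsynchronized clocks produce. Below is a minimal sketch in plain Java (no Flink involved). The class and method names are made up for illustration, and the skew values are assumptions chosen to match the "up to 8s apart" observation above.

```java
// Hypothetical helper, not part of Flink or of the benchmark in this thread.
final class SkewedLatency {

    /**
     * Latency as the consumer would compute it: its own clock reading minus
     * the timestamp the producer put into the stream.
     *
     * @param trueSendTime  real time at which the marker was emitted
     * @param trueLatency   real time the marker spent in flight
     * @param producerSkew  offset of the producer clock from real time
     * @param consumerSkew  offset of the consumer clock from real time
     */
    static long observedLatency(long trueSendTime, long trueLatency,
                                long producerSkew, long consumerSkew) {
        long stamp = trueSendTime + producerSkew;                  // producer's currentTimeMillis()
        long receive = trueSendTime + trueLatency + consumerSkew;  // consumer's currentTimeMillis()
        return receive - stamp;  // equals trueLatency + consumerSkew - producerSkew
    }

    public static void main(String[] args) {
        // Producer clock 8 s ahead, 150 ms of real latency: the result is negative.
        System.out.println(observedLatency(1_000_000L, 150L, 8_000L, 0L));
        // Clocks in sync: the result is the real latency.
        System.out.println(observedLatency(1_000_000L, 150L, 0L, 0L));
    }
}
```

The measured value is the real latency plus the difference between the two clock offsets, so it only becomes meaningful once that difference is small compared to the latency itself.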


Re: TimeWindow overload?

2016-05-03 Thread Stephan Ewen
Just had a quick chat with Aljoscha...

The first version of the aligned window code will still duplicate the
elements, later versions should be able to get rid of that.

On Tue, May 3, 2016 at 11:10 AM, Aljoscha Krettek 
wrote:

> Hi,
> even with the optimized operator for aligned time windows I would advise
> against using long sliding windows with a small slide. The system will
> internally create a lot of "buckets", i.e. each sliding window is treated
> separately and the element is put into 1,440 buckets, in your case. With a
> moderate amount of different keys this can very quickly lead to a lot of
> created window buckets. You can think of it in terms of write
> amplification. If you have tumbling windows you basically have no
> amplification, if you have sliding windows you have window processing
> overhead for every slide.
>
> Cheers,
> Aljoscha
>
> On Tue, 3 May 2016 at 09:05 Stephan Ewen  wrote:
>
>> Hi Elias!
>>
>> There is a feature pending that uses an optimized version for aligned
>> time windows. In that case, elements would go into a single window pane,
>> and the full window would be composed of all panes it spans (in the case of
>> sliding windows). That should help a lot in those cases.
>>
>> The default window mechanism does it that way, because it supports
>> unaligned windows (where each key has a different window start and
>> endpoint) and it supports completely custom window assigners.
>>
>> Greetings,
>> Stephan
>>
>>
>>
>> On Tue, May 3, 2016 at 4:07 AM, Elias Levy 
>> wrote:
>>
>>> Looking over the code, I see that Flink creates a TimeWindow object each
>>> time the WindowAssigner is created.  I have not yet tested this, but I am
>>> wondering if this can become problematic if you have a very long sliding
>>> window with a small slide, such as a 24 hour window with a 1 minute slide.
>>> It seems this would create 1,440 TimeWindow objects per event.  Even at low
>>> event rates this would seem to result in an explosion of TimeWindow
>>> objects: at 1,000 events per second, you'd be creating 1,440,000 TimeWindow
>>> objects per second.  After 24 hours you'd have nearly 125 billion TimeWindow
>>> objects that would just begin to be purged.
>>>
>>> Does this analysis seem right?
>>>
>>> I suppose that means you should not use long sliding windows with
>>> small slides.
>>>
>>>
>>
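
The arithmetic in this thread can be checked with a small sketch. It is plain Java with no Flink dependency. `SlidingWindowCount` and `windowStarts` are made-up names, and the loop only mirrors the general idea of assigning a timestamp to every sliding window that covers it, which is an assumption about the approach and not a copy of Flink's assigner code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only.
final class SlidingWindowCount {

    /** Start timestamps of all windows [start, start + size) that contain the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 24L * 60 * 60 * 1000;  // 24 hour window, in milliseconds
        long slide = 60L * 1000;           // 1 minute slide, in milliseconds
        long perEvent = windowStarts(1_462_270_000_000L, size, slide).size();
        long perSecond = perEvent * 1_000L;        // at 1,000 events per second
        long perDay = perSecond * 24L * 60 * 60;   // accumulated over 24 hours
        System.out.println(perEvent + " windows per event");
        System.out.println(perSecond + " window assignments per second");
        System.out.println(perDay + " window assignments per 24 hours");
    }
}
```

With these inputs the sketch gives 1,440 windows per event and 124,416,000,000 assignments per day, which matches the "nearly 125 billion" estimate. A tumbling window (size equal to slide) gives exactly one window per event, which is the "no amplification" case mentioned above.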


Re: Flink Iterations Ordering

2016-05-03 Thread Stephan Ewen
Hi!

The order in which the elements arrive in an iteration HEAD is the order in
which the last operator in the loop (the TAIL) produces them. If that is a
deterministic ordering (because of a sorted reduce, for example), then you
should be able to rely on the order.

Otherwise, the order of elements can always change every time elements are
re-distributed across tasks (for example during a key-based shuffle, a
broadcast, or a change of parallelism).

Greetings,
Stephan


On Mon, May 2, 2016 at 2:50 PM, Aljoscha Krettek 
wrote:

> Hi,
> as I understand it the order of elements will not be preserved across
> iteration supersteps. But maybe someone else knows more.
>
> Cheers,
> Aljoscha
>
> On Thu, 28 Apr 2016 at 00:23 David Kim 
> wrote:
>
>> Hello all,
>>
>> I read the documentation at [1] on iterations and had a question on
>> whether an assumption is safe to make.
>>
>> As partial solutions are continuously looping through the step function,
>> when new elements are added as iteration inputs will the insertion order of
>> all of the elements be preserved?
>>
>> Example:
>>
>> Current partial solutions going through step function are: A, B, C.
>>
>> At a later time we add (in order) elements D, E.
>>
>> Will the iteration result always be A,B,C,D,E?
>>
>> References:
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html
>>
>
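
A toy illustration of why insertion order is not kept once elements are re-distributed across tasks. This is plain Java, not Flink. `ShuffleOrder` is a made-up name, the partitioning by `hashCode` is only a stand-in for a key-based shuffle, and reading the partitions one after another is just one of many interleavings a real job could produce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only.
final class ShuffleOrder {

    /** Distributes elements over partitions by hash, then reads the partitions back in turn. */
    static List<String> shuffleAndConcat(List<String> input, int parallelism) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String element : input) {
            // Order is kept inside each partition, but not across partitions.
            partitions.get(Math.floorMod(element.hashCode(), parallelism)).add(element);
        }
        List<String> output = new ArrayList<>();
        for (List<String> partition : partitions) {
            output.addAll(partition);
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("A", "B", "C", "D", "E");
        System.out.println(shuffleAndConcat(input, 1)); // a single task keeps the input order
        System.out.println(shuffleAndConcat(input, 2)); // two tasks: the global order changes
    }
}
```

With one partition the result is still A, B, C, D, E. With two partitions the sketch yields B, D, A, C, E, so any logic that depends on the global order needs an explicit sort or a parallelism of one.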


Flink - start-cluster.sh

2016-05-03 Thread Punit Naik
Hi

I did all the settings required for cluster setup, but when I ran the
start-cluster.sh script, it only started one jobmanager on the master node.
Logs are written only on the master node. Slaves don't have any logs. And
when I ran a program it said:

Resources available to scheduler: Number of instances=0, total number of
slots=0, available slots=0

Can anyone help please?

-- 
Thank You

Regards

Punit Naik


Re: Creating a custom operator

2016-05-03 Thread Simone Robutti
I'm not sure this is the right way to do it but we were exploring all the
possibilities and this one is the most obvious. We also spent some time
studying how to do it to get a better understanding of Flink's internals.

What we want to do though is to integrate Flink with another distributed
system that builds its own nodes and coordinates through the network with
its own logic. This software is H2O (a Machine Learning platform) and the
integration consists of two big tasks: the first is to instantiate an H2O
node in every task manager and handle the lifecycle of the node according
to the taskmanager and the execution graph. The second is to allow the
developer to code everything inside Flink, converting from and to H2O's
data structures (distributed tabular data) and triggering the execution of
algorithms on H2O with a uniform API.

Here's a simple example (assuming that we will use the TableAPI):

val env = ExecutionEnvironment.getExecutionEnvironment
val h2oEnv = H2OEnviroment.getEnvironment(env)

val myData: Table = ...
val someOtherData: Table = ...

val myH2OFrame = myData.select(...).toH2OFrame(h2oEnv)

val linearRegressionModel = h2oEnv.linearRegression(myH2OFrame)

val predictions:Table=linearRegressionModel(someOtherData)

predictions.select(...)


A good solution should let the system keep the H2O nodes alive across
multiple tasks and make it possible to move the data locally from
Flink to H2O. The latter is not achieved in H2O's integration with Spark
but we still hope to do it.

That said, I'm still not sure if it is really required to implement a
custom runtime operator but given the complexity of integrating two
distributed systems, we assumed that more control would allow more
flexibility and more ways to reach an ideal solution.





2016-05-03 13:29 GMT+02:00 Fabian Hueske :

> Hi Simone,
>
> you are right, the interfaces you extend are not considered to be public,
> user-facing API.
> Adding custom operators to the DataSet API touches many parts of the
> system and is not straightforward.
> The DataStream API has better support for custom operators.
>
> Can you explain what kind of operator you would like to add?
> Maybe the functionality can be achieved with the existing operators.
>
> Best, Fabian
>
> 2016-05-03 12:54 GMT+02:00 Simone Robutti :
>
>> Hello Fabian,
>>
>> we dug deeper starting from the input you gave us, but a question arose.
>> We always assumed that runtime operators were open for extension without
>> modifying anything inside Flink but it looks like this is not the case and
>> the documentation assumes that the developer is working on a contribution
>> to Flink. So I would like to know if our understanding is correct and
>> custom runtime operators are not supposed to be implemented outside of
>> Flink.
>>
>> Thanks,
>>
>> Simone
>>
>> 2016-04-29 21:32 GMT+02:00 Fabian Hueske :
>>
>>> Hi Simone,
>>>
>>> the GraphCreatingVisitor transforms the common operator plan into a
>>> representation that is translated by the optimizer.
>>> You have to implement an OptimizerNode and OperatorDescriptor to
>>> describe the operator.
>>> Depending on the semantics of the operator, there are a few more places
>>> to make the integration working like driver strategies, cost model, etc.
>>>
>>> I would recommend to have a look at previous changes that added an
>>> operator such as PartitionOperator, SortPartitionOperator, OuterJoin, etc.
>>> The respective commits should give you an idea which parts of the code
>>> need to be touched. You should find the commit IDs in the JIRA issues for
>>> these extensions.
>>>
>>> Cheers, Fabian
>>>
>>>
>>>
>>>
>>>
>>> 2016-04-29 15:32 GMT+02:00 Simone Robutti 
>>> :
>>>
>>>> Hello,
>>>>
>>>> I'm trying to create a custom operator to explore the internals of
>>>> Flink. Actually the one I'm working on is rather similar to Union and I'm
>>>> trying to mimic it for now. When I run my job though, this error arises:
>>>>
>>>> Exception in thread "main" java.lang.IllegalArgumentException: Unknown
>>>> operator type: MyOperator - My Operator
>>>> at
>>>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:237)
>>>> at
>>>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:82)
>>>> at
>>>> org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:279)
>>>> at
>>>> org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:223)
>>>> at org.apache.flink.api.common.Plan.accept(Plan.java:348)
>>>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:454)
>>>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>>>> at
>>>> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:213)
>>>> at
>>>> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:107)
>>>> at io.radicalbit.flinkh2o.Job$.main(Job.scala:50)
>>>> at i

Re: Insufficient number of network buffers

2016-05-03 Thread Tarandeep Singh
Yes, you are right, the exception occurred because the task managers were
heavily loaded. I checked ganglia metrics and CPU usage was very high. I reduced
parallelism and ran with 5000 buffers and didn't get any exception.

Thanks,
Tarandeep

On Tue, May 3, 2016 at 2:19 AM, Ufuk Celebi  wrote:

> Hey Tarandeep,
>
> I think the failures are unrelated. Regarding the number of network
> buffers:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#configuring-the-network-buffers
>
> The timeouts might occur, because the task managers are pretty loaded.
> I would suggest to increase the Akka ask timeouts via
> akka.ask.timeout: 100 s
> (
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#distributed-coordination-via-akka
> )
>
> – Ufuk
>
> On Tue, May 3, 2016 at 6:40 AM, Tarandeep Singh 
> wrote:
> > Hi,
> >
> > I have written ETL jobs in Flink (DataSet API). When I execute them in
> IDE,
> > they run and finish fine. When I try to run them on my cluster, I get
> > "Insufficient number of network buffers" error.
> >
> > I have 5 machines in my cluster with 4 cores each. TaskManager is given
> 3GB
> > each. I increased the number of buffers to 5000, but got the same error.
> > When I increased it further (say 7500), I get  exception listed below.
> >
> > The DAG or execution plan is pretty big. What is recommended way to run
> your
> > jobs when the DAG becomes huge? Shall I break it into parts by calling
> > execute on execution environment in between jobs ?
> >
> > Thanks,
> > Tarandeep
> >
> > Exception I got after I tried to run with 7500 buffers:
> >
> > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> > execution failed.
> > at
> >
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
> > at
> >
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
> > at
> >
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
> > at
> >
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> > at
> >
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> > at
> >
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> > at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> > at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: java.lang.IllegalStateException: Update task on instance
> > d4f3f517b33e5fa8a9932fc06a0aef3b @ dev-cluster-slave1 - 4 slots - URL:
> > akka.tcp://flink@172.22.13.39:52046/user/taskmanager failed due to:
> > at
> >
> org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
> > at akka.dispatch.OnFailure.internal(Future.scala:228)
> > at akka.dispatch.OnFailure.internal(Future.scala:227)
> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
> > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
> > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> > at
> >
> scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
> > at
> scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
> > at
> scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
> > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > at
> >
> scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
> > at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > ... 2 more
> > Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> > [Actor[akka.tcp://flink@172.22.13.39:52046/user/taskmanager#-1857397999
> ]]
> > after [1 ms]
> > at
> >
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
> > at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> > at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
> > at
> >
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> > at
> >
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
> > at
> >
> akka.actor.LightArrayRevolverScheduler$TaskHolder.execut

Re: Scala compilation error

2016-05-03 Thread Srikanth
Yes, I did notice the usage of implicit in ConnectedStreams.scala.
Better Scaladoc will be helpful, especially when compiler errors are not
clear.

Thanks

On Tue, May 3, 2016 at 5:02 AM, Aljoscha Krettek 
wrote:

> There is a Scaladoc but it is not covering all packages, unfortunately. In
> the Scala API you can call transform without specifying a TypeInformation,
> it works using implicits/context bounds.
>
> On Tue, 3 May 2016 at 01:48 Srikanth  wrote:
>
>> Sorry for the previous incomplete email. Didn't realize I hit send!
>>
>> I was facing a weird compilation error in Scala when I did
>> val joinedStream = stream1.connect(stream2)
>> .transform("funName", outTypeInfo, joinOperator)
>>
>> It turned out to be due to a difference in API signature between Scala
>> and Java API. I was referring to the javadoc. Is there a scaladoc?
>>
>> Java API has
>> public  SingleOutputStreamOperator transform(
>> String functionName,
>> TypeInformation outTypeInfo,
>> TwoInputStreamOperator operator)
>>
>> Scala API has
>> def transform[R: TypeInformation](
>>   functionName: String,
>>   operator: TwoInputStreamOperator[IN1, IN2, R])
>>
>> Srikanth
>>
>> On Mon, May 2, 2016 at 7:18 PM, Srikanth  wrote:
>>
>>> Hello,
>>>
>>> I'm fac
>>>
>>> val stream = env.addSource(new
>>> FlinkKafkaConsumer09[String]("test-topic", new SimpleStringSchema(),
>>> properties))
>>> val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b =>
>>> BidderRawLogs(b)).keyBy(b => b.strategyId)
>>>
>>> val metaStrategy: KeyedStream[(Int, String), Int] =
>>> env.readTextFile("path").name("Strategy")
>>>  .map((1, _) ).keyBy(_._1)
>>>
>>> val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo()
>>> val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo()
>>> val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String),
>>> (Int, BidderRawLogs, (Int, String))] =
>>>  new JoinOperator[Int, BidderRawLogs, (Int, String)] (dynamicTypeInfo,
>>> staticTypeInfo)
>>> val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]()
>>> {}.getTypeInfo()
>>>
>>> val funName = "test"
>>> val joinedStream = bidderStream.connect(metaStrategy)
>>> .transform(funName, joinOperator, outTypeInfo)
>>>
>>>
>>


Re: How to perform Broadcast and groupBy in DataStream like DataSet

2016-05-03 Thread Stefano Baghino
I'm not sure about "withBroadcastSet", but in the DataStream API you use
"keyBy" instead of "groupBy".

On Tue, May 3, 2016 at 12:35 PM, subash basnet  wrote:

> Hello all,
>
> How could we perform *withBroadcastSet* and *groupBy* in DataStream like
> that of DataSet in the below KMeans code:
>
> DataSet newCentroids = points
> .map(new SelectNearestCenter()).*withBroadcastSet*(loop, "centroids")
> .map(new CountAppender()).*groupBy*(0).reduce(new CentroidAccumulator())
> .map(new CentroidAverager());
>
>
> DataStream newCentroids = points.map(new
> SelectNearestCenter()).???
>
>
> Best Regards,
> Subash Basnet
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit


Re: Multiple windows with large number of partitions

2016-05-03 Thread Aljoscha Krettek
Yes, please go ahead. That would be helpful.

On Mon, 2 May 2016 at 21:56 Christopher Santiago 
wrote:

> Hi Aljoscha,
>
> Yes, there is still a high partition/window count since I have to keyby
> the userid so that I get unique users.  I believe what I see happening is
> that the second window with the timeWindowAll is not getting all the
> results or the results from the previous window are changing when the
> second window is running.  I can see the date/unique user count increase
> and decrease as it is running for a particular day.
>
> I can share the eclipse project and the sample data file I am working off
> of with you if that would be helpful.
>
> Thanks,
> Chris
>
> On Mon, May 2, 2016 at 12:55 AM, Aljoscha Krettek [via Apache Flink User
> Mailing List archive.] wrote:
>
>> Hi,
>> what do you mean by "still experiencing the same issues"? Is the key
>> count still very high, i.e. 500k windows?
>>
>> For the watermark generation, specifying a lag of 2 days is very
>> conservative. If the watermark is this conservative I guess no elements
>> will ever arrive behind the watermark, thus you wouldn't need
>> the late-element handling in your triggers. The late-element handling in
>> Triggers is only required to compensate for the fact that the watermark can
>> be a heuristic and not always correct.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 28 Apr 2016 at 21:24 Christopher Santiago wrote:
>>
>>> Hi Aljoscha,
>>>
>>>
>>> Aljoscha Krettek wrote
>>> >>is there a reason for keying on both the "date only" field and the
>>> "userid". I think you should be fine by just specifying that you want
>>> 1-day
>>> windows on your timestamps.
>>>
>>> My mistake, this was from earlier tests that I had performed.  I removed
>>> it
>>> and went to keyBy(2) and I am still experiencing the same issues.
>>>
>>>
>>> Aljoscha Krettek wrote
>>> >>Also, do you have a timestamp extractor in place that takes the
>>> timestamp
>>> from your data and sets it as the internal timestamp field.
>>>
>>> Yes there is, it is from the BoundedOutOfOrdernessGenerator example:
>>>
>>> public static class BoundedOutOfOrdernessGenerator implements
>>> AssignerWithPeriodicWatermarks> {
>>> private static final long serialVersionUID = 1L;
>>> private final long maxOutOfOrderness =
>>> Time.days(2).toMilliseconds();
>>> private long currentMaxTimestamp;
>>>
>>> @Override
>>> public long extractTimestamp(Tuple3
>>> element, long previousElementTimestamp) {
>>> long timestamp = element.f0.getMillis();
>>> currentMaxTimestamp = Math.max(timestamp,
>>> currentMaxTimestamp);
>>> return timestamp;
>>> }
>>>
>>> @Override
>>> public Watermark getCurrentWatermark() {
>>> return new Watermark(currentMaxTimestamp -
>>> maxOutOfOrderness);
>>> }
>>> }
>>>
>>> Thanks,
>>> Chris
>>>
>>>
>>>
>>>
>>
>>
>>
>
>
>
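
The point about the 2-day lag can be sketched without Flink. This is plain Java. `BoundedLagWatermark` and its methods are made-up names that only imitate the bounded-out-of-orderness logic quoted above (watermark = largest timestamp seen minus a fixed lag), and treating "behind the watermark" as a strict less-than is a simplifying assumption.

```java
// Hypothetical stand-in for the generator quoted above; not a Flink class.
final class BoundedLagWatermark {
    private final long maxLagMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedLagWatermark(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
    }

    /** Records an element timestamp and returns it unchanged. */
    long observe(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        return timestamp;
    }

    /** Largest timestamp seen so far minus the allowed lag. */
    long currentWatermark() {
        // Guard against underflow before the first element has been seen.
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - maxLagMillis;
    }

    /** True if an element with this timestamp would count as late. */
    boolean isBehindWatermark(long timestamp) {
        return timestamp < currentWatermark();
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        BoundedLagWatermark watermark = new BoundedLagWatermark(2 * day);
        watermark.observe(10 * day);
        System.out.println(watermark.isBehindWatermark(9 * day)); // one day out of order
        System.out.println(watermark.isBehindWatermark(7 * day)); // three days out of order
    }
}
```

An element is only late if it trails the newest element by more than the full lag, so with a lag of 2 days an element that is one day out of order is still on time. The price is that a window result is emitted roughly 2 days of event time after the window ends.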


Re: How to perform Broadcast and groupBy in DataStream like DataSet

2016-05-03 Thread subash basnet
Hello Stefano,

Thank you, I found out just some time ago that I could use keyBy, but I
couldn't find how to set a broadcast set and call getBroadcastVariable in
DataStream the way it works in DataSet.
For example, in the code below we get the collection of *centroids* via broadcast.

Eg: In KMeans.java
class X extends MapFunctions<>{
  private Collection *centroids*;
  public void open(Configuration parameters) throws Exception {
this.*centroids* = getRuntimeContext().getBroadcastVariable("centroids");
  }
  for (Centroid cent : *centroids*) {
  }
}


Best Regards,
Subash Basnet

On Tue, May 3, 2016 at 4:04 PM, Stefano Baghino <
stefano.bagh...@radicalbit.io> wrote:

> I'm not sure about "withBroadcastSet", but in the DataStream API you use
> "keyBy" instead of "groupBy".
>
> On Tue, May 3, 2016 at 12:35 PM, subash basnet  wrote:
>
>> Hello all,
>>
>> How could we perform *withBroadcastSet* and *groupBy* in DataStream like
>> that of DataSet in the below KMeans code:
>>
>> DataSet newCentroids = points
>> .map(new SelectNearestCenter()).*withBroadcastSet*(loop, "centroids")
>> .map(new CountAppender()).*groupBy*(0).reduce(new CentroidAccumulator())
>> .map(new CentroidAverager());
>>
>>
>> DataStream newCentroids = points.map(new
>> SelectNearestCenter()).???
>>
>>
>> Best Regards,
>> Subash Basnet
>>
>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>


how to convert datastream to collection

2016-05-03 Thread subash basnet
Hello all,

Suppose I have the datastream as:
DataStream> *newCentroids*;

How to get collection of *newCentroids * to be able to loop as below:
 private Collection> *centroids*;
 for (Centroid cent : *centroids*) {
  }



Best Regards,
Subash Basnet


Re: how to convert datastream to collection

2016-05-03 Thread Suneel Marthi
DataStream> *newCentroids = new DataStream<>.()*

*Iterator> iter =
DataStreamUtils.collect(newCentroids);*

*List> list = Lists.newArrayList(iter);*

On Tue, May 3, 2016 at 10:26 AM, subash basnet  wrote:

> Hello all,
>
> Suppose I have the datastream as:
> DataStream> *newCentroids*;
>
> How to get collection of *newCentroids * to be able to loop as below:
>  private Collection> *centroids*;
>  for (Centroid cent : *centroids*) {
>   }
>
>
>
> Best Regards,
> Subash Basnet
>


Re: how to convert datastream to collection

2016-05-03 Thread Aljoscha Krettek
Hi,
please keep in mind that we're dealing with streams. The Iterator might
never finish.

Cheers,
Aljoscha

On Tue, 3 May 2016 at 16:35 Suneel Marthi  wrote:

> DataStream> *newCentroids = new DataStream<>.()*
>
> *Iterator> iter =
> DataStreamUtils.collect(newCentroids);*
>
> *List> list = Lists.newArrayList(iter);*
>
> On Tue, May 3, 2016 at 10:26 AM, subash basnet  wrote:
>
>> Hello all,
>>
>> Suppose I have the datastream as:
>> DataStream> *newCentroids*;
>>
>> How to get collection of *newCentroids * to be able to loop as below:
>>  private Collection> *centroids*;
>>  for (Centroid cent : *centroids*) {
>>   }
>>
>>
>>
>> Best Regards,
>> Subash Basnet
>>
>
>
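
A sketch of the caveat above, in plain Java without Flink. `BoundedCollect`, `takeUpTo` and `endlessCounter` are made-up names, and the endless iterator is only a stand-in for an iterator over an unbounded stream. Copying such an iterator into a list would never return, whereas a bounded copy does.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper for illustration only.
final class BoundedCollect {

    /** Copies at most maxElements items, so it returns even if the iterator never ends. */
    static <T> List<T> takeUpTo(Iterator<T> iterator, int maxElements) {
        List<T> result = new ArrayList<>();
        while (result.size() < maxElements && iterator.hasNext()) {
            result.add(iterator.next());
        }
        return result;
    }

    /** Stand-in for an unbounded stream: hasNext() is always true. */
    static Iterator<Long> endlessCounter() {
        return new Iterator<Long>() {
            private long next = 0;

            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public Long next() {
                return next++;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(takeUpTo(endlessCounter(), 5));
    }
}
```

A count limit only helps while elements keep arriving. If the underlying iterator blocks in hasNext() waiting for the next element, the loop blocks as well, so a time limit or a bounded source would still be needed.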


Re: Flink + Kafka + Scalabuff issue

2016-05-03 Thread Alexander Gryzlov
Hello,

Just to follow up on this issue: after collecting some data and setting up
additional tests we have managed to pinpoint the issue to the
ScalaBuff-generated code that decodes enumerations. After switching to the
ScalaPB generator instead, the problem was gone.

One thing peculiar about this bug, however, is that it seems to manifest
only on Flink. We have a number of ad-hoc streaming pipelines (without
Flink) that are still using the very same decoder code and have been
running for weeks without seemingly experiencing any memory or performance
issues. The versions of Flink that we saw this happening on are 1.0
and 1.0.1.

Alex

On Tue, Apr 19, 2016 at 1:11 PM, Robert Metzger  wrote:

> Hi Alex,
> I suspect it's a GC issue with the code generated by ScalaBuff. Can you
> maybe try to do something like a standalone test where you use a
> while(true) loop to see how fast you can deserialize elements from your Foo
> type?
> Maybe you'll find that the JVM is growing all the time. Then there's
> probably a memory leak somewhere.
>
> On Tue, Apr 19, 2016 at 11:42 AM, Ufuk Celebi  wrote:
>
>> Hey Alex,
>>
>> (1) Which Flink version are you using for this?
>>
>> (2) Can you also get a heap dump after the job slows down? Slow downs
>> like this are often caused by some component leaking memory, maybe in
>> Flink, maybe the Scalabuff deserializer. Can you also share the Foo
>> code?
>>
>> – Ufuk
>>
>> On Mon, Apr 18, 2016 at 4:36 PM, Alexander Gryzlov
>>  wrote:
>> > Hello,
>> >
>> > Has anyone tried using ScalaBuff
>> > (https://github.com/SandroGrzicic/ScalaBuff) with Flink? We’re trying
>> to
>> > consume Protobuf messages from Kafka 0.8 and have hit a performance
>> issue.
>>
>


Re: how to convert datastream to collection

2016-05-03 Thread Srikanth
Why do you want to collect and iterate? Why not iterate on the DataStream
itself?
Maybe I didn't understand your use case completely.

Srikanth

On Tue, May 3, 2016 at 10:55 AM, Aljoscha Krettek 
wrote:

> Hi,
> please keep in mind that we're dealing with streams. The Iterator might
> never finish.
>
> Cheers,
> Aljoscha
>
> On Tue, 3 May 2016 at 16:35 Suneel Marthi  wrote:
>
>> DataStream> *newCentroids = new DataStream<>.()*
>>
>> *Iterator> iter =
>> DataStreamUtils.collect(newCentroids);*
>>
>> *List> list = Lists.newArrayList(iter);*
>>
>> On Tue, May 3, 2016 at 10:26 AM, subash basnet 
>> wrote:
>>
>>> Hello all,
>>>
>>> Suppose I have the datastream as:
>>> DataStream> *newCentroids*;
>>>
>>> How to get collection of *newCentroids * to be able to loop as below:
>>>  private Collection> *centroids*;
>>>  for (Centroid cent : *centroids*) {
>>>   }
>>>
>>>
>>>
>>> Best Regards,
>>> Subash Basnet
>>>
>>
>>


Re: How to perform this join operation?

2016-05-03 Thread Elias Levy
Till,

Thanks again for putting this together.  It is certainly along the lines of
what I want to accomplish, but I see some problems with it.  In your code
you use a ValueState to store the priority queue.  If you are expecting to
store a lot of values in the queue, then you are likely to be using RocksDB
as the state backend.  But if you use a ValueState for the priority queue
with RocksDB as the backend, the whole priority queue will be deserialized
and serialized each time you add an item to it.  That will become a
crushing cost as the queue grows.

I could instead use a ListState with the RocksDB backend, so that only the
single value being added is serialized on an add.  But the get() operation
in the RocksDBListState seems very inefficient, defeating the idea of
working with data sets that don't fit in memory.  It loads all values into
a List instead of returning an Iterable that returns values in the list by
iterating via the RocksDB scan API.  Samza has the advantage here, as it
provides an ordered KV state API that allows you to truly iterate over the
values in RocksDB and a caching layer to batch writes into RocksDB. I am
surprised there is no OrderedKeyValueStore API in Flink.

Given that only the RocksDB backend can store more state than can fit in
memory and the cost associated with its get() method when keeping track of
a list, it seems like there isn't a good solution to keep track of large state
in the form of a list or ordered list in Flink right now.


On Wed, Apr 20, 2016 at 10:13 AM, Till Rohrmann 
wrote:

> The stream operator would do the following: collect the inputs from
> (x,y) and z, which are already keyed. Thus, we know that x=z holds true.
> Using a priority queue, we order the elements because we don't know the
> order in which they arrive at the operator. Whenever we receive a
> watermark indicating that no earlier events can arrive anymore, we can go
> through the two priority queues to join the elements. The queues are part
> of the operator's state so that we don't lose information in case of a
> recovery.
>
> I've sketched such an operator here [1]. I hope this helps you to get
> started.
>
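
For readers without the linked sketch at hand, the buffering idea described
above can be illustrated in plain Java with no Flink dependency. This is not
Till's operator: the class name, the Event type, and the join rule (each left
element is paired with the latest right element at or before its timestamp)
are assumptions made for the example, and the queues live in memory rather
than in checkpointed operator state.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Plain-Java sketch of a watermark-driven buffered join for a single key. */
public class BufferedJoinSketch {

    /** A timestamped element from either input (hypothetical type). */
    public static final class Event {
        final long timestamp;
        final String value;

        public Event(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private static final Comparator<Event> BY_TIME =
            Comparator.comparingLong(e -> e.timestamp);

    // Elements are buffered here because they may arrive out of order.
    private final PriorityQueue<Event> left = new PriorityQueue<>(BY_TIME);
    private final PriorityQueue<Event> right = new PriorityQueue<>(BY_TIME);

    // Most recent right element already consumed (assumed join rule).
    private Event latestRight;

    public void addLeft(Event e) { left.add(e); }

    public void addRight(Event e) { right.add(e); }

    /**
     * Called when a watermark arrives. No element with a timestamp at or
     * below the watermark can still show up, so everything up to it can be
     * processed in timestamp order.
     */
    public List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!left.isEmpty() && left.peek().timestamp <= watermark) {
            Event l = left.poll();
            // Advance the right side up to this left element's timestamp.
            while (!right.isEmpty() && right.peek().timestamp <= l.timestamp) {
                latestRight = right.poll();
            }
            if (latestRight != null) {
                out.add(l.value + "+" + latestRight.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        BufferedJoinSketch join = new BufferedJoinSketch();
        join.addLeft(new Event(5, "l5"));
        join.addRight(new Event(3, "r3"));
        join.addLeft(new Event(2, "l2")); // arrives out of order
        join.addRight(new Event(1, "r1"));
        System.out.println(join.onWatermark(4));  // prints [l2+r1]
        System.out.println(join.onWatermark(10)); // prints [l5+r3]
    }
}
```

In a real operator the two queues would have to be stored in state, which is
exactly where the serialization cost discussed above comes in.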


PLC/Scada/Sensor anomaly detection

2016-05-03 Thread Ivan
Hello!
Has anyone used Flink in "production" for PLC anomaly detection?
Any pointers/docs to check?


Best regards,

Iván Venzor C.


s3 checkpointing issue

2016-05-03 Thread Chen Qin
Hi there,

I ran a test job with FsStateBackend, saving checkpoints to S3 (via s3a).

The job crashes when a checkpoint is triggered. Listing the objects in the
S3 directory, I found that the directory was created successfully, but all
the checkpoint directories are empty.

The host running the task manager shows the following error.

Received error response: com.amazonaws.services.s3.model.AmazonS3Exception:
Status Code: 404, AWS Service: null, AWS Request ID: CF1845CA84E07549, AWS
Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID:x

Has anyone met this issue before?

flink 1.0.0
scala 2.10
hadoop-aws 2.7.2
aws-java-sdk 1.7.4


Thanks,
Chen

Attached is the full log shown on the web dashboard when the job was canceled.
java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:674)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not open output stream for state backend
    at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:498)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.write(FsStateBackend.java:444)
    at java.io.DataOutputStream.write(DataOutputStream.java:88)
    at java.io.DataOutputStream.write(DataOutputStream.java:88)
    at org.apache.flink.types.StringValue.writeString(StringValue.java:813)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:78)
    at org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:27)
    at org.apache.flink.runtime.state.filesystem.AbstractFsState.snapshot(AbstractFsState.java:85)
    at org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:265)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:175)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:121)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:509)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:481)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:678)
    ... 8 more
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
    at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
    at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:888)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:785)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:489)
    ... 25 more


reading from latest kafka offset when flink starts

2016-05-03 Thread Balaji Rajagopalan
I am using the Flink Kafka connector to read from a Kafka stream. I ran into
a problem where the Flink job went down due to an application error and
stayed down for some time. Meanwhile the Kafka queue kept growing, as
expected with no consumer reading from the given group. When I restarted
the Flink job it started consuming the messages, no problem so far, but the
consumer lag was huge, since the producer is fast (about 4500 events/sec).
My question: is there any Flink connector configuration that can force it
to read from the latest offset when the Flink application starts? In my
application logic I do not care about older events.

balaji
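
One possible approach, sketched here under assumptions rather than as a
confirmed answer from the list: Kafka's auto.offset.reset setting only
applies when the consumer group has no committed offset, so starting the job
with a fresh group.id together with auto.offset.reset should make the
consumer begin at the end of the topic. The class name, the helper method,
and the idea of suffixing the group id with a start timestamp are
illustrative choices, not a Flink convention. Offsets restored from a Flink
checkpoint or savepoint would presumably still take precedence.

```java
import java.util.Properties;

/** Builds Kafka consumer properties intended to start reading at the latest offset. */
public class LatestOffsetProps {

    /**
     * groupPrefix and startMillis are hypothetical parameters: together they
     * form a group id that has never committed offsets before.
     */
    public static Properties latestOffsetProperties(
            String brokers, String groupPrefix, long startMillis) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        // A fresh group id has no committed offsets, so auto.offset.reset applies.
        props.setProperty("group.id", groupPrefix + "-" + startMillis);
        // Value for the Kafka 0.9 consumer; the 0.8 consumer uses "largest"
        // instead and also needs zookeeper.connect.
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = latestOffsetProperties(
                "localhost:9092", "my-job", System.currentTimeMillis());
        System.out.println(props.getProperty("group.id"));
        // The properties would then be handed to the Kafka source, e.g.
        // new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props)
    }
}
```

The trade-off is that every restart abandons the old group's offsets, so
monitoring that tracks consumer lag by group id would need to follow the new
name.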


Re: Flink - start-cluster.sh

2016-05-03 Thread Balaji Rajagopalan
Which Flink documentation were you following to set up your cluster? Can
you point to it?

On Tue, May 3, 2016 at 6:21 PM, Punit Naik  wrote:

> Hi
>
> I did all the settings required for cluster setup, but when I ran the
> start-cluster.sh script, it only started one jobmanager on the master node.
> Logs are written only on the master node. Slaves don't have any logs. And
> when I ran a program it said:
>
> Resources available to scheduler: Number of instances=0, total number of
> slots=0, available slots=0
>
> Can anyone help please?
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>