Failed to submit 0.10.1

2016-02-08 Thread Andrew Ge Wu
Hi All

I’m new to Flink and have come to the step of submitting to a remote cluster, and it 
failed with the following message:

Association with remote system [akka.tcp://flink@127.0.0.1:61231] has failed, 
address is now gated for [5000] ms. Reason is: [scala.Option; local class 
incompatible: stream classdesc serialVersionUID = -2062608324514658839, local 
class serialVersionUID = -114498752079829388].

I have double-checked that my client and server versions are the same (0.10.1), 
but my Java versions are slightly different:
Java(TM) SE Runtime Environment (build 1.8.0_25-b17)
vs.
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)

Is Java the issue, or is there anything else I may be missing?


Many thanks


Andrew




Re: Failed to submit 0.10.1

2016-02-08 Thread Maximilian Michels
Hi Andrew,

It appears that you're using two different versions of the Scala
library in your Flink job. Please make sure you use either 2.10 or
2.11 but not both at the same time.

Best,
Max



Re: Error while reading binary file

2016-02-08 Thread Till Rohrmann
Hi Saliya,

in order to set the file path for the SerializedInputFormat you first have
to create it and then explicitly call setFilePath.

final SerializedInputFormat<MyType> inputFormat = new SerializedInputFormat<>();
inputFormat.setFilePath(PATH_TO_FILE);

env.createInput(inputFormat, myTypeInfo);
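
For example, if MyType were Flink's ShortValue (an assumption chosen here to
match the short data mentioned below), the type information could be built as:

// ValueTypeInfo is org.apache.flink.api.java.typeutils.ValueTypeInfo,
// ShortValue is org.apache.flink.types.ShortValue.
final TypeInformation<ShortValue> myTypeInfo = new ValueTypeInfo<>(ShortValue.class);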

Cheers,
Till

On Mon, Feb 8, 2016 at 7:00 AM, Saliya Ekanayake  wrote:

> Hi,
>
> I was trying to read a simple binary file using SerializedInputFormat as
> suggested in a different thread, but encountered the following error. I
> tried to do what the exception suggests, but even though createInput()
> returns a DataSet object I couldn't find how to specify which file to read.
>
> Any help is appreciated. The file I am trying to read is a simple binary
> file containing Java short values. Is there any example on reading
> binary files available?
>
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: The type returned by
> the input format could not be automatically determined. Please specify the
> TypeInformation of the produced type explicitly by using the
> 'createInput(InputFormat, TypeInformation)' method instead.
>
> Thank you,
> Saliya
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> Cell 812-391-4914
> http://saliya.org
>


Re: Error while reading binary file

2016-02-08 Thread Maximilian Michels
Hi Saliya,

Thanks for your question. Flink's type analyzer couldn't extract the
type information. You may implement the ResultTypeQueryable interface
in your custom source. That way you can manually specify the correct
type. If that doesn't help you, could you please share more of the
stack trace?
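
A hedged sketch of that suggestion (the format class name and the ShortValue
element type are invented for illustration, not code from this thread):

import org.apache.flink.api.common.io.SerializedInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.types.ShortValue;

// An input format that tells Flink its produced type up front, so the
// type analyzer does not need to extract it from generics.
public class ShortValueInputFormat extends SerializedInputFormat<ShortValue>
        implements ResultTypeQueryable<ShortValue> {

    @Override
    public TypeInformation<ShortValue> getProducedType() {
        return new ValueTypeInfo<>(ShortValue.class);
    }
}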

Thanks,
Max



Re: OutputFormat vs SinkFunction

2016-02-08 Thread Maximilian Michels
Hi Nick,

SinkFunction just implements user-defined functions on incoming
elements. OutputFormat offers more lifecycle methods. Thus it is a
more powerful interface. The OutputFormat originally comes from the
batch API, whereas the SinkFunction originates from streaming. Those
were more separate code paths in the past. Ultimately, it would make
sense to have only the OutputFormat interface, but I think we have to
keep both to avoid breaking the API.

If you need the lifecycle methods in streaming, there is
RichSinkFunction, which implements SinkFunction and inherits the
RichFunction lifecycle methods. In addition, it gives you access to
the RuntimeContext. You can pass it directly to the
"addSink(sinkFunction)" API method.
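
A hedged sketch of such a sink (the class name and log output are
illustrative, not from this thread):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// The open()/close() lifecycle hooks come from RichFunction, and the
// RuntimeContext is available inside them.
public class LoggingSink extends RichSinkFunction<String> {

    @Override
    public void open(Configuration parameters) {
        System.out.println("opening subtask "
                + getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public void invoke(String value) {
        System.out.println(value);
    }

    @Override
    public void close() {
        System.out.println("closing sink");
    }
}

// usage: stream.addSink(new LoggingSink());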

Cheers,
Max

On Mon, Feb 8, 2016 at 7:14 AM, Nick Dimiduk  wrote:
> Heya,
>
> Is there a plan to consolidate these two interfaces? They appear to provide
> identical functionality, differing only in lifecycle management. I found
> myself writing an adaptor so I can consume an OutputFormat where a
> SinkFunction is expected; there's not much to it. This seems like code that
> Flink should ship.
>
> Maybe one interface or the other can be deprecated for the 1.0 API?
>
> Thanks,
> Nick


Re: Distribution of sinks among the nodes

2016-02-08 Thread Aljoscha Krettek
Hi,
I just merged the new feature, so once this makes it into the 1.0-SNAPSHOT 
builds you should be able to use:

env.setParallelism(4);

env
.addSource(kafkaSource)
.rescale()
.map(mapper).setParallelism(16)
.rescale()
.addSink(kafkaSink);

to get your desired behavior. For this to work, the parallelism should be set 
to 16, with 4 nodes. Then each node will have one source, 4 mappers, and 1 sink. 
Each source will only be connected to its 4 mappers, and those 4 mappers will be 
the only ones connected to the sink.

Cheers,
Aljoscha

> On 04 Feb 2016, at 18:29, Aljoscha Krettek  wrote:
> 
> I added a new Ticket: https://issues.apache.org/jira/browse/FLINK-3336
> 
> This will implement the data shipping pattern that you mentioned in your 
> initial mail. I also have the Pull request almost ready.
> 
>> On 04 Feb 2016, at 16:25, Gwenhael Pasquiers 
>>  wrote:
>> 
>> Okay;
>> 
>> Then I guess that the best we can do is to disable chaining (we really want 
>> one thread per operator since they are doing long operations) and have the 
>> same parallelism for sinks as for mapping: that way each map will have its own 
>> sink and there will be no exchanges between Flink instances.
>> 
>> From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of 
>> Stephan Ewen
>> Sent: Thursday, February 4, 2016 15:13
>> To: user@flink.apache.org
>> Subject: Re: Distribution of sinks among the nodes
>> 
>> To your other question, there are two things in Flink:
>> 
>> (1) Chaining. Tasks are folded together into one task, run by one thread.
>> 
>> (2) Resource groups: Tasks stay separate, have separate threads, but share a 
>> slot (which means share memory resources). See the link in my previous mail 
>> for an explanation concerning those.
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> On Thu, Feb 4, 2016 at 3:10 PM, Stephan Ewen  wrote:
>> Hi Gwen!
>> 
>> You actually don't need 24 slots, but only as many as the highest
>> parallelism (16). Slots do not hold individual tasks, but "pipelines".
>> 
>> Here is an illustration of how that works:
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots
>> 
>> You can control whether a task can share the slot with the previous task 
>> with the function "startNewResourceGroup()" in the streaming API (see the 
>> sketch at the end of this thread). Sharing slots makes a few things easier 
>> to reason about; especially when adding operators to a program, you need 
>> not immediately add new machines.
>> 
>> 
>> How to solve your program case
>> 
>> 
>> We can actually make a pretty simple addition to Flink that will cause the 
>> tasks to be locally connected, which in turn will cause the scheduler to 
>> distribute them like you intend.
>> Rather than let the 4 sources rebalance across all 16 mappers, each one 
>> should redistribute to 4 local mappers, and these 4 mappers should send data 
>> to one local sink each.
>> 
>> We'll try and add that today and ping you once it is in.
>> 
>> The following would be sample code to use this:
>> 
>> env.setParallelism(4);
>> 
>> env
>>.addSource(kafkaSource)
>>.partitionFan()
>>.map(mapper).setParallelism(16)
>>.partitionFan()
>>.addSink(kafkaSink);
>> 
>> 
>> 
>> A bit of background why the mechanism is the way that it is right now
>> --
>> 
>> You can think of a slot as a slice of resources. In particular, an amount of 
>> memory from the memory manager, but also memory in the network stack.
>> 
>> What we want to do quite soon is to make streaming programs more elastic. 
>> Consider for example the case that you have 16 slots on 4 machines, a 
>> machine fails, and you have no spare resources. In that case Flink should 
>> recognize that no spare resource can be acquired, and scale the job in. 
>> Since you have only 12 slots left, the parallelism of the mappers is reduced 
>> to 12, and the source task that was on the failed machine is moved to a slot 
>> on another machine.
>> 
>> It is important that the guaranteed resources for each task do not change 
>> when scaling in, to keep behavior predictable. In this case, each slot will 
>> still at most host 1 source, 1 mapper, and 1 sink, as did some of the slots 
>> before. That is also the reason why the slots are per TaskManager, and not 
>> global, to associate them with a constant set of resources (mainly memory).
>> 
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> 
>> On Thu, Feb 4, 2016 at 9:54 AM, Gwenhael Pasquiers 
>>  wrote:
>> Don’t we need to set the number of slots to 24 (4 sources + 16 mappers + 4 
>> sinks)?
>> 
>> Or is there a way to set the number of slots per TaskManager instead of 
>> globally, so that they are at least equally dispatched among the nodes?
>> 
>> As for the sink deployment: that’s not good news; I mean we will have a 
>> non-negligible overhead: all the data gene
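
Following up on the "startNewResourceGroup()" call Stephan mentions above, a
hedged sketch reusing the names from his sample snippet (the exact placement
semantics are an assumption, not confirmed by this thread):

env.setParallelism(4);

env
.addSource(kafkaSource)
.map(mapper).setParallelism(16)
// start a new resource group at this point in the pipeline, so these
// tasks no longer share slot resources with the preceding ones
.startNewResourceGroup()
.addSink(kafkaSink);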

Re: Failed to submit 0.10.1

2016-02-08 Thread Andrew Ge Wu
Thanks Max,

My local and remote environments are running Scala code runner version 2.11.7 
-- Copyright 2002-2013, LAMP/EPFL,
and I downloaded the Scala 2.11 binary
(http://apache.mirrors.spacedump.net/flink/flink-0.10.1/flink-0.10.1-bin-hadoop27-scala_2.11.tgz).
Is there a different version of the client lib for Scala 2.11?


Best, 

Andrew



Re: Failed to submit 0.10.1

2016-02-08 Thread Andrew Ge Wu
Yes, I found a special dependency for 2.11. Thanks!



<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${apache.flink.version}</version>
</dependency>


Andrew



Re: Failed to submit 0.10.1

2016-02-08 Thread Maximilian Michels
You're welcome. As of recent changes, all Maven artifact names are now
suffixed with the Scala major version. However, the old artifacts are
still available for the snapshot version. I've just pushed an empty
flink-streaming-java for 1.0-SNAPSHOT to prevent users from compiling
code which would fail later on.

Cheers,
Max





Re: Error while reading binary file

2016-02-08 Thread Saliya Ekanayake
Thank you Till and Max. I'll try the setFilePath method and let you know.


Re: Flink on YARN: Stuck on "Trying to register at JobManager"

2016-02-08 Thread Robert Metzger
You said earlier that you are using Flink 0.10. The feature is only
available in 1.0-SNAPSHOT.
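
For reference, the feature in question is the yarn.application-master.port
setting in flink-conf.yaml. A hedged example (the port range is illustrative;
the range syntax follows the yarn_setup docs page linked later in this thread):

yarn.application-master.port: 50100-50200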


Re: Flink on YARN: Stuck on "Trying to register at JobManager"

2016-02-08 Thread Pieter Hameete
Matter of RTFM, eh ;-) Thanks, and sorry for the bother.


Re: Flink on YARN: Stuck on "Trying to register at JobManager"

2016-02-08 Thread Pieter Hameete
I've tried setting the yarn.application-master.port property in
flink-conf.yaml to a range suggested in
https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#running-flink-on-yarn-behind-firewalls

The JobManager does not seem to be picking the property up. Am I setting
this in the wrong place? Or is there another way to enforce this property?

Cheers,

Pieter

2016-02-07 20:04 GMT+01:00 Pieter Hameete :

> I found the relevant information on the website. I'll consult with the
> cluster admin tomorrow, thanks for the help :-)
>
> - Pieter
>
> 2016-02-07 19:31 GMT+01:00 Robert Metzger :
>
>> Hi,
>>
>> we had other users with a similar issue as well. There is a configuration
>> value which allows you to specify a single port or a range of ports for the
>> JobManager to allocate when running on YARN.
>> Note that when using this with a single port, the JMs may collide.
>>
>>
>>
>> On Sun, Feb 7, 2016 at 7:25 PM, Pieter Hameete 
>> wrote:
>>
>>> Hi Stephan,
>>>
>>> surely it seems this way! I must not be the first with this issue
>>> though? I'll have to contact the cluster admins to find a solution
>>> together. What would be a way of making the JobManagers accessible from
>>> outside the network, because the IP and port number changes every time.
>>>
>>> Alternatively, I can ask for ssh access to a node within the network.
>>> That will surely work, but it's not my preferred solution.
>>>
>>> - Pieter
>>>
>>> 2016-02-06 16:22 GMT+01:00 Stephan Ewen :
>>>
 Yeah, sounds a lot like the client cannot connect to the JobManager
 port.

 The ports to communicate with HDFS and the YARN resource manager may be
 whitelisted or forwarded, so you can submit the YARN session, but then not
 connect to the JobManager afterwards.



 On Sat, Feb 6, 2016 at 2:11 PM, Pieter Hameete 
 wrote:

> Hi Max!
>
> I'm using Flink 0.10.1 and indeed the cluster seems to be created
> fine; everything in the JobManager Web UI looks good.
>
> It seems like the JobManager initiates the connection with my VM and
> cannot reach it. It could be that this is similar to the problem here:
>
>
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-with-docker-errors-with-akka-NAT-td7702.html
>
> I probably have to make some changes to the networking configuration
> of my VM so it can be reached by the JobManager despite using a different
> port each time.
>
> - Pieter
>
> 2016-02-06 14:05 GMT+01:00 Maximilian Michels :
>
>> Hi Pieter,
>>
>> Which version of Flink are you using? It appears you've created a
>> Flink YARN cluster but you can't reach the JobManager afterwards.
>>
>> Cheers,
>> Max
>>
>> On Sat, Feb 6, 2016 at 1:42 PM, Pieter Hameete 
>> wrote:
>> > Hi Robert,
>> >
>> > unfortunately there are no signs of what is going wrong in the
>> logs. The
>> > last log messages are about successful registration of the
>> TaskManagers.
>> >
>> > I'm also fairly sure it must be something in my VM that is causing
>> this,
>> > because when I start the yarn-session from a login node that is on
>> the same
>> > network as the hadoop cluster there are no problems registering
>> with the
>> > JobManager. I did also notice the following message in the local
>> console:
>> >
>> > 12:30:27,173 WARN  Remoting
>> > - Tried to associate with unreachable remote address
>> > [akka.tcp://flink@145.100.41.13:41539]. Address is now gated for
>> 5000 ms,
>> > all messages to this address will be delivered to dead letters.
>> Reason:
>> > connection timed out: /145.100.41.13:41539
>> >
>> > I can ping the JobManager fine from within the VM. Could there be some
>> invalid or
>> > missing configuration on my side?
>> >
>> > Cheers,
>> >
>> > Pieter
>> >
>> >
>> > 2016-02-06 12:54 GMT+01:00 Robert Metzger :
>> >>
>> >> Hi,
>> >>
>> >> did you check the logs of the JobManager itself? Maybe it'll tell
>> us
>> >> already whats going on.
>> >>
>> >> On Sat, Feb 6, 2016 at 12:14 PM, Pieter Hameete <
>> phame...@gmail.com>
>> >> wrote:
>> >>>
>> >>> Hi Guys!
>> >>>
>> >>> Im attempting to run Flink on YARN, but I run into an issue. Im
>> starting
>> >>> the Flink YARN session from an Ubuntu 14.04 VM. All goes well
>> until after
>> >>> the JobManager web UI is started:
>> >>>
>> >>> JobManager web interface address
>> >>>
>> http://head05.hathi.surfsara.nl:8088/proxy/application_1452780322684_10532/
>> >>> Waiting until all TaskManagers have connected
>> >>> 11:09:51,557 INFO  org.apache.flink.yarn.ApplicationClient
>> >>> - Notification about new leader address
>> >>> akka.tcp://flink@145.100.41.148:35666/user/jobmanager with
>> session ID null.
>> >>> No stat

Re: Flink on YARN: Stuck on "Trying to register at JobManager"

2016-02-08 Thread Pieter Hameete
After downloading and building the 1.0-SNAPSHOT from the master branch I do
run into another problem when starting a YARN cluster. The startup now
infinitely loops at the following step:

17:39:12,369 INFO
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing
over to rm2
17:39:34,855 INFO
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing
over to rm1

Any clue what could've gone wrong? I used all defaults for building with
Maven.

- Pieter




Re: OutputFormat vs SinkFunction

2016-02-08 Thread Nick Dimiduk
In my case, I have my application code that is calling addSink, for which
I'm writing a test that needs to use LocalCollectionOutputFormat. Having
two separate class hierarchies is not helpful, hence the adapter. Much of
this code already exists in the implementation of FileSinkFunction, so the
project already supports it in a limited way.



Re: OutputFormat vs SinkFunction

2016-02-08 Thread Maximilian Michels
Changing the class hierarchy would break backwards-compatibility of the
API. However, we could add another method to DataStream to easily use
OutputFormats in streaming.

How did you write your adapter? I came up with the one below. Admittedly,
it is sort of a hack but works fine. By the way, you can also use the
DataStream.write(OutputFormat format) method to use any OutputFormat. The
code below is just for when you really only want to use
DataStream.addSink(SinkFunction function).

Cheers,
Max

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.Collection;

public class OutputFormatAdapter<T> extends LocalCollectionOutputFormat<T>
   implements SinkFunction<T>, RichFunction {

   public OutputFormatAdapter(Collection<T> out) {
  super(out);
   }

   @Override
   public void invoke(T value) throws Exception {
  super.writeRecord(value);
   }

   @Override
   public void open(Configuration parameters) throws Exception {
  super.open(getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks());
   }

   @Override
   public IterationRuntimeContext getIterationRuntimeContext() {
  throw new UnsupportedOperationException("This is not supported.");
   }


   /** Small test */
   public static void main(String[] args) throws Exception {

  StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

  final DataStreamSource<Long> longDataStreamSource =
env.generateSequence(0, 1000);

  final ArrayList<Long> longs = new ArrayList<>();

  longDataStreamSource.addSink(new OutputFormatAdapter<>(longs));

  env.execute();

  for (long l : longs) {
 System.out.println(l);
  }
   }
}




Re: Flink on YARN: Stuck on "Trying to register at JobManager"

2016-02-08 Thread Robert Metzger
Mh, that's weird. Maybe both resource managers are marked as "standby"? Not
sure what can cause this issue.

Which YARN version are you using? Maybe you need to build Flink against
that specific hadoop version yourself.


Re: Error while reading binary file

2016-02-08 Thread Saliya Ekanayake
Till,

I am still having trouble getting this to work. Here's my code (
https://github.com/esaliya/flinkit)

String binaryFile = "src/main/resources/sample.bin";
SerializedInputFormat sif = new SerializedInputFormat<>();
sif.setFilePath(binaryFile);
DataSet ds = env.createInput(sif);
System.out.println(ds.count());


I still get the same error as shown below

Exception in thread "main"
org.apache.flink.api.common.InvalidProgramException: The type returned by
the input format could not be automatically determined. Please specify the
TypeInformation of the produced type explicitly by using the
'createInput(InputFormat, TypeInformation)' method instead.
at
org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:511)
at org.saliya.flinkit.WordCount.main(WordCount.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
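
For reference, a hedged sketch of the two-argument createInput() variant the
exception asks for. ShortValue and the ValueTypeInfo construction are
assumptions chosen to match the short data, not code from this thread; note
also that SerializedInputFormat expects Flink's own block-wise binary format,
which is a separate concern from the type error:

import org.apache.flink.api.common.io.SerializedInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.types.ShortValue;

public class ReadBinary {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        SerializedInputFormat<ShortValue> sif = new SerializedInputFormat<>();
        sif.setFilePath("src/main/resources/sample.bin");

        // Supplying the TypeInformation explicitly bypasses the automatic
        // type extraction that fails above.
        DataSet<ShortValue> ds =
                env.createInput(sif, new ValueTypeInfo<>(ShortValue.class));
        System.out.println(ds.count());
    }
}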



-- 
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org


Re: OutputFormat vs SinkFunction

2016-02-08 Thread Nick Dimiduk
On Mon, Feb 8, 2016 at 9:51 AM, Maximilian Michels  wrote:

> Changing the class hierarchy would break backwards-compatibility of the
> API. However, we could add another method to DataStream to easily use
> OutputFormats in streaming.
>

Indeed, that's why I suggested deprecating one and moving toward a
consolidated class hierarchy. It won't happen overnight, but this can be
managed pretty easily with some adapter code like this and some additional
overrides in the public APIs.

How did you write your adapter? I came up with the one below.
>

Our implementations are similar. This one is working fine with my test code.

https://gist.github.com/ndimiduk/18820fcd78412c6b4fc3



Re: Flink on YARN: Stuck on "Trying to register at JobManager"

2016-02-08 Thread Pieter Hameete
Solved: indeed it needed to be built for YARN 2.7.1 specifically. Cheers!

2016-02-08 19:13 GMT+01:00 Robert Metzger :

> Mh, that's weird. Maybe both resource managers are marked as "standby"?
> Not sure what can cause this issue.
>
> Which YARN version are you using? Maybe you need to build Flink against
> that specific hadoop version yourself.
>
> On Mon, Feb 8, 2016 at 5:50 PM, Pieter Hameete  wrote:
>
>> After downloading and building the 1.0-SNAPSHOT from the master branch I
>> do run into another problem when starting a YARN cluster. The startup now
>> infinitely loops at the following step:
>>
>> 17:39:12,369 INFO
>> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing
>> over to rm2
>> 17:39:34,855 INFO
>> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing
>> over to rm1
>>
>> Any clue what could've gone wrong? I used all defaults when building with
>> Maven.
>>
>> - Pieter
>>
>>
>>
>> 2016-02-08 17:07 GMT+01:00 Pieter Hameete :
>>
>>> Matter of RTFM eh ;-) thx and sorry for the bother.
>>>
>>> 2016-02-08 17:06 GMT+01:00 Robert Metzger :
>>>
 You said earlier that you are using Flink 0.10. The feature is only
 available in 1.0-SNAPSHOT.

 On Mon, Feb 8, 2016 at 4:53 PM, Pieter Hameete 
 wrote:

> I've tried setting the yarn.application-master.port property in
> flink-conf.yaml to a range suggested in
> https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#running-flink-on-yarn-behind-firewalls
>
> The JobManager does not seem to be picking the property up. Am I
> setting this in the wrong place? Or is there another way to enforce this
> property?
>
> Cheers,
>
> Pieter
>
> 2016-02-07 20:04 GMT+01:00 Pieter Hameete :
>
>> I found the relevant information on the website. I'll consult with the
>> cluster admin tomorrow, thanks for the help :-)
>>
>> - Pieter
>>
>> 2016-02-07 19:31 GMT+01:00 Robert Metzger :
>>
>>> Hi,
>>>
>>> we had other users with a similar issue as well. There is a
>>> configuration value which allows you to specify a single port or a 
>>> range of
>>> ports for the JobManager to allocate when running on YARN.
>>> Note that when using this with a single port, the JMs may collide.
>>>
>>>
>>>
>>> On Sun, Feb 7, 2016 at 7:25 PM, Pieter Hameete 
>>> wrote:
>>>
 Hi Stephan,

 surely it seems this way! I must not be the first with this issue
 though? I'll have to contact the cluster admins to find a solution
 together. What would be a way to make the JobManagers accessible from
 outside the network, given that the IP and port number change every time?

 Alternatively, I can ask for SSH access to a node within the
 network. That will surely work, but it's not my preferred solution.

 - Pieter

 2016-02-06 16:22 GMT+01:00 Stephan Ewen :

> Yeah, sounds a lot like the client cannot connect to the
> JobManager port.
>
> The ports to communicate with HDFS and the YARN resource manager
> may be whitelisted or forwarded, so you can submit the YARN session, but
> then not connect to the JobManager afterwards.
>
>
>
>
> On Sat, Feb 6, 2016 at 2:11 PM, Pieter Hameete  > wrote:
>
>> Hi Max!
>>
>> I'm using Flink 0.10.1 and indeed the cluster seems to be created
>> fine, all in the JobManager Web UI looks good.
>>
>> It seems like the JobManager initiates the connection with my VM
>> and cannot reach it. It could be that this is similar to the problem 
>> here:
>>
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-with-docker-errors-with-akka-NAT-td7702.html
>>
>> I probably have to make some changes to the networking
>> configuration of my VM so it can be reached by the JobManager 
>> despite using
>> a different port each time.
>>
>> - Pieter
>>
>> 2016-02-06 14:05 GMT+01:00 Maximilian Michels :
>>
>>> Hi Pieter,
>>>
>>> Which version of Flink are you using? It appears you've created a
>>> Flink YARN cluster but you can't reach the JobManager afterwards.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Sat, Feb 6, 2016 at 1:42 PM, Pieter Hameete <
>>> phame...@gmail.com> wrote:
>>> > Hi Robert,
>>> >
>>> > unfortunately there are no signs of what is going wrong in the logs.
>>> > The last log messages are about successful registration of the
>>> > TaskManagers.
>>> >
>>> > I'm also fairly sure it must be something in my VM that is
>>> > causing this.

Kafka partition alignment for event time

2016-02-08 Thread shikhar
My Flink job is doing aggregations on top of event-time based windowing
across Kafka partitions. As I have been developing and restarting it, the
state for the catch-up periods becomes unreliable -- lots of duplicate emits
for time windows already seen before, which I have to discard since my sink
can't handle them. There may be a bug in my job, but I wanted to clarify
whether this might be a flaw in Flink's handling of this.

I understand there is an m:n mapping of partitions to sources depending on the
parallelism. Each source will have its own watermark. During catch-up,
watermark progression can become pretty fragile, e.g. in my case where
there are n partitions and the parallelism is 1.

I feel like some kind of event time alignment is needed across partitions. I
may be completely off here, so I look forward to your thoughts on this!



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Error while reading binary file

2016-02-08 Thread Fabian Hueske
Hi,

please try to replace
DataSet ds = env.createInput(sif);
by
DataSet ds = env.createInput(sif,
ValueTypeInfo.SHORT_VALUE_TYPE_INFO);

Best, Fabian

2016-02-08 19:33 GMT+01:00 Saliya Ekanayake :

> Till,
>
> I am still having trouble getting this to work. Here's my code (
> https://github.com/esaliya/flinkit)
>
> String binaryFile = "src/main/resources/sample.bin";
> SerializedInputFormat sif = new SerializedInputFormat<>();
> sif.setFilePath(binaryFile);
> DataSet ds = env.createInput(sif);
> System.out.println(ds.count());
>
>
> I still get the same error as shown below
>
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: The type returned by
> the input format could not be automatically determined. Please specify the
> TypeInformation of the produced type explicitly by using the
> 'createInput(InputFormat, TypeInformation)' method instead.
> at
> org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:511)
> at org.saliya.flinkit.WordCount.main(WordCount.java:24)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>
>
> On Mon, Feb 8, 2016 at 5:42 AM, Till Rohrmann 
> wrote:
>
>> Hi Saliya,
>>
>> in order to set the file path for the SerializedInputFormat you first
>> have to create it and then explicitly call setFilePath.
>>
>> final SerializedInputFormat inputFormat = new 
>> SerializedInputFormat();
>> inputFormat.setFilePath(PATH_TO_FILE);
>>
>> env.createInput(inputFormat, myTypeInfo);
>>
>> Cheers,
>> Till
>> ​
>>
>> On Mon, Feb 8, 2016 at 7:00 AM, Saliya Ekanayake 
>> wrote:
>>
>>> Hi,
>>>
>>> I was trying to read a simple binary file using SerializedInputFormat as
>>> suggested in a different thread, but encountered the following error. I
>>> tried to do what the exception suggests, but even though createInput()
>>> returns a DataSet object I couldn't find how to specify which file to read.
>>>
>>> Any help is appreciated. The file I am trying to read is a simple binary
>>> file containing Java short values. Is there any example on reading
>>> binary files available?
>>>
>>> Exception in thread "main"
>>> org.apache.flink.api.common.InvalidProgramException: The type returned by
>>> the input format could not be automatically determined. Please specify the
>>> TypeInformation of the produced type explicitly by using the
>>> 'createInput(InputFormat, TypeInformation)' method instead.
>>>
>>> Thank you,
>>> Saliya
>>>
>>>
>>> --
>>> Saliya Ekanayake
>>> Ph.D. Candidate | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>> Cell 812-391-4914
>>> http://saliya.org
>>>
>>
>>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> Cell 812-391-4914
> http://saliya.org
>


Re: OutputFormat vs SinkFunction

2016-02-08 Thread Aljoscha Krettek
Hi,
one problem that I see with OutputFormats is that they are not made for a
streaming world. By this, I mean that they don’t handle failure well and don’t
consider fault-tolerant streaming, i.e. exactly-once semantics. For example,
what would be expected to happen if a job with a FileOutputFormat fails and
needs to recover? There might be some garbage left in the files that would
get emitted again after restoring to a checkpoint, thus leading to duplicate
results.

Having OutputFormats in streaming programs can work well in toy examples and
tests but can be dangerous in real-world jobs. I once talked with Robert about 
this and we came up with the idea (I think it was mostly him) of generalizing 
the RollingFileSink (which is fault-tolerance aware) so that it can easily be 
used with something akin to OutputFormats.
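
For reference, the current RollingSink in flink-connector-filesystem is used
roughly like this (a sketch from memory against the 1.0-SNAPSHOT API, with
placeholder paths and sizes; the classes live in
org.apache.flink.streaming.connectors.fs):

RollingSink<String> sink = new RollingSink<String>("hdfs:///base/path");
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")); // new bucket directory per minute
sink.setBatchSize(1024 * 1024 * 400L); // roll to a new part file at ~400 MB
stream.addSink(sink);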

What do you think?

-Aljoscha
> On 08 Feb 2016, at 19:40, Nick Dimiduk  wrote:
> 
> On Mon, Feb 8, 2016 at 9:51 AM, Maximilian Michels  wrote:
> Changing the class hierarchy would break backwards-compatibility of the API. 
> However, we could add another method to DataStream to easily use 
> OutputFormats in streaming.
> 
> Indeed, that's why I suggested deprecating one and moving toward a 
> consolidated class hierarchy. It won't happen overnight, but this can be 
> managed pretty easily with some adapter code like this and some additional 
> overrides in the public APIs.
> 
> How did you write your adapter? I came up with the one below.
> 
> Our implementations are similar. This one is working fine with my test code.
> 
> https://gist.github.com/ndimiduk/18820fcd78412c6b4fc3
> 
> On Mon, Feb 8, 2016 at 6:07 PM, Nick Dimiduk  wrote:
> In my case, I have my application code that is calling addSink, for which I'm 
> writing a test that needs to use LocalCollectionOutputFormat. Having two 
> separate class hierarchies is not helpful, hence the adapter. Much of this 
> code already exists in the implementation of FileSinkFunction, so the project 
> already supports it in a limited way.
> 
> On Mon, Feb 8, 2016 at 4:16 AM, Maximilian Michels  wrote:
> Hi Nick,
> 
> SinkFunction just implements user-defined functions on incoming
> elements. OutputFormat offers more lifecycle methods. Thus it is a
> more powerful interface. The OutputFormat originally comes from the
> batch API, whereas the SinkFunction originates from streaming. Those
> were more separate code paths in the past. Ultimately, it would make
> sense to have only the OutputFormat interface but I think we have to
> keep it to not break the API.
> 
> If you need the lifecycle methods in streaming, there is
> RichSinkFunction, which implements OutputFormat and SinkFunction. In
> addition, it gives you access to the RuntimeContext. You can pass this
> directly to the "addSink(sinkFunction)" API method.
> 
> Cheers,
> Max
> 
> On Mon, Feb 8, 2016 at 7:14 AM, Nick Dimiduk  wrote:
> > Heya,
> >
> > Is there a plan to consolidate these two interfaces? They appear to provide
> > identical functionality, differing only in lifecycle management. I found
> > myself writing an adaptor so I can consume an OutputFormat where a
> > SinkFunction is expected; there's not much to it. This seems like code that
> > Flink should ship.
> >
> > Maybe one interface or the other can be deprecated for 1.0 API?
> >
> > Thanks,
> > Nick
> 
> 
> 



Re: Error while reading binary file

2016-02-08 Thread Saliya Ekanayake
Thank you, Fabian. It solved the compilation error, but at runtime I get an
end-of-file exception. I've put up sample code with data on GitHub:
https://github.com/esaliya/flinkit. The data file is a binary file
containing 64 Short values.


02/08/2016 16:01:19 CHAIN DataSource (at main(WordCount.java:25)
(org.apache.flink.api.common.io.SerializedInputFormat)) -> FlatMap
(count())(4/8) switched to FAILED
java.io.EOFException
at java.io.DataInputStream.readShort(DataInputStream.java:315)
at
org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readShort(InputViewDataInputStreamWrapper.java:92)
at org.apache.flink.types.ShortValue.read(ShortValue.java:88)
at
org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:37)
at
org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:31)
at
org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
at
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:169)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)

On Mon, Feb 8, 2016 at 3:50 PM, Fabian Hueske  wrote:

> Hi,
>
> please try to replace
> DataSet ds = env.createInput(sif);
> by
> DataSet ds = env.createInput(sif,
> ValueTypeInfo.SHORT_VALUE_TYPE_INFO);
>
> Best, Fabian
>
> 2016-02-08 19:33 GMT+01:00 Saliya Ekanayake :
>
>> Till,
>>
>> I am still having trouble getting this to work. Here's my code (
>> https://github.com/esaliya/flinkit)
>>
>> String binaryFile = "src/main/resources/sample.bin";
>> SerializedInputFormat sif = new SerializedInputFormat<>();
>> sif.setFilePath(binaryFile);
>> DataSet ds = env.createInput(sif);
>> System.out.println(ds.count());
>>
>>
>> I still get the same error as shown below
>>
>> Exception in thread "main"
>> org.apache.flink.api.common.InvalidProgramException: The type returned by
>> the input format could not be automatically determined. Please specify the
>> TypeInformation of the produced type explicitly by using the
>> 'createInput(InputFormat, TypeInformation)' method instead.
>> at
>> org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:511)
>> at org.saliya.flinkit.WordCount.main(WordCount.java:24)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>>
>>
>> On Mon, Feb 8, 2016 at 5:42 AM, Till Rohrmann 
>> wrote:
>>
>>> Hi Saliya,
>>>
>>> in order to set the file path for the SerializedInputFormat you first
>>> have to create it and then explicitly call setFilePath.
>>>
>>> final SerializedInputFormat inputFormat = new 
>>> SerializedInputFormat();
>>> inputFormat.setFilePath(PATH_TO_FILE);
>>>
>>> env.createInput(inputFormat, myTypeInfo);
>>>
>>> Cheers,
>>> Till
>>> ​
>>>
>>> On Mon, Feb 8, 2016 at 7:00 AM, Saliya Ekanayake 
>>> wrote:
>>>
 Hi,

 I was trying to read a simple binary file using SerializedInputFormat
 as suggested in a different thread, but encountered the following error. I
 tried to do what the exception suggests, but even though createInput()
 returns a DataSet object I couldn't find how to specify which file to read.

 Any help is appreciated. The file I am trying to read is a simple
 binary file containing Java short values. Is there any example on
 reading binary files available?

 Exception in thread "main"
 org.apache.flink.api.common.InvalidProgramException: The type returned by
 the input format could not be automatically determined. Please specify the
 TypeInformation of the produced type explicitly by using the
 'createInput(InputFormat, TypeInformation)' method instead.

 Thank you,
 Saliya


 --
 Saliya Ekanayake
 Ph.D. Candidate | Research Assistant
 School of Informatics and Computing | Digital Science Center
 Indiana University, Bloomington
 Cell 812-391-4914
 http://saliya.org

>>>
>>>
>>
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>> Cell 812-391-4914
>> http://saliya.org
>>
>
>


-- 
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org


Re: Kafka partition alignment for event time

2016-02-08 Thread shikhar
Things make more sense after coming across
http://mail-archives.apache.org/mod_mbox/flink-user/201512.mbox/%3CCANC1h_vVUT3BkFFck5wJA2ju_sSenxmE=Fiizr=ds6tbasy...@mail.gmail.com%3E

I need to ensure the parallelism is at least the number of partitions. This
seems like a gotcha that could be better documented or automatically
enforced.
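
In code, the workaround is simply pinning the source parallelism to the
partition count. A sketch with the 0.8 consumer (the topic name, connection
properties and the partition count of 8 are illustrative):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("zookeeper.connect", "zk:2181");
props.setProperty("group.id", "my-group");

// topic assumed to have 8 partitions: one source subtask per partition
DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer082<>("events", new SimpleStringSchema(), props))
        .setParallelism(8);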



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4786.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Kafka partition alignment for event time

2016-02-08 Thread Aljoscha Krettek
Hi,
what do you mean by this? I think it should also work when setting parallelism 
to 1. If not, then there is either a problem with Flink or maybe something in 
the data.

-Aljoscha
> On 08 Feb 2016, at 21:43, shikhar  wrote:
> 
> Things make more sense after coming across
> http://mail-archives.apache.org/mod_mbox/flink-user/201512.mbox/%3CCANC1h_vVUT3BkFFck5wJA2ju_sSenxmE=Fiizr=ds6tbasy...@mail.gmail.com%3E
> 
> I need to ensure the parallelism is at least the number of partitions. This
> seems like a gotcha that could be better documented or automatically
> enforced.
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4786.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Kafka partition alignment for event time

2016-02-08 Thread shikhar
Stephan explained in that thread that we're picking the min watermark when
doing operations that join streams from multiple sources. If we have an m:n
partition-source assignment where m>n, a source is going to end up with
the max watermark. Having m<=n ensures that the lowest watermark is used.

Re: automatic enforcement, perhaps allowing for more than 1 Kafka partition
on a source should require opt-in, e.g. allowOversubscription()



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-partition-alignment-for-event-time-tp4782p4788.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Error while reading binary file

2016-02-08 Thread Fabian Hueske
The SerializedInputFormat extends the BinaryInputFormat, which expects a
special block-wise encoding and certain metadata fields.
It is not suited to reading arbitrary binary files such as a file with 64
short values.
I suggest implementing a custom input format based on FileInputFormat.
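
A minimal sketch of what that could look like for a flat file of big-endian
shorts (untested and written from memory; splitting is disabled so a record
never straddles a split boundary):

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;

public class ShortInputFormat extends FileInputFormat<Short> {

    private transient DataInputStream dataStream;
    private boolean end;

    public ShortInputFormat() {
        // one split per file, read sequentially from the start
        unsplittable = true;
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split); // opens the inherited 'stream' for this split
        dataStream = new DataInputStream(stream);
        end = false;
    }

    @Override
    public boolean reachedEnd() {
        return end;
    }

    @Override
    public Short nextRecord(Short reuse) throws IOException {
        try {
            return dataStream.readShort();
        } catch (EOFException e) {
            end = true; // reachedEnd() now stops the read loop
            return null;
        }
    }
}

It would then be used with an explicit type, e.g.:

ShortInputFormat sif = new ShortInputFormat();
sif.setFilePath("src/main/resources/sample.bin");
DataSet<Short> ds = env.createInput(sif, BasicTypeInfo.SHORT_TYPE_INFO);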

Best, Fabian

2016-02-08 22:05 GMT+01:00 Saliya Ekanayake :

> Thank you, Fabian. It solved the compilation error, but at runtime I get
> an end-of-file exception. I've put up sample code with data on GitHub:
> https://github.com/esaliya/flinkit. The data file is a binary file
> containing 64 Short values.
>
>
> 02/08/2016 16:01:19 CHAIN DataSource (at main(WordCount.java:25)
> (org.apache.flink.api.common.io.SerializedInputFormat)) -> FlatMap
> (count())(4/8) switched to FAILED
> java.io.EOFException
> at java.io.DataInputStream.readShort(DataInputStream.java:315)
> at
> org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readShort(InputViewDataInputStreamWrapper.java:92)
> at org.apache.flink.types.ShortValue.read(ShortValue.java:88)
> at
> org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:37)
> at
> org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:31)
> at
> org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
> at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:169)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>
> On Mon, Feb 8, 2016 at 3:50 PM, Fabian Hueske  wrote:
>
>> Hi,
>>
>> please try to replace
>> DataSet ds = env.createInput(sif);
>> by
>> DataSet ds = env.createInput(sif,
>> ValueTypeInfo.SHORT_VALUE_TYPE_INFO);
>>
>> Best, Fabian
>>
>> 2016-02-08 19:33 GMT+01:00 Saliya Ekanayake :
>>
>>> Till,
>>>
>>> I am still having trouble getting this to work. Here's my code (
>>> https://github.com/esaliya/flinkit)
>>>
>>> String binaryFile = "src/main/resources/sample.bin";
>>> SerializedInputFormat sif = new SerializedInputFormat<>();
>>> sif.setFilePath(binaryFile);
>>> DataSet ds = env.createInput(sif);
>>> System.out.println(ds.count());
>>>
>>>
>>> I still get the same error as shown below
>>>
>>> Exception in thread "main"
>>> org.apache.flink.api.common.InvalidProgramException: The type returned by
>>> the input format could not be automatically determined. Please specify the
>>> TypeInformation of the produced type explicitly by using the
>>> 'createInput(InputFormat, TypeInformation)' method instead.
>>> at
>>> org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:511)
>>> at org.saliya.flinkit.WordCount.main(WordCount.java:24)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>>>
>>>
>>> On Mon, Feb 8, 2016 at 5:42 AM, Till Rohrmann 
>>> wrote:
>>>
 Hi Saliya,

 in order to set the file path for the SerializedInputFormat you first
 have to create it and then explicitly call setFilePath.

 final SerializedInputFormat inputFormat = new 
 SerializedInputFormat();
 inputFormat.setFilePath(PATH_TO_FILE);

 env.createInput(inputFormat, myTypeInfo);

 Cheers,
 Till
 ​

 On Mon, Feb 8, 2016 at 7:00 AM, Saliya Ekanayake 
 wrote:

> Hi,
>
> I was trying to read a simple binary file using SerializedInputFormat
> as suggested in a different thread, but encountered the following error. I
> tried to do what the exception suggests, but even though createInput()
> returns a DataSet object I couldn't find how to specify which file to
> read.
>
> Any help is appreciated. The file I am trying to read is a simple
> binary file containing Java short values. Is there any example on
> reading binary files available?
>
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: The type returned by
> the input format could not be automatically determined. Please specify the
> TypeInformation of the produced type explicitly by using the
> 'createInput(InputFormat, TypeInformation)' method instead.
>
> Thank you,
> Saliya
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> Cell 812-391-4914
> http://saliya.org
>


>>>
>>>
>>> --
>>> Saliya Ekanayake
>>> Ph.D. Candidate | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>> Cell 812-391-4914
>>> http://saliya.org

Re: Error while reading binary file

2016-02-08 Thread Saliya Ekanayake
Thank you, Fabian. I'll try to do it.

On Mon, Feb 8, 2016 at 4:37 PM, Fabian Hueske  wrote:

> The SerializedInputFormat extends the BinaryInputFormat, which expects a
> special block-wise encoding and certain metadata fields.
> It is not suited to reading arbitrary binary files such as a file with 64
> short values.
> I suggest implementing a custom input format based on FileInputFormat.
>
> Best, Fabian
>
> 2016-02-08 22:05 GMT+01:00 Saliya Ekanayake :
>
>> Thank you, Fabian. It solved the compilation error, but at runtime I get
>> an end-of-file exception. I've put up sample code with data on GitHub:
>> https://github.com/esaliya/flinkit. The data file is a binary file
>> containing 64 Short values.
>>
>>
>> 02/08/2016 16:01:19 CHAIN DataSource (at main(WordCount.java:25)
>> (org.apache.flink.api.common.io.SerializedInputFormat)) -> FlatMap
>> (count())(4/8) switched to FAILED
>> java.io.EOFException
>> at java.io.DataInputStream.readShort(DataInputStream.java:315)
>> at
>> org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readShort(InputViewDataInputStreamWrapper.java:92)
>> at org.apache.flink.types.ShortValue.read(ShortValue.java:88)
>> at
>> org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:37)
>> at
>> org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:31)
>> at
>> org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:169)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> On Mon, Feb 8, 2016 at 3:50 PM, Fabian Hueske  wrote:
>>
>>> Hi,
>>>
>>> please try to replace
>>> DataSet ds = env.createInput(sif);
>>> by
>>> DataSet ds = env.createInput(sif,
>>> ValueTypeInfo.SHORT_VALUE_TYPE_INFO);
>>>
>>> Best, Fabian
>>>
>>> 2016-02-08 19:33 GMT+01:00 Saliya Ekanayake :
>>>
 Till,

 I am still having trouble getting this to work. Here's my code (
 https://github.com/esaliya/flinkit)

 String binaryFile = "src/main/resources/sample.bin";
 SerializedInputFormat sif = new SerializedInputFormat<>();
 sif.setFilePath(binaryFile);
 DataSet ds = env.createInput(sif);
 System.out.println(ds.count());


 I still get the same error as shown below

 Exception in thread "main"
 org.apache.flink.api.common.InvalidProgramException: The type returned by
 the input format could not be automatically determined. Please specify the
 TypeInformation of the produced type explicitly by using the
 'createInput(InputFormat, TypeInformation)' method instead.
 at
 org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:511)
 at org.saliya.flinkit.WordCount.main(WordCount.java:24)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)


 On Mon, Feb 8, 2016 at 5:42 AM, Till Rohrmann 
 wrote:

> Hi Saliya,
>
> in order to set the file path for the SerializedInputFormat you first
> have to create it and then explicitly call setFilePath.
>
> final SerializedInputFormat inputFormat = new 
> SerializedInputFormat();
> inputFormat.setFilePath(PATH_TO_FILE);
>
> env.createInput(inputFormat, myTypeInfo);
>
> Cheers,
> Till
> ​
>
> On Mon, Feb 8, 2016 at 7:00 AM, Saliya Ekanayake 
> wrote:
>
>> Hi,
>>
>> I was trying to read a simple binary file using SerializedInputFormat
>> as suggested in a different thread, but encountered the following error. I
>> tried to do what the exception suggests, but even though createInput()
>> returns a DataSet object I couldn't find how to specify which file to
>> read.
>>
>> Any help is appreciated. The file I am trying to read is a simple
>> binary file containing Java short values. Is there any example on
>> reading binary files available?
>>
>> Exception in thread "main"
>> org.apache.flink.api.common.InvalidProgramException: The type returned by
>> the input format could not be automatically determined. Please specify 
>> the
>> TypeInformation of the produced type explicitly by using the
>> 'createInput(InputFormat, TypeInformation)' method instead.
>>
>> Thank you,
>> Saliya
>>
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>> Cell 812-391-4914
>> http://saliya.org

Re: OutputFormat vs SinkFunction

2016-02-08 Thread Nick Dimiduk
I think this depends on the implementation of the OutputFormat. For
instance, an HBase, Cassandra or ES OF will handle most operations as
idempotent when the schema is designed appropriately.
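
For instance, with a deterministic row key a replayed record is just an
overwrite of the same cell. A rough sketch against the HBase 1.x client,
where 'table' and 'record' are assumed to be in scope:

// same input record => same row key => re-emitting is an idempotent overwrite
Put put = new Put(Bytes.toBytes(record.getKey()));
put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("v"), Bytes.toBytes(record.getValue()));
table.put(put);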

You are (rightly) focusing on FileOF's, which also depend on the semantics
of their implementation. MR always required an atomic rename on the DFS,
and only moved output files into place once a task committed its output.

Also I think it unreasonable to bring exactly-once considerations into the
discussion, because nothing provides this right now without a multi-stage
commit protocol. Such a protocol would be provided at the framework level,
and to the best of my knowledge its semantic expectations on the output
handler are undefined.

My original question comes from wanting to use the LocalCollectionOF to
test a streaming flow that sinks to Kafka, without rewriting the flow in
test code. So in this case you're right that it does apply to tests. I
don't think correctness of tests is a trivial concern though.
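
Concretely, the test swap looks something like this (a sketch; the adapter is
the one discussed earlier in this thread, and MyEvent is a placeholder type):

List<MyEvent> results = new ArrayList<>();
// replace the Kafka sink with a collecting sink for the test
stream.addSink(new OutputFormatSinkAdapter<>(new LocalCollectionOutputFormat<>(results)));
env.execute();
// assert on 'results' once the (local) job finishes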

As for RollingFileSink, I've not seen this conversation so I cannot
comment. Per my earlier examples, I think it's not correct to assume all OF
implementations are file-based.

On Monday, February 8, 2016, Aljoscha Krettek  wrote:

> Hi,
> one problem that I see with OutputFormats is that they are not made for a
> streaming world. By this, I mean that they don’t handle failure well and
> don’t consider fault-tolerant streaming, i.e. exactly-once semantics. For
> example, what would be expected to happen if a job with a FileOutputFormat
> fails and needs to recover? There might be some garbage left in the
> files that would get emitted again after restoring to a checkpoint, thus
> leading to duplicate results.
>
> Having OutputFormats in streaming programs can work well in toy examples
> and tests but can be dangerous in real-world jobs. I once talked with
> Robert about this and we came up with the idea (I think it was mostly him)
> of generalizing the RollingFileSink (which is fault-tolerance aware) so
> that it can easily be used with something akin to OutputFormats.
>
> What do you think?
>
> -Aljoscha
> > On 08 Feb 2016, at 19:40, Nick Dimiduk  > wrote:
> >
> > On Mon, Feb 8, 2016 at 9:51 AM, Maximilian Michels  > wrote:
> > Changing the class hierarchy would break backwards-compatibility of the
> API. However, we could add another method to DataStream to easily use
> OutputFormats in streaming.
> >
> > Indeed, that's why I suggested deprecating one and moving toward a
> consolidated class hierarchy. It won't happen overnight, but this can be
> managed pretty easily with some adapter code like this and some additional
> overrides in the public APIs.
> >
> > How did you write your adapter? I came up with the one below.
> >
> > Our implementations are similar. This one is working fine with my test
> code.
> >
> > https://gist.github.com/ndimiduk/18820fcd78412c6b4fc3
> >
> > On Mon, Feb 8, 2016 at 6:07 PM, Nick Dimiduk  > wrote:
> > In my case, I have my application code that is calling addSink, for
> which I'm writing a test that needs to use LocalCollectionOutputFormat.
> Having two separate class hierarchies is not helpful, hence the adapter.
> Much of this code already exists in the implementation of FileSinkFunction,
> so the project already supports it in a limited way.
> >
> > On Mon, Feb 8, 2016 at 4:16 AM, Maximilian Michels  > wrote:
> > Hi Nick,
> >
> > SinkFunction just implements user-defined functions on incoming
> > elements. OutputFormat offers more lifecycle methods. Thus it is a
> > more powerful interface. The OutputFormat originally comes from the
> > batch API, whereas the SinkFunction originates from streaming. Those
> > were more separate code paths in the past. Ultimately, it would make
> > sense to have only the OutputFormat interface but I think we have to
> > keep it to not break the API.
> >
> > If you need the lifecycle methods in streaming, there is
> > RichSinkFunction, which implements OutputFormat and SinkFunction. In
> > addition, it gives you access to the RuntimeContext. You can pass this
> > directly to the "addSink(sinkFunction)" API method.
> >
> > Cheers,
> > Max
> >
> > On Mon, Feb 8, 2016 at 7:14 AM, Nick Dimiduk  > wrote:
> > > Heya,
> > >
> > > Is there a plan to consolidate these two interfaces? They appear to
> provide
> > > identical functionality, differing only in lifecycle management. I
> found
> > > myself writing an adaptor so I can consume an OutputFormat where a
> > > SinkFunction is expected; there's not much to it. This seems like code
> that
> > > Flink should ship.
> > >
> > > Maybe one interface or the other can be deprecated for 1.0 API?
> > >
> > > Thanks,
> > > Nick
> >
> >
> >
>
>