Failed to submit 0.10.1
Hi all,

I'm new to Flink and have reached the step of submitting a job to a remote cluster, but it fails with the following message:

Association with remote system [akka.tcp://flink@127.0.0.1:61231] has failed, address is now gated for [5000] ms. Reason is: [scala.Option; local class incompatible: stream classdesc serialVersionUID = -2062608324514658839, local class serialVersionUID = -114498752079829388].

I have double-checked that my client and server versions are the same (0.10.1), but my Java versions differ slightly:

Java(TM) SE Runtime Environment (build 1.8.0_25-b17)
vs.
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)

Is Java the issue, or is there anything else I may be missing?

Many thanks

Andrew
Re: Failed to submit 0.10.1
Hi Andrew,

It appears that you're using two different versions of the Scala library in your Flink job. Please make sure you use either 2.10 or 2.11, but not both at the same time.

Best,
Max

On Mon, Feb 8, 2016 at 10:30 AM, Andrew Ge Wu wrote:
> I'm new to Flink and have reached the step of submitting a job to a remote cluster, but it fails with the following message:
>
> Association with remote system [akka.tcp://flink@127.0.0.1:61231] has failed, address is now gated for [5000] ms. Reason is: [scala.Option; local class incompatible: stream classdesc serialVersionUID = -2062608324514658839, local class serialVersionUID = -114498752079829388].
>
> I have double-checked that my client and server versions are the same (0.10.1), but my Java versions differ slightly: Java(TM) SE Runtime Environment (build 1.8.0_25-b17) vs. Java(TM) SE Runtime Environment (build 1.8.0_60-b27). Is Java the issue, or is there anything else I may be missing?
>
> Many thanks
>
> Andrew
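A consistent dependency set pins every Scala-dependent Flink artifact to the same Scala suffix. A rough sketch of what that looks like in a POM (the module list is illustrative, not taken from Andrew's project):

    <!-- All Scala-dependent Flink artifacts must share one Scala suffix (_2.11 here).
         Mixing _2.10 and _2.11 artifacts between client and cluster produces exactly
         this kind of serialVersionUID mismatch. -->
    <properties>
        <flink.version>0.10.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>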
Re: Error while reading binary file
Hi Saliya,

In order to set the file path for the SerializedInputFormat, you first have to create it and then explicitly call setFilePath:

final SerializedInputFormat inputFormat = new SerializedInputFormat();
inputFormat.setFilePath(PATH_TO_FILE);

env.createInput(inputFormat, myTypeInfo);

Cheers,
Till

On Mon, Feb 8, 2016 at 7:00 AM, Saliya Ekanayake wrote:
> Hi,
>
> I was trying to read a simple binary file using SerializedInputFormat as suggested in a different thread, but encountered the following error. I tried to do what the exception suggests, but even though createInput() returns a DataSet object I couldn't find how to specify which file to read.
>
> Any help is appreciated. The file I am trying to read is a simple binary file containing Java short values. Is there any example of reading binary files available?
>
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.
>
> Thank you,
> Saliya
Re: Error while reading binary file
Hi Saliya,

Thanks for your question. Flink's type analyzer couldn't extract the type information. You may implement the ResultTypeQueryable interface in your custom source. That way you can manually specify the correct type. If that doesn't help, could you please share more of the stack trace?

Thanks,
Max

On Mon, Feb 8, 2016 at 7:00 AM, Saliya Ekanayake wrote:
> I was trying to read a simple binary file using SerializedInputFormat as suggested in a different thread, but encountered the following error. I tried to do what the exception suggests, but even though createInput() returns a DataSet object I couldn't find how to specify which file to read.
>
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.
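A minimal sketch of what implementing ResultTypeQueryable could look like here (the ShortValue element type and the SerializedInputFormat base class are assumptions based on the rest of this thread, not Saliya's actual code):

    import org.apache.flink.api.common.io.SerializedInputFormat;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    import org.apache.flink.api.java.typeutils.ValueTypeInfo;
    import org.apache.flink.types.ShortValue;

    // Hypothetical input format that declares its produced type itself, so
    // env.createInput(format) no longer needs a separate TypeInformation argument.
    public class ShortValueInputFormat extends SerializedInputFormat<ShortValue>
            implements ResultTypeQueryable<ShortValue> {

        @Override
        public TypeInformation<ShortValue> getProducedType() {
            return ValueTypeInfo.SHORT_VALUE_TYPE_INFO;
        }
    }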
Re: OutputFormat vs SinkFunction
Hi Nick,

SinkFunction just implements user-defined functions on incoming elements, while OutputFormat offers more lifecycle methods; in that sense it is the more powerful interface. The OutputFormat originally comes from the batch API, whereas the SinkFunction originates from streaming. Those were more separate code paths in the past. Ultimately, it would make sense to have only the OutputFormat interface, but I think we have to keep both to avoid breaking the API.

If you need the lifecycle methods in streaming, there is RichSinkFunction, which implements SinkFunction and adds open() and close() methods. In addition, it gives you access to the RuntimeContext. You can pass it directly to the "addSink(sinkFunction)" API method.

Cheers,
Max

On Mon, Feb 8, 2016 at 7:14 AM, Nick Dimiduk wrote:
> Heya,
>
> Is there a plan to consolidate these two interfaces? They appear to provide identical functionality, differing only in lifecycle management. I found myself writing an adaptor so I can consume an OutputFormat where a SinkFunction is expected; there's not much to it. This seems like code that Flink should ship.
>
> Maybe one interface or the other can be deprecated for the 1.0 API?
>
> Thanks,
> Nick
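A minimal sketch of such a rich sink (the class name and println bodies are placeholders for real resource handling):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    // Sketch: a sink with lifecycle methods. open()/close() come from the rich
    // function side; invoke() is the per-element SinkFunction callback.
    public class LoggingSink<T> extends RichSinkFunction<T> {

        @Override
        public void open(Configuration parameters) throws Exception {
            // runs once per parallel subtask, e.g. to open a connection
            System.out.println("opened subtask " + getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public void invoke(T value) throws Exception {
            System.out.println(value); // called for every incoming element
        }

        @Override
        public void close() throws Exception {
            // release resources here
        }
    }

It attaches like any other sink: stream.addSink(new LoggingSink<>());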
Re: Distribution of sinks among the nodes
Hi,

I just merged the new feature, so once this makes it into the 1.0-SNAPSHOT builds you should be able to use:

env.setParallelism(4);

env
    .addSource(kafkaSource)
    .rescale()
    .map(mapper).setParallelism(16)
    .rescale()
    .addSink(kafkaSink);

to get your desired behavior. For this to work, the mapper's parallelism should be set to 16, with 4 nodes. Then each node will have one source, 4 mappers and 1 sink. The source will only be connected to its 4 mappers, while those 4 mappers will be the only ones connected to the sink.

Cheers,
Aljoscha

> On 04 Feb 2016, at 18:29, Aljoscha Krettek wrote:
>
> I added a new ticket: https://issues.apache.org/jira/browse/FLINK-3336
>
> This will implement the data shipping pattern that you mentioned in your initial mail. I also have the pull request almost ready.
>
>> On 04 Feb 2016, at 16:25, Gwenhael Pasquiers wrote:
>>
>> Okay;
>>
>> then I guess that the best we can do is to disable chaining (we really want one thread per operator since they are doing long operations) and have the same parallelism for sinks as for mapping: that way each map will have its own sink and there will be no exchanges between Flink instances.
>>
>> From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen
>> Sent: jeudi 4 février 2016 15:13
>> To: user@flink.apache.org
>> Subject: Re: Distribution of sinks among the nodes
>>
>> To your other question, there are two things in Flink:
>>
>> (1) Chaining: tasks are folded together into one task, run by one thread.
>>
>> (2) Resource groups: tasks stay separate and have separate threads, but share a slot (which means they share memory resources). See the link in my previous mail for an explanation concerning those.
>>
>> Greetings,
>> Stephan
>>
>> On Thu, Feb 4, 2016 at 3:10 PM, Stephan Ewen wrote:
>> Hi Gwen!
>>
>> You actually need not 24 slots, but only as many as the highest parallelism (16). Slots do not hold individual tasks, but "pipelines".
>>
>> Here is an illustration of how that works:
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots
>>
>> You can control whether a task can share a slot with the previous task with the function "startNewResourceGroup()" in the streaming API. Sharing slots makes a few things easier to reason about: especially when adding operators to a program, you need not immediately add new machines.
>>
>> How to solve your case:
>>
>> We can actually make a pretty simple addition to Flink that will cause the tasks to be locally connected, which in turn will cause the scheduler to distribute them like you intend. Rather than let the 4 sources rebalance across all 16 mappers, each one should redistribute to 4 local mappers, and these 4 mappers should send data to one local sink each.
>>
>> We'll try and add that today and ping you once it is in. The following would be sample code to use this:
>>
>> env.setParallelism(4);
>>
>> env
>>     .addSource(kafkaSource)
>>     .partitionFan()
>>     .map(mapper).setParallelism(16)
>>     .partitionFan()
>>     .addSink(kafkaSink);
>>
>> A bit of background on why the mechanism is the way it is right now:
>>
>> You can think of a slot as a slice of resources: in particular, an amount of memory from the memory manager, but also memory in the network stack.
>>
>> What we want to do quite soon is to make streaming programs more elastic. Consider for example the case that you have 16 slots on 4 machines, a machine fails, and you have no spare resources. In that case Flink should recognize that no spare resource can be acquired and scale the job in. Since you have only 12 slots left, the parallelism of the mappers is reduced to 12, and the source task that was on the failed machine is moved to a slot on another machine.
>>
>> It is important that the guaranteed resources for each task do not change when scaling in, to keep behavior predictable. In this case, each slot will still host at most 1 source, 1 mapper, and 1 sink, as did some of the slots before. That is also the reason why the slots are per TaskManager, and not global: to associate them with a constant set of resources (mainly memory).
>>
>> Greetings,
>> Stephan
>>
>> On Thu, Feb 4, 2016 at 9:54 AM, Gwenhael Pasquiers wrote:
>> Don't we need to set the number of slots to 24 (4 sources + 16 mappers + 4 sinks)?
>>
>> Or is there a way to set the number of slots per TaskManager instead of globally, so that they are at least equally dispatched among the nodes?
>>
>> As for the sink deployment: that's not good news; I mean we will have a non-negligible overhead: all the data gene
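Gwenhael's fallback of disabling chaining can also be combined with the new rescale() call. A sketch, assuming the streaming API's disableOperatorChaining() setting (variable names reused from Aljoscha's snippet):

    // One thread per operator across the whole job, per Gwenhael's plan.
    env.disableOperatorChaining();
    env.setParallelism(16);

    env
        .addSource(kafkaSource).setParallelism(4)
        .rescale()                 // each source feeds its 4 local mappers
        .map(mapper)
        .addSink(kafkaSink);       // same parallelism as the mapper: one local sink per mapper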
Re: Failed to submit 0.10.1
Thanks Max,

My local and remote environments are both running Scala code runner version 2.11.7 -- Copyright 2002-2013, LAMP/EPFL, and I downloaded the 2.11 binary (//apache.mirrors.spacedump.net/flink/flink-0.10.1/flink-0.10.1-bin-hadoop27-scala_2.11.tgz). Is there a different version of the client library for Scala 2.11?

Best,

Andrew

> On 08 Feb 2016, at 11:30, Maximilian Michels wrote:
>
> Hi Andrew,
>
> It appears that you're using two different versions of the Scala library in your Flink job. Please make sure you use either 2.10 or 2.11, but not both at the same time.
>
> Best,
> Max
Re: Failed to submit 0.10.1
Yes, I found the dedicated dependency for 2.11. Thanks!

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${apache.flink.version}</version>
</dependency>

Andrew

> On 08 Feb 2016, at 14:18, Andrew Ge Wu wrote:
>
> Thanks Max,
>
> My local and remote environments are both running Scala code runner version 2.11.7 -- Copyright 2002-2013, LAMP/EPFL, and I downloaded the 2.11 binary (//apache.mirrors.spacedump.net/flink/flink-0.10.1/flink-0.10.1-bin-hadoop27-scala_2.11.tgz). Is there a different version of the client library for Scala 2.11?
>
> Best,
>
> Andrew
Re: Failed to submit 0.10.1
You're welcome. As of recent changes, all Maven artifact names are suffixed with the Scala major version; the old artifact names are still available for the snapshot version, though. I've just pushed an empty flink-streaming-java for 1.0-SNAPSHOT to prevent users from compiling code that would fail later on.

Cheers,
Max

On Mon, Feb 8, 2016 at 2:32 PM, Andrew Ge Wu wrote:
> Yes, I found the dedicated dependency for 2.11. Thanks!
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-streaming-java_2.11</artifactId>
>     <version>${apache.flink.version}</version>
> </dependency>
>
> Andrew
Re: Error while reading binary file
Thank you, Till and Max. I'll try the setFilePath() method and let you know.

On Feb 8, 2016 5:45 AM, "Maximilian Michels" wrote:
> Hi Saliya,
>
> Thanks for your question. Flink's type analyzer couldn't extract the type information. You may implement the ResultTypeQueryable interface in your custom source. That way you can manually specify the correct type. If that doesn't help, could you please share more of the stack trace?
>
> Thanks,
> Max
Re: Flink on YARN: Stuck on "Trying to register at JobManager"
You said earlier that you are using Flink 0.10. The feature is only available in 1.0-SNAPSHOT.

On Mon, Feb 8, 2016 at 4:53 PM, Pieter Hameete wrote:
> I've tried setting the yarn.application-master.port property in flink-conf.yaml to a range, as suggested in
> https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#running-flink-on-yarn-behind-firewalls
>
> The JobManager does not seem to be picking the property up. Am I setting this in the wrong place? Or is there another way to enforce this property?
>
> Cheers,
>
> Pieter
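For reference, the flink-conf.yaml entry in question would look roughly like this (the port range is an arbitrary example; pick one that is open in the cluster's firewall):

    # Restrict the JobManager/ApplicationMaster port on YARN to a fixed range:
    yarn.application-master.port: 50100-50200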
Re: Flink on YARN: Stuck on "Trying to register at JobManager"
Matter of RTFM, eh ;-) Thanks, and sorry for the bother.

2016-02-08 17:06 GMT+01:00 Robert Metzger:
> You said earlier that you are using Flink 0.10. The feature is only available in 1.0-SNAPSHOT.
>
> On Mon, Feb 8, 2016 at 4:53 PM, Pieter Hameete wrote:
>> I've tried setting the yarn.application-master.port property in flink-conf.yaml to a range, as suggested in
>> https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#running-flink-on-yarn-behind-firewalls
>>
>> The JobManager does not seem to be picking the property up. Am I setting this in the wrong place? Or is there another way to enforce this property?
>>
>> Cheers,
>>
>> Pieter
Re: Flink on YARN: Stuck on "Trying to register at JobManager"
I've tried setting the yarn.application-master.port property in flink-conf.yaml to a range, as suggested in
https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#running-flink-on-yarn-behind-firewalls

The JobManager does not seem to be picking the property up. Am I setting this in the wrong place? Or is there another way to enforce this property?

Cheers,

Pieter

2016-02-07 20:04 GMT+01:00 Pieter Hameete:
> I found the relevant information on the website. I'll consult with the cluster admin tomorrow, thanks for the help :-)
>
> - Pieter
>
> 2016-02-07 19:31 GMT+01:00 Robert Metzger:
>> Hi,
>>
>> we had other users with a similar issue as well. There is a configuration value which allows you to specify a single port or a range of ports for the JobManager to allocate when running on YARN. Note that when using this with a single port, the JMs may collide.
>>
>> On Sun, Feb 7, 2016 at 7:25 PM, Pieter Hameete wrote:
>>> Hi Stephan,
>>>
>>> surely it seems this way! I must not be the first with this issue though? I'll have to contact the cluster admins to find a solution together. What would be a way to make the JobManagers accessible from outside the network, given that the IP and port number change every time?
>>>
>>> Alternatively, I can ask for SSH access to a node within the network. That will surely work, but it's not my preferred solution.
>>>
>>> - Pieter
>>>
>>> 2016-02-06 16:22 GMT+01:00 Stephan Ewen:
>>>> Yeah, this sounds a lot like the client cannot connect to the JobManager port.
>>>>
>>>> The ports to communicate with HDFS and the YARN resource manager may be whitelisted or forwarded, so you can submit the YARN session, but then not connect to the JobManager afterwards.
>>>>
>>>> On Sat, Feb 6, 2016 at 2:11 PM, Pieter Hameete wrote:
>>>>> Hi Max!
>>>>>
>>>>> I'm using Flink 0.10.1, and indeed the cluster seems to be created fine; everything in the JobManager Web UI looks good.
>>>>>
>>>>> It seems like the JobManager initiates the connection with my VM and cannot reach it. It could be that this is similar to the problem here:
>>>>>
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-with-docker-errors-with-akka-NAT-td7702.html
>>>>>
>>>>> I probably have to make some changes to the networking configuration of my VM so it can be reached by the JobManager despite using a different port each time.
>>>>>
>>>>> - Pieter
>>>>>
>>>>> 2016-02-06 14:05 GMT+01:00 Maximilian Michels:
>>>>>> Hi Pieter,
>>>>>>
>>>>>> Which version of Flink are you using? It appears you've created a Flink YARN cluster but you can't reach the JobManager afterwards.
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> On Sat, Feb 6, 2016 at 1:42 PM, Pieter Hameete wrote:
>>>>>>> Hi Robert,
>>>>>>>
>>>>>>> unfortunately there are no signs of what is going wrong in the logs. The last log messages are about successful registration of the TaskManagers.
>>>>>>>
>>>>>>> I'm also fairly sure it must be something in my VM that is causing this, because when I start the yarn-session from a login node that is on the same network as the Hadoop cluster there are no problems registering with the JobManager. I did also notice the following message in the local console:
>>>>>>>
>>>>>>> 12:30:27,173 WARN Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@145.100.41.13:41539]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: connection timed out: /145.100.41.13:41539
>>>>>>>
>>>>>>> I can ping the JobManager fine from within the VM. Could there be some invalid or missing configuration on my side?
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Pieter
>>>>>>>
>>>>>>> 2016-02-06 12:54 GMT+01:00 Robert Metzger:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> did you check the logs of the JobManager itself? Maybe it'll tell us already what's going on.
>>>>>>>>
>>>>>>>> On Sat, Feb 6, 2016 at 12:14 PM, Pieter Hameete wrote:
>>>>>>>>> Hi guys!
>>>>>>>>>
>>>>>>>>> I'm attempting to run Flink on YARN, but I run into an issue. I'm starting the Flink YARN session from an Ubuntu 14.04 VM. All goes well until after the JobManager web UI is started:
>>>>>>>>>
>>>>>>>>> JobManager web interface address http://head05.hathi.surfsara.nl:8088/proxy/application_1452780322684_10532/
>>>>>>>>> Waiting until all TaskManagers have connected
>>>>>>>>> 11:09:51,557 INFO org.apache.flink.yarn.ApplicationClient - Notification about new leader address akka.tcp://flink@145.100.41.148:35666/user/jobmanager with session ID null.
Re: Flink on YARN: Stuck on "Trying to register at JobManager"
After downloading and building the 1.0-SNAPSHOT from the master branch, I run into another problem when starting a YARN cluster. The startup now loops indefinitely at the following step:

17:39:12,369 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over to rm2
17:39:34,855 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over to rm1

Any clue what could have gone wrong? I used all defaults when building with Maven.

- Pieter

2016-02-08 17:07 GMT+01:00 Pieter Hameete:
> Matter of RTFM, eh ;-) Thanks, and sorry for the bother.
>
> 2016-02-08 17:06 GMT+01:00 Robert Metzger:
>> You said earlier that you are using Flink 0.10. The feature is only available in 1.0-SNAPSHOT.
Re: OutputFormat vs SinkFunction
In my case, I have application code that is calling addSink, for which I'm writing a test that needs to use LocalCollectionOutputFormat. Having two separate class hierarchies is not helpful, hence the adapter. Much of this code already exists in the implementation of FileSinkFunction, so the project already supports it in a limited way.

On Mon, Feb 8, 2016 at 4:16 AM, Maximilian Michels wrote:
> Hi Nick,
>
> SinkFunction just implements user-defined functions on incoming elements, while OutputFormat offers more lifecycle methods; in that sense it is the more powerful interface. The OutputFormat originally comes from the batch API, whereas the SinkFunction originates from streaming. Those were more separate code paths in the past. Ultimately, it would make sense to have only the OutputFormat interface, but I think we have to keep both to avoid breaking the API.
>
> If you need the lifecycle methods in streaming, there is RichSinkFunction, which implements SinkFunction and adds open() and close() methods. In addition, it gives you access to the RuntimeContext. You can pass it directly to the "addSink(sinkFunction)" API method.
>
> Cheers,
> Max
Re: OutputFormat vs SinkFunction
Changing the class hierarchy would break backwards-compatibility of the API. However, we could add another method to DataStream to easily use OutputFormats in streaming.

How did you write your adapter? I came up with the one below. Admittedly, it is sort of a hack, but it works fine. By the way, you can also use the DataStream.write(OutputFormat format) method to use any OutputFormat; the code below is just for the case where you really only want to use DataStream.addSink(SinkFunction function).

Cheers,
Max

import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.Collection;

public class OutputFormatAdapter<T> extends LocalCollectionOutputFormat<T>
        implements SinkFunction<T>, RichFunction {

    // context handed in by the runtime; required by the RichFunction contract
    private transient RuntimeContext runtimeContext;

    public OutputFormatAdapter(Collection<T> out) {
        super(out);
    }

    @Override
    public void invoke(T value) throws Exception {
        super.writeRecord(value);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks());
    }

    @Override
    public void setRuntimeContext(RuntimeContext context) {
        this.runtimeContext = context;
    }

    @Override
    public RuntimeContext getRuntimeContext() {
        return runtimeContext;
    }

    @Override
    public IterationRuntimeContext getIterationRuntimeContext() {
        throw new UnsupportedOperationException("This is not supported.");
    }

    /** Small test */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final DataStreamSource<Long> longDataStreamSource = env.generateSequence(0, 1000);
        final ArrayList<Long> longs = new ArrayList<>();
        longDataStreamSource.addSink(new OutputFormatAdapter<>(longs));

        env.execute();

        for (long l : longs) {
            System.out.println(l);
        }
    }
}

On Mon, Feb 8, 2016 at 6:07 PM, Nick Dimiduk wrote:
> In my case, I have application code that is calling addSink, for which I'm writing a test that needs to use LocalCollectionOutputFormat. Having two separate class hierarchies is not helpful, hence the adapter. Much of this code already exists in the implementation of FileSinkFunction, so the project already supports it in a limited way.
Re: Flink on YARN: Stuck on "Trying to register at JobManager"
Mh, that's weird. Maybe both resource managers are marked as "standby"? I'm not sure what can cause this issue.

Which YARN version are you using? Maybe you need to build Flink against that specific Hadoop version yourself.

On Mon, Feb 8, 2016 at 5:50 PM, Pieter Hameete wrote:
> After downloading and building the 1.0-SNAPSHOT from the master branch, I run into another problem when starting a YARN cluster. The startup now loops indefinitely at the following step:
>
> 17:39:12,369 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over to rm2
> 17:39:34,855 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over to rm1
>
> Any clue what could have gone wrong? I used all defaults when building with Maven.
>
> - Pieter
Re: Error while reading binary file
Till,

I am still having trouble getting this to work. Here's my code (https://github.com/esaliya/flinkit):

String binaryFile = "src/main/resources/sample.bin";
SerializedInputFormat<ShortValue> sif = new SerializedInputFormat<>();
sif.setFilePath(binaryFile);
DataSet<ShortValue> ds = env.createInput(sif);
System.out.println(ds.count());

I still get the same error, as shown below:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.
	at org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:511)
	at org.saliya.flinkit.WordCount.main(WordCount.java:24)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

On Mon, Feb 8, 2016 at 5:42 AM, Till Rohrmann wrote:
> Hi Saliya,
>
> in order to set the file path for the SerializedInputFormat, you first have to create it and then explicitly call setFilePath:
>
> final SerializedInputFormat inputFormat = new SerializedInputFormat();
> inputFormat.setFilePath(PATH_TO_FILE);
>
> env.createInput(inputFormat, myTypeInfo);
>
> Cheers,
> Till

--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org
Re: OutputFormat vs SinkFunction
On Mon, Feb 8, 2016 at 9:51 AM, Maximilian Michels wrote:
> Changing the class hierarchy would break backwards-compatibility of the API. However, we could add another method to DataStream to easily use OutputFormats in streaming.

Indeed, that's why I suggested deprecating one and moving toward a consolidated class hierarchy. It won't happen overnight, but this can be managed pretty easily with some adapter code like this and some additional overrides in the public APIs.

> How did you write your adapter? I came up with the one below.

Our implementations are similar. This one is working fine with my test code:
https://gist.github.com/ndimiduk/18820fcd78412c6b4fc3
Re: Flink on YARN: Stuck on "Trying to register at JobManager"
Solved: indeed it needed to be built for YARN 2.7.1 specifically. Cheers! 2016-02-08 19:13 GMT+01:00 Robert Metzger : > Mh, that's weird. Maybe both resource managers are marked as "standby"? > Not sure what can cause this issue. > > Which YARN version are you using? Maybe you need to build Flink against > that specific hadoop version yourself. > > On Mon, Feb 8, 2016 at 5:50 PM, Pieter Hameete wrote: > >> After downloading and building the 1.0-SNAPSHOT from the master branch I >> do run into another problem when starting a YARN cluster. The startup now >> infinitely loops at the following step: >> >> 17:39:12,369 INFO >> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing >> over to rm2 >> 17:39:34,855 INFO >> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing >> over to rm1 >> >> Any clue what couldve gone wrong? I used all-default for building with >> maven. >> >> - Pieter >> >> >> >> 2016-02-08 17:07 GMT+01:00 Pieter Hameete : >> >>> Matter of RTFM eh ;-) thx and sorry for the bother. >>> >>> 2016-02-08 17:06 GMT+01:00 Robert Metzger : >>> You said earlier that you are using Flink 0.10. The feature is only available in 1.0-SNAPSHOT. On Mon, Feb 8, 2016 at 4:53 PM, Pieter Hameete wrote: > Ive tried setting the yarn.application-master.port property in > flink-conf.yaml to a range suggested in > https://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html#running-flink-on-yarn-behind-fi > rewalls > > The JobManager does not seem to be picking the property up. Am I > setting this in the wrong place? Or is there another way to enforce this > property? > > Cheers, > > Pieter > > 2016-02-07 20:04 GMT+01:00 Pieter Hameete : > >> I found the relevant information on the website. Ill consult with the >> cluster admin tomorrow, thanks for the help :-) >> >> - Pieter >> >> 2016-02-07 19:31 GMT+01:00 Robert Metzger : >> >>> Hi, >>> >>> we had other users with a similar issue as well. There is a >>> configuration value which allows you to specify a single port or a >>> range of >>> ports for the JobManager to allocate when running on YARN. >>> Note that when using this with a single port, the JMs may collide. >>> >>> >>> >>> On Sun, Feb 7, 2016 at 7:25 PM, Pieter Hameete >>> wrote: >>> Hi Stephan, surely it seems this way! I must not be the first with this issue though? I'll have to contact the cluster admins to find a solution together. What would be a way of make the JobManagers accessible from outside the network, because the IP and port number changes every time. Alternatively, I can ask for ssh access to a node within the network. that will surely work but it's not my preferred solution. - Pieter 2016-02-06 16:22 GMT+01:00 Stephan Ewen : > Yeah, sounds a lot like the client cannot connect to the > JobManager port. > > The ports to communicate with HDFS and the YARN resource manager > may be whitelisted r forwarded, so you can submit the YARN session, > but > then not connect to the JobManager afterwards. > > > > On Sat, Feb 6, 2016 at 2:11 PM, Pieter Hameete > wrote: > >> Hi Max! >> >> I'm using Flink 0.10.1 and indeed the cluster seems to be created >> fine, all in the JobManager Web UI looks good. >> >> It seems like the JobManager initiates the connection with my VM >> and cannot reach it. 
It could be that this is similar to the problem >> here: >> >> >> http://apache-spark-user-list.1001560.n3.nabble.com/spark-with-docker-errors-with-akka-NAT-td7702.html >> >> I probably have to make some changes to the networking >> configuration of my VM so it can be reached by the JobManager >> despite using >> a different port each time. >> >> - Pieter >> >> 2016-02-06 14:05 GMT+01:00 Maximilian Michels : >> >>> Hi Pieter, >>> >>> Which version of Flink are you using? It appears you've created a >>> Flink YARN cluster but you can't reach the JobManager afterwards. >>> >>> Cheers, >>> Max >>> >>> On Sat, Feb 6, 2016 at 1:42 PM, Pieter Hameete < >>> phame...@gmail.com> wrote: >>> > Hi Robert, >>> > >>> > unfortunately there are no signs of what is going wrong in the >>> logs. The >>> > last log messages are about successful registration of the >>> TaskManagers. >>> > >>> > I'm also fairly sure it must be something in my VM that is >>> causing this.
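Two references for anyone hitting the same symptoms. Building Flink from source against the cluster's exact Hadoop/YARN version is done by passing the version to Maven; a sketch of the invocation, following the source-build instructions of the time (substitute your cluster's version):

mvn clean install -DskipTests -Dhadoop.version=2.7.1

And the port property discussed above goes into flink-conf.yaml, where 1.0-SNAPSHOT accepts a single port or a range (the range below is only an example):

yarn.application-master.port: 50100-50200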
Kafka partition alignment for event time
My Flink job is doing aggregations on top of event-time based windowing across Kafka partitions. As I have been developing and restarting it, the state for the catch-up periods becomes unreliable -- lots of duplicate emits for time windows already seen before, which I have to discard since my sink can't handle them. There may be a bug in my job, but I wanted to clarify whether this might be a flaw in Flink's handling of this. I understand there is an m:n mapping of partitions to sources depending on the parallelism. Each source will have its own watermark. During catch-up, watermark progression can become pretty fragile, e.g. in my case where there are n partitions and parallelism is 1. I feel like some kind of event-time alignment is needed across partitions. I may be completely off here, so I look forward to your thoughts on this!
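For context, the shape of such a job with the 0.10-era API is roughly the following sketch; MyEvent, MyEventSchema, and MyTimestampExtractor are placeholders for the job's actual types:

Properties props = new Properties();
props.setProperty("zookeeper.connect", "zookeeper:2181");
props.setProperty("bootstrap.servers", "kafka:9092");
props.setProperty("group.id", "my-group");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Each source subtask tracks a single watermark, even when it reads
// several Kafka partitions -- which is where the alignment question arises.
DataStream<MyEvent> events = env
        .addSource(new FlinkKafkaConsumer082<>("my-topic", new MyEventSchema(), props))
        .assignTimestamps(new MyTimestampExtractor());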
Re: Error while reading binary file
Hi, please try to replace DataSet<ShortValue> ds = env.createInput(sif); with DataSet<ShortValue> ds = env.createInput(sif, ValueTypeInfo.SHORT_VALUE_TYPE_INFO); Best, Fabian 2016-02-08 19:33 GMT+01:00 Saliya Ekanayake : > Till, > > I am still having trouble getting this to work. Here's my code ( > https://github.com/esaliya/flinkit) > > String binaryFile = "src/main/resources/sample.bin"; > SerializedInputFormat sif = new SerializedInputFormat<>(); > sif.setFilePath(binaryFile); > DataSet ds = env.createInput(sif); > System.out.println(ds.count()); > > > I still get the same error as shown below > > Exception in thread "main" > org.apache.flink.api.common.InvalidProgramException: The type returned by > the input format could not be automatically determined. Please specify the > TypeInformation of the produced type explicitly by using the > 'createInput(InputFormat, TypeInformation)' method instead. > at > org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:511) > at org.saliya.flinkit.WordCount.main(WordCount.java:24) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) > > > On Mon, Feb 8, 2016 at 5:42 AM, Till Rohrmann > wrote: > >> Hi Saliya, >> >> in order to set the file path for the SerializedInputFormat you first >> have to create it and then explicitly call setFilePath. >> >> final SerializedInputFormat inputFormat = new >> SerializedInputFormat(); >> inputFormat.setFilePath(PATH_TO_FILE); >> >> env.createInput(inputFormat, myTypeInfo); >> >> Cheers, >> Till >> >> >> On Mon, Feb 8, 2016 at 7:00 AM, Saliya Ekanayake >> wrote: >> >>> Hi, >>> >>> I was trying to read a simple binary file using SerializedInputFormat as >>> suggested in a different thread, but encounters the following error. I >>> tried to do what the exception suggests, but eventhough createInput() >>> returns a DataSet object I couldn't find how to specify which file to read. >>> >>> Any help is appreciated. The file I am trying to read is a simple binary >>> file with containing java short values. Is there any example on reading >>> binary files available? >>> >>> Exception in thread "main" >>> org.apache.flink.api.common.InvalidProgramException: The type returned by >>> the input format could not be automatically determined. Please specify the >>> TypeInformation of the produced type explicitly by using the >>> 'createInput(InputFormat, TypeInformation)' method instead. >>> >>> Thank you, >>> Saliya >>> >>> >>> -- >>> Saliya Ekanayake >>> Ph.D. Candidate | Research Assistant >>> School of Informatics and Computing | Digital Science Center >>> Indiana University, Bloomington >>> Cell 812-391-4914 >>> http://saliya.org >>> >> >> > > > -- > Saliya Ekanayake > Ph.D. Candidate | Research Assistant > School of Informatics and Computing | Digital Science Center > Indiana University, Bloomington > Cell 812-391-4914 > http://saliya.org >
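Folding that change into the snippet from the earlier mail, the corrected lines read as follows (ShortValue is org.apache.flink.types.ShortValue and ValueTypeInfo is org.apache.flink.api.java.typeutils.ValueTypeInfo):

String binaryFile = "src/main/resources/sample.bin";
SerializedInputFormat<ShortValue> sif = new SerializedInputFormat<>();
sif.setFilePath(binaryFile);
// Passing the type information explicitly avoids the InvalidProgramException.
DataSet<ShortValue> ds = env.createInput(sif, ValueTypeInfo.SHORT_VALUE_TYPE_INFO);
System.out.println(ds.count());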
Re: OutputFormat vs SinkFunction
Hi, one problem that I see with OutputFormats is that they are not made for a streaming world. By this, I mean that they don’t handle failure well and don’t consider fault-tolerant streaming, i.e. exactly-once semantics. For example, what would be expected to happen if a job with a FileOutputFormat fails and needs to recover? Now, there might be some garbage left in the files that would get emitted again after restoring to a checkpoint, thus leading to duplicate results. Having OutputFormats in streaming programs can work well in toy examples and tests but can be dangerous in real-world jobs. I once talked with Robert about this and we came up with the idea (I think it was mostly him) of generalizing the RollingFileSink (which is fault-tolerance aware) so that it can easily be used with something akin to OutputFormats. What do you think? -Aljoscha > On 08 Feb 2016, at 19:40, Nick Dimiduk wrote: > > On Mon, Feb 8, 2016 at 9:51 AM, Maximilian Michels wrote: > Changing the class hierarchy would break backwards-compatibility of the API. > However, we could add another method to DataStream to easily use > OutputFormats in streaming. > > Indeed, that's why I suggested deprecating one and moving toward a > consolidated class hierarchy. It won't happen overnight, but this can be > managed pretty easily with some adapter code like this and some additional > overrides in the public APIs. > > How did you write your adapter? I came up with the one below. > > Our implementations are similar. This one is working fine with my test code. > > https://gist.github.com/ndimiduk/18820fcd78412c6b4fc3 > > On Mon, Feb 8, 2016 at 6:07 PM, Nick Dimiduk wrote: > In my case, I have my application code that is calling addSink, for which I'm > writing a test that needs to use LocalCollectionOutputFormat. Having two > separate class hierarchies is not helpful, hence the adapter. Much of this > code already exists in the implementation of FileSinkFunction, so the project > already supports it in a limited way. > > On Mon, Feb 8, 2016 at 4:16 AM, Maximilian Michels wrote: > Hi Nick, > > SinkFunction just implements user-defined functions on incoming > elements. OutputFormat offers more lifecycle methods. Thus it is a > more powerful interface. The OutputFormat originally comes from the > batch API, whereas the SinkFunction originates from streaming. Those > were more separate code paths in the past. Ultimately, it would make > sense to have only the OutputFormat interface but I think we have to > keep it to not break the API. > > If you need the lifecycle methods in streaming, there is > RichSinkFunction, which implements OutputFormat and SinkFunction. In > addition, it gives you access to the RuntimeContext. You can pass this > directly to the "addSink(sinkFunction)" API method. > > Cheers, > Max > > On Mon, Feb 8, 2016 at 7:14 AM, Nick Dimiduk wrote: > > Heya, > > > > Is there a plan to consolidate these two interfaces? They appear to provide > > identical functionality, differing only in lifecycle management. I found > > myself writing an adaptor so I can consume an OutputFormat where a > > SinkFunction is expected; there's not much to it. This seems like code that > > Flink should ship. > > > > Maybe one interface or the other can be deprecated for 1.0 API? > > > > Thanks, > > Nick > > >
Re: Error while reading binary file
Thank you, Fabian. It solved the compilation error, but at runtime I get an end-of-file exception. I've put up a sample code with data at Github https://github.com/esaliya/flinkit. The data file is a binary file containing 64 Short values. 02/08/2016 16:01:19 CHAIN DataSource (at main(WordCount.java:25) (org.apache.flink.api.common.io.SerializedInputFormat)) -> FlatMap (count())(4/8) switched to FAILED java.io.EOFException at java.io.DataInputStream.readShort(DataInputStream.java:315) at org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readShort(InputViewDataInputStreamWrapper.java:92) at org.apache.flink.types.ShortValue.read(ShortValue.java:88) at org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:37) at org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:31) at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274) at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:169) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745) On Mon, Feb 8, 2016 at 3:50 PM, Fabian Hueske wrote: > Hi, > > please try to replace > DataSet ds = env.createInput(sif); > by > DataSet ds = env.createInput(sif, > ValueTypeInfo.SHORT_VALUE_TYPE_INFO); > > Best, Fabian > > 2016-02-08 19:33 GMT+01:00 Saliya Ekanayake : > >> Till, >> >> I am still having trouble getting this to work. Here's my code ( >> https://github.com/esaliya/flinkit) >> >> String binaryFile = "src/main/resources/sample.bin"; >> SerializedInputFormat sif = new SerializedInputFormat<>(); >> sif.setFilePath(binaryFile); >> DataSet ds = env.createInput(sif); >> System.out.println(ds.count()); >> >> >> I still get the same error as shown below >> >> Exception in thread "main" >> org.apache.flink.api.common.InvalidProgramException: The type returned by >> the input format could not be automatically determined. Please specify the >> TypeInformation of the produced type explicitly by using the >> 'createInput(InputFormat, TypeInformation)' method instead. >> at >> org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:511) >> at org.saliya.flinkit.WordCount.main(WordCount.java:24) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:497) >> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) >> >> >> On Mon, Feb 8, 2016 at 5:42 AM, Till Rohrmann >> wrote: >> >>> Hi Saliya, >>> >>> in order to set the file path for the SerializedInputFormat you first >>> have to create it and then explicitly call setFilePath. >>> >>> final SerializedInputFormat inputFormat = new >>> SerializedInputFormat(); >>> inputFormat.setFilePath(PATH_TO_FILE); >>> >>> env.createInput(inputFormat, myTypeInfo); >>> >>> Cheers, >>> Till >>> >>> >>> On Mon, Feb 8, 2016 at 7:00 AM, Saliya Ekanayake >>> wrote: >>> Hi, I was trying to read a simple binary file using SerializedInputFormat as suggested in a different thread, but encounters the following error. I tried to do what the exception suggests, but eventhough createInput() returns a DataSet object I couldn't find how to specify which file to read. Any help is appreciated. 
The file I am trying to read is a simple binary file with containing java short values. Is there any example on reading binary files available? Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead. Thank you, Saliya -- Saliya Ekanayake Ph.D. Candidate | Research Assistant School of Informatics and Computing | Digital Science Center Indiana University, Bloomington Cell 812-391-4914 http://saliya.org >>> >>> >> >> >> -- >> Saliya Ekanayake >> Ph.D. Candidate | Research Assistant >> School of Informatics and Computing | Digital Science Center >> Indiana University, Bloomington >> Cell 812-391-4914 >> http://saliya.org >> > > -- Saliya Ekanayake Ph.D. Candidate | Research Assistant School of Informatics and Computing | Digital Science Center Indiana University, Bloomington Cell 812-391-4914 http://saliya.org
Re: Kafka partition alignment for event time
Things make more sense after coming across http://mail-archives.apache.org/mod_mbox/flink-user/201512.mbox/%3CCANC1h_vVUT3BkFFck5wJA2ju_sSenxmE=Fiizr=ds6tbasy...@mail.gmail.com%3E I need to ensure the parallelism is at least the number of partitions. This seems like a gotcha that could be better documented or automatically enforced.
Re: Kafka partition alignment for event time
Hi, what do you mean by this? I think it should also work when setting parallelism to 1. If not, then there is either a problem with Flink or maybe something in the data. -Aljoscha > On 08 Feb 2016, at 21:43, shikhar wrote: > > Things make more sense after coming across > http://mail-archives.apache.org/mod_mbox/flink-user/201512.mbox/%3CCANC1h_vVUT3BkFFck5wJA2ju_sSenxmE=Fiizr=ds6tbasy...@mail.gmail.com%3E > > I need to ensure the parallelism is at least the number of partitions. This > seems like a gotcha that could be better documented or automatically > enforced.
Re: Kafka partition alignment for event time
Stephan explained in that thread that we're picking the min watermark when doing operations that join streams from multiple sources. If we have m:n partition-source assignment where m>n, the source is going to end up with the max watermark. Having m<=n ensures that the lowest watermark is used. Re: automatic enforcement, perhaps allowing for more than 1 Kafka partition on a source should require opt-in, e.g. allowOversubscription()
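To make the watermark arithmetic concrete, here is the intuition in code form (illustrative only, not Flink's actual implementation): when one source subtask reads several partitions, the only safe watermark is the minimum of the per-partition event-time progress, since anything faster makes records from a lagging partition arrive late.

// Illustrative: combine per-partition event-time progress into one watermark.
static long safeWatermark(long[] maxTimestampPerPartition) {
    long min = Long.MAX_VALUE;
    for (long ts : maxTimestampPerPartition) {
        min = Math.min(min, ts);
    }
    return min; // no partition is behind this point in event time
}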
Re: Error while reading binary file
The SerializedInputFormat extends the BinaryInputFormat, which expects a special block-wise encoding and certain metadata fields. It is not suited to reading arbitrary binary files, such as a file with 64 short values. I suggest implementing a custom input format based on FileInputFormat (a sketch follows after the quoted thread below). Best, Fabian 2016-02-08 22:05 GMT+01:00 Saliya Ekanayake : > Thank you, Fabian. It solved the compilation error, but at runtime I get > an end-of-file exception. I've put up a sample code with data at Github > https://github.com/esaliya/flinkit. The data file is a binary file > containing 64 Short values. > > > 02/08/2016 16:01:19 CHAIN DataSource (at main(WordCount.java:25) > (org.apache.flink.api.common.io.SerializedInputFormat)) -> FlatMap > (count())(4/8) switched to FAILED > java.io.EOFException > at java.io.DataInputStream.readShort(DataInputStream.java:315) > at > org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readShort(InputViewDataInputStreamWrapper.java:92) > at org.apache.flink.types.ShortValue.read(ShortValue.java:88) > at > org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:37) > at > org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:31) > at > org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274) > at > org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:169) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) > at java.lang.Thread.run(Thread.java:745) > > On Mon, Feb 8, 2016 at 3:50 PM, Fabian Hueske wrote: > >> Hi, >> >> please try to replace >> DataSet ds = env.createInput(sif); >> by >> DataSet ds = env.createInput(sif, >> ValueTypeInfo.SHORT_VALUE_TYPE_INFO); >> >> Best, Fabian >> >> 2016-02-08 19:33 GMT+01:00 Saliya Ekanayake : >> >>> Till, >>> >>> I am still having trouble getting this to work. Here's my code ( >>> https://github.com/esaliya/flinkit) >>> >>> String binaryFile = "src/main/resources/sample.bin"; >>> SerializedInputFormat sif = new SerializedInputFormat<>(); >>> sif.setFilePath(binaryFile); >>> DataSet ds = env.createInput(sif); >>> System.out.println(ds.count()); >>> >>> >>> I still get the same error as shown below >>> >>> Exception in thread "main" >>> org.apache.flink.api.common.InvalidProgramException: The type returned by >>> the input format could not be automatically determined. Please specify the >>> TypeInformation of the produced type explicitly by using the >>> 'createInput(InputFormat, TypeInformation)' method instead. >>> at >>> org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:511) >>> at org.saliya.flinkit.WordCount.main(WordCount.java:24) >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> at >>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> at >>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.lang.reflect.Method.invoke(Method.java:497) >>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) >>> >>> >>> On Mon, Feb 8, 2016 at 5:42 AM, Till Rohrmann >>> wrote: >>> Hi Saliya, in order to set the file path for the SerializedInputFormat you first have to create it and then explicitly call setFilePath. 
final SerializedInputFormat inputFormat = new SerializedInputFormat(); inputFormat.setFilePath(PATH_TO_FILE); env.createInput(inputFormat, myTypeInfo); Cheers, Till On Mon, Feb 8, 2016 at 7:00 AM, Saliya Ekanayake wrote: > Hi, > > I was trying to read a simple binary file using SerializedInputFormat > as suggested in a different thread, but encounters the following error. I > tried to do what the exception suggests, but eventhough createInput() > returns a DataSet object I couldn't find how to specify which file to > read. > > Any help is appreciated. The file I am trying to read is a simple > binary file with containing java short values. Is there any example on > reading binary files available? > > Exception in thread "main" > org.apache.flink.api.common.InvalidProgramException: The type returned by > the input format could not be automatically determined. Please specify the > TypeInformation of the produced type explicitly by using the > 'createInput(InputFormat, TypeInformation)' method instead. > > Thank you, > Saliya > > > -- > Saliya Ekanayake > Ph.D. Candidate | Research Assistant > School of Informatics and Computing | Digital Science Center > Indiana University, Bloomington > Cell 812-391-4914 > http://saliya.org > >>> >>> >>> -- >>> Saliya Ekanayake >>> Ph.D. Candidate | Research Assistant >>> School of Informatics and Computing | Digital Science Center >>> Indiana University, Bloomington >>> Cell 812-391-4914 >>> http:/
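As referenced above, here is a sketch of such a custom format for the sample file -- plain big-endian java shorts, read as a single unsplittable stream. The class name is illustrative, not part of Flink:

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class ShortFileInputFormat extends FileInputFormat<Short> {

    private transient DataInputStream dataIn;
    private transient boolean end;

    public ShortFileInputFormat(String filePath) {
        setFilePath(new Path(filePath));
        // Treat each file as one split; keeps the sketch free of split handling.
        unsplittable = true;
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split); // positions the inherited 'stream' field
        dataIn = new DataInputStream(stream);
        end = false;
    }

    @Override
    public boolean reachedEnd() {
        return end;
    }

    @Override
    public Short nextRecord(Short reuse) throws IOException {
        try {
            return dataIn.readShort(); // DataInputStream reads big-endian shorts
        } catch (EOFException e) {
            end = true; // signal end of input instead of failing the task
            return null;
        }
    }
}

It would be used with explicit type information, e.g. env.createInput(new ShortFileInputFormat("src/main/resources/sample.bin"), BasicTypeInfo.SHORT_TYPE_INFO).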
Re: Error while reading binary file
Thank you, Fabian. I'll try to do it. On Mon, Feb 8, 2016 at 4:37 PM, Fabian Hueske wrote: > The SerializedInputFormat extends the BinaryInputFormat which expects a > special block-wise encoding and certain metadata fields. > It is not suited to read arbitrary binary files such as a file with 64 > short values. > I suggest to implement a custom input format based on FileInputFormat. > > Best, Fabian > > 2016-02-08 22:05 GMT+01:00 Saliya Ekanayake : > >> Thank you, Fabian. It solved the compilation error, but at runtime I get >> an end-of-file exception. I've put up a sample code with data at Github >> https://github.com/esaliya/flinkit. The data file is a binary file >> containing 64 Short values. >> >> >> 02/08/2016 16:01:19 CHAIN DataSource (at main(WordCount.java:25) >> (org.apache.flink.api.common.io.SerializedInputFormat)) -> FlatMap >> (count())(4/8) switched to FAILED >> java.io.EOFException >> at java.io.DataInputStream.readShort(DataInputStream.java:315) >> at >> org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readShort(InputViewDataInputStreamWrapper.java:92) >> at org.apache.flink.types.ShortValue.read(ShortValue.java:88) >> at >> org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:37) >> at >> org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:31) >> at >> org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274) >> at >> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:169) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >> at java.lang.Thread.run(Thread.java:745) >> >> On Mon, Feb 8, 2016 at 3:50 PM, Fabian Hueske wrote: >> >>> Hi, >>> >>> please try to replace >>> DataSet ds = env.createInput(sif); >>> by >>> DataSet ds = env.createInput(sif, >>> ValueTypeInfo.SHORT_VALUE_TYPE_INFO); >>> >>> Best, Fabian >>> >>> 2016-02-08 19:33 GMT+01:00 Saliya Ekanayake : >>> Till, I am still having trouble getting this to work. Here's my code ( https://github.com/esaliya/flinkit) String binaryFile = "src/main/resources/sample.bin"; SerializedInputFormat sif = new SerializedInputFormat<>(); sif.setFilePath(binaryFile); DataSet ds = env.createInput(sif); System.out.println(ds.count()); I still get the same error as shown below Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead. at org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:511) at org.saliya.flinkit.WordCount.main(WordCount.java:24) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) On Mon, Feb 8, 2016 at 5:42 AM, Till Rohrmann wrote: > Hi Saliya, > > in order to set the file path for the SerializedInputFormat you first > have to create it and then explicitly call setFilePath. 
> > final SerializedInputFormat inputFormat = new > SerializedInputFormat(); > inputFormat.setFilePath(PATH_TO_FILE); > > env.createInput(inputFormat, myTypeInfo); > > Cheers, > Till > > > On Mon, Feb 8, 2016 at 7:00 AM, Saliya Ekanayake > wrote: > >> Hi, >> >> I was trying to read a simple binary file using SerializedInputFormat >> as suggested in a different thread, but encounters the following error. I >> tried to do what the exception suggests, but eventhough createInput() >> returns a DataSet object I couldn't find how to specify which file to >> read. >> >> Any help is appreciated. The file I am trying to read is a simple >> binary file with containing java short values. Is there any example on >> reading binary files available? >> >> Exception in thread "main" >> org.apache.flink.api.common.InvalidProgramException: The type returned by >> the input format could not be automatically determined. Please specify >> the >> TypeInformation of the produced type explicitly by using the >> 'createInput(InputFormat, TypeInformation)' method instead. >> >> Thank you, >> Saliya >> >> >> -- >> Saliya Ekanayake >> Ph.D. Candidate | Research Assistant >> School of Informatics and Computing | Digital Science Center >> Indiana University, Bloomington >> Cell 812-391-4914 >> http:
Re: OutputFormat vs SinkFunction
I think this depends on the implementation of the OutputFormat. For instance, an HBase, Cassandra or ES OF will handle most operations as idempotent when the schema is designed appropriately. You are (rightly) focusing on FileOFs, which also depend on the semantics of their implementation. MR always required an atomic rename on the DFS, and only moved output files into place once the task committed its output. Also I think it unreasonable to bring exactly-once considerations into the discussion, because nothing provides this right now without a multi-stage commit protocol. Such a protocol would be provided at the framework level, and to the best of my knowledge its semantic expectations on the output handler are undefined. My original question comes from wanting to use the LocalCollectionOF to test a streaming flow that sinks to Kafka, without rewriting the flow in test code. So in this case you're right that it does apply to tests. I don't think correctness of tests is a trivial concern though. As for RollingFileSink, I've not seen this conversation so I cannot comment. Per my earlier examples, I think it's not correct to assume all OF implementations are file-based. On Monday, February 8, 2016, Aljoscha Krettek wrote: > Hi, > one problem that I see with OutputFormats is that they are not made for a > streaming world. By this, I mean that they don’t handle failure well and > don’t consider fault-tolerant streaming, i.e. exactly-once semantics. For > example, what would be expected to happen if a job with a FileOutputFormat > fails and needs to recover? Now, there might be some garbage left in the > files that would get emitted again after restoring to a checkpoint, thus > leading to duplicate results. > > Having OutputFormats in streaming programs can work well in toy examples > and tests but can be dangerous in real-world jobs. I once talked with > Robert about this and we came up with the idea (I think it was mostly him) > of generalizing the RollingFileSink (which is fault-tolerance aware) so > that it can easily be used with something akin to OutputFormats. > > What do you think? > > -Aljoscha > > On 08 Feb 2016, at 19:40, Nick Dimiduk > wrote: > > > > On Mon, Feb 8, 2016 at 9:51 AM, Maximilian Michels > wrote: > > Changing the class hierarchy would break backwards-compatibility of the > API. However, we could add another method to DataStream to easily use > OutputFormats in streaming. > > > > Indeed, that's why I suggested deprecating one and moving toward a > consolidated class hierarchy. It won't happen overnight, but this can be > managed pretty easily with some adapter code like this and some additional > overrides in the public APIs. > > > > How did you write your adapter? I came up with the one below. > > > > Our implementations are similar. This one is working fine with my test > code. > > > > https://gist.github.com/ndimiduk/18820fcd78412c6b4fc3 > > > > On Mon, Feb 8, 2016 at 6:07 PM, Nick Dimiduk > wrote: > > In my case, I have my application code that is calling addSink, for > which I'm writing a test that needs to use LocalCollectionOutputFormat. > Having two separate class hierarchies is not helpful, hence the adapter. > Much of this code already exists in the implementation of FileSinkFunction, > so the project already supports it in a limited way. > > > > On Mon, Feb 8, 2016 at 4:16 AM, Maximilian Michels > wrote: > > Hi Nick, > > > > SinkFunction just implements user-defined functions on incoming > > elements. OutputFormat offers more lifecycle methods. 
Thus it is a > > more powerful interface. The OutputFormat originally comes from the > > batch API, whereas the SinkFunction originates from streaming. Those > > were more separate code paths in the past. Ultimately, it would make > > sense to have only the OutputFormat interface but I think we have to > > keep it to not break the API. > > > > If you need the lifecycle methods in streaming, there is > > RichSinkFunction, which implements OutputFormat and SinkFunction. In > > addition, it gives you access to the RuntimeContext. You can pass this > > directly to the "addSink(sinkFunction)" API method. > > > > Cheers, > > Max > > > > On Mon, Feb 8, 2016 at 7:14 AM, Nick Dimiduk > wrote: > > > Heya, > > > > > > Is there a plan to consolidate these two interfaces? They appear to > provide > > > identical functionality, differing only in lifecycle management. I > found > > > myself writing an adaptor so I can consume an OutputFormat where a > > > SinkFunction is expected; there's not much to it. This seems like code > that > > > Flink should ship. > > > > > > Maybe one interface or the other can be deprecated for 1.0 API? > > > > > > Thanks, > > > Nick > > > > > > > >
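To illustrate the idempotency argument with a keyed store: replaying records after recovery rewrites the same cells instead of appending duplicates. A toy sketch, where KeyValueStore is a made-up stand-in for an HBase/Cassandra/ES client:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical client interface standing in for a real keyed store;
// it must be Serializable to ship with the job.
interface KeyValueStore<K, V> extends java.io.Serializable {
    void put(K key, V value);
}

public class IdempotentKeyedSink<K, V> extends RichSinkFunction<Tuple2<K, V>> {

    private final KeyValueStore<K, V> store;

    public IdempotentKeyedSink(KeyValueStore<K, V> store) {
        this.store = store;
    }

    @Override
    public void invoke(Tuple2<K, V> record) throws Exception {
        // Writing the same key twice lands on the same cell, so a replay
        // after restoring from a checkpoint does not duplicate results.
        store.put(record.f0, record.f1);
    }
}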