Hi Naveen,

just for completeness: Max fixed this bug today and we also updated the
documentation.
As you are using the SNAPSHOT version, you do not need to include
"flink-java" any more if you update to the latest version containing the
fix. Furthermore, *do not* include "storm-core" as a dependency -- this
will result in a Kryo problem due to a Flink/Storm Kryo version conflict.
(The dependency is not needed anyway, as you get it automatically via
"flink-storm-examples" or "flink-storm".) This Kryo version conflict was
the problem in the first place. It resulted in a Kryo exception when
running your program longer than 10 seconds. As you stopped after 10
seconds, you did not see the exception, just an empty result file :/

-Matthias

On 12/08/2015 05:22 PM, Maximilian Michels wrote:
> Hi Naveen,
>
> Turns out I had changed the pom.xml after I checked out your code while
> trying to get your example working. I have found the real issue of your
> problem. Please make sure you have the following dependency in your
> pom.xml (in addition to the storm modules):
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-java</artifactId>
>     <version>1.0-SNAPSHOT</version>
> </dependency>
>
> The quickstart also contains this. It shouldn't be necessary, but it's a
> workaround for a bug which we just discovered with your help. Thank you
> for reporting!
>
> Best regards,
> Max
>
> On Tue, Dec 8, 2015 at 2:56 PM, Maximilian Michels <m...@apache.org
> <mailto:m...@apache.org>> wrote:
>
> Hi Naveen,
>
> In your code on GitHub, please remove the following from the
> WordCount file:
>
> OutputStream o;
> try {
>     o = new FileOutputStream("/tmp/wordcount1.txt", true);
>     o.write((word + " " + count.toString() + "\n").getBytes());
>     o.close();
> } catch (IOException e) {
>     e.printStackTrace();
> }
>
> It is not necessary because you already have a bolt which prints to
> a file. What this code did was overwrite the wordcount1.txt file on
> every incoming tuple.
>
> You were not seeing console output because you did not set up a
> log4j.properties file.
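Summarizing the dependency advice in this thread as a pom.xml sketch (a non-authoritative fragment, not a complete pom; the artifact IDs and 1.0-SNAPSHOT version are taken from the thread, and the flink-java entry is only the temporary workaround Max describes):

```xml
<!-- Sketch only: dependencies discussed in this thread.
     Do NOT declare storm-core yourself: it arrives transitively via
     flink-storm / flink-storm-examples, and adding it directly causes
     the Flink/Storm Kryo version conflict described above. -->
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-storm</artifactId>
    <version>1.0-SNAPSHOT</version>
  </dependency>
  <!-- Workaround from Max's mail; unnecessary once you are on a
       SNAPSHOT that contains the fix. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.0-SNAPSHOT</version>
  </dependency>
</dependencies>
```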
> Put the following in a file called log4j.properties under
> src/main/resources:
>
> log4j.rootLogger=INFO, console
>
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>
> Then you will also see console output. We will fix the submission
> code of Storm such that this won't be necessary in the future. By
> the way, the recommended template for Flink jobs on Storm is to
> start off with the Flink Quickstart project:
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/quickstart/java_api_quickstart.html
>
> This already contains the log4j.properties file.
>
> Best,
> Max
>
> On Mon, Dec 7, 2015 at 11:05 PM, Madhire, Naveen
> <naveen.madh...@capitalone.com
> <mailto:naveen.madh...@capitalone.com>> wrote:
>
> Hi Matthias,
>
> Sorry for the confusion. I just used a simple piece of code in the
> count bolt to write the bolt output into a file and was not using
> BoltFileSink.
>
> OutputStream o;
> try {
>     o = new FileOutputStream("/tmp/wordcount.txt", true);
>     o.write((word + " " + count.toString() + "\n").getBytes());
>     o.close();
> } catch (IOException e) {
>     e.printStackTrace();
> }
>
> Coming to BoltFileSink, I tried using cluster.shutdown at the end,
> which stops the local cluster, but I am getting the below exception:
>
> java.lang.Exception: TaskManager is shutting down.
> at org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:216)
> at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
> at org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager.scala:119)
> at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> I added the below lines of code for stopping the local cluster at
> the end; the code is the same as the flink-storm-examples one.
>
> Utils.sleep(10 * 1000);
> cluster.shutdown();
>
> Thanks,
> Naveen
>
> On 12/5/15, 7:54 AM, "Matthias J. Sax" <mj...@apache.org
> <mailto:mj...@apache.org>> wrote:
>
> >Hi Naveen,
> >
> >In your previous mail you mention that
> >
> >> Yeah, I did route the "count" bolt output to a file and I see the
> >> output. I can see the Storm and Flink output matching.
> >
> >How did you do this? Modifying the "count bolt" code? Or did you use
> >some other bolt that consumes the "count bolt" output?
> >
> >One more thought: how much data do you have, and did you terminate
> >your program before looking into the result file?
> >I am asking because BoltFileSink uses a BufferedOutputWriter
> >internally -- if you have only a few records in your result and do
> >not terminate, the data might still be buffered. It would get
> >flushed to disk if you terminate the program.
> >
> >Otherwise, I could not spot any issue with your code. And as Max
> >mentioned that the console output worked for him using your program,
> >I am a little puzzled about what might go wrong in your setup. The
> >program seems to be correct.
> >
> >-Matthias
> >
> >On 12/04/2015 08:55 PM, Madhire, Naveen wrote:
> >> Hi Max,
> >>
> >> I forgot to include the flink-storm-examples dependency in the
> >> application to use BoltFileSink.
> >>
> >> However, the file created by the BoltFileSink is empty. Is there
> >> anything else I need to do to write into the file using
> >> BoltFileSink?
> >>
> >> I am using the same code you mentioned:
> >>
> >> builder.setBolt("file", new BoltFileSink("/tmp/storm", new
> >>         OutputFormatter() {
> >>     @Override
> >>     public String format(Tuple tuple) {
> >>         return tuple.toString();
> >>     }
> >> }), 1).shuffleGrouping("count");
> >>
> >> Thanks,
> >> Naveen
> >>
> >>> On 12/4/15, 5:36 AM, "Maximilian Michels" <m...@apache.org
> >>> <mailto:m...@apache.org>> wrote:
> >>>
> >>>> Hi Naveen,
> >>>>
> >>>> Were you using Maven before? The syncing of changes in the
> >>>> master always takes a while for Maven. The documentation
> >>>> happened to be updated before Maven synchronized. Building and
> >>>> installing manually (what you did) solves the problem.
> >>>>
> >>>> Strangely, when I run your code on my machine with the latest
> >>>> 1.0-SNAPSHOT, I see a lot of output on my console.
> >>>>
> >>>> Here's the output: https://gist.github.com/mxm/98cd927866b193ce0f89
> >>>>
> >>>> Could you add a bolt which writes the Storm tuples to a file?
> >>>> Is that file also empty?
> >>>>
> >>>> builder.setBolt("file", new BoltFileSink("/tmp/storm", new
> >>>>         OutputFormatter() {
> >>>>     @Override
> >>>>     public String format(Tuple tuple) {
> >>>>         return tuple.toString();
> >>>>     }
> >>>> }), 1).shuffleGrouping("count");
> >>>>
> >>>> Thanks,
> >>>> Max
> >>>
> >>> ________________________________________________________
> >>>
> >>> The information contained in this e-mail is confidential and/or
> >>> proprietary to Capital One and/or its affiliates and may only be
> >>> used solely in performance of work or services for Capital One.
> >>> The information transmitted herewith is intended only for use by
> >>> the individual or entity to which it is addressed. If the reader
> >>> of this message is not the intended recipient, you are hereby
> >>> notified that any review, retransmission, dissemination,
> >>> distribution, copying or other use of, or taking of any action in
> >>> reliance upon this information is strictly prohibited. If you
> >>> have received this communication in error, please contact the
> >>> sender and delete the material from your computer.
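Matthias's buffering point above (BoltFileSink writes through a buffered writer, so a small result may not reach the file until the writer is flushed or the program terminates cleanly) can be seen with plain JDK classes. This is an illustrative standalone sketch, not Flink/Storm API usage; the class and method names are made up for the demo:

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class BufferFlushDemo {

    // Writes one small record through a BufferedWriter and reports the
    // file size before and after flushing.
    static long[] sizesBeforeAndAfterFlush() throws IOException {
        Path out = Files.createTempFile("wordcount", ".txt");
        BufferedWriter writer =
                new BufferedWriter(new FileWriter(out.toFile()), 8192);
        writer.write("storm 42\n");     // 9 bytes: stays in the 8 KB buffer
        long before = Files.size(out);  // nothing has reached the file yet
        writer.flush();                 // push buffered bytes to disk
        long after = Files.size(out);
        writer.close();
        Files.delete(out);
        return new long[] { before, after };
    }

    public static void main(String[] args) throws IOException {
        long[] sizes = sizesBeforeAndAfterFlush();
        System.out.println("before flush: " + sizes[0] + " bytes");
        System.out.println("after flush:  " + sizes[1] + " bytes");
    }
}
```

This matches the thread's diagnosis: with only a few records and no clean termination, the result file can legitimately be empty even though the bolt is writing.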