Hi Naveen, Turns out I had changed the pom.xml after I checked out your code while trying to get your example working. I have found the real issue of your problem. Please make sure you have the following dependency in your pom.xml (in addition to the storm modules).
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.0-SNAPSHOT</version> </dependency> The quickstart also contains this. It shouldn't be necessary but it's a workaround for a bug which we just discovered with your help. Thank you for reporting! Best regards, Max On Tue, Dec 8, 2015 at 2:56 PM, Maximilian Michels <m...@apache.org> wrote: > Hi Naveen, > > In your code on GitHub, please remove the following from the WordCount > file: > > > OutputStream o; > try { > o = new FileOutputStream("/tmp/wordcount1.txt", true); > o.write((word + " " + count.toString() + "\n").getBytes()); > o.close(); > } catch (IOException e) { > e.printStackTrace(); > } > > > It is not necessary because you already have a bolt which prints to a > file. What this code did, is overwriting the wordcount1.txt file on every > incoming tuple. > > You were not seeing console output because you didn't set up a > log4j.properties file. Put the following in a file called log4j.properties > in a folder "resources" under src/main/resources: > > log4j.rootLogger=INFO, console > > log4j.appender.console=org.apache.log4j.ConsoleAppender > log4j.appender.console.layout=org.apache.log4j.PatternLayout > log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c > %x - %m%n > > Then you will also see console output. We will fix the submission code of > Storm such that this won't be necessary in the future. By the way, the > recommended template for Flink Jobs on Storm is to start off with the Flink > Quickstart project: > https://ci.apache.org/projects/flink/flink-docs-release-0.10/quickstart/java_api_quickstart.html > This would already contain the log4.properties file. > > Best, > Max > > > On Mon, Dec 7, 2015 at 11:05 PM, Madhire, Naveen < > naveen.madh...@capitalone.com> wrote: > >> Hi Matthias, Sorry for the confusion. I just used a simple code in the >> Count Bolt to write the bolt output into a file and was not using >> BiltFileSink. >> >> OutputStream o; >> try { >> o = new FileOutputStream("/tmp/wordcount.txt", true); >> o.write((word + " " + count.toString() + "\n").getBytes()); >> o.close(); >> } catch (IOException e) { >> e.printStackTrace(); >> } >> >> >> >> >> Coming to BoltFileSink, I tried using cluster.shutdown at the end which >> stops the local cluster but getting the below exception, >> >> java.lang.Exception: TaskManager is shutting down. >> at >> >> org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala >> :216) >> at akka.actor.Actor$class.aroundPostStop(Actor.scala:475) >> at >> >> org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager >> .scala:119) >> at >> >> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$fi >> nishTerminate(FaultHandling.scala:210) >> at >> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) >> at akka.actor.ActorCell.terminate(ActorCell.scala:369) >> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462) >> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) >> at >> akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279) >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) >> at akka.dispatch.Mailbox.run(Mailbox.scala:221) >> at akka.dispatch.Mailbox.exec(Mailbox.scala:231) >> at >> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> at >> >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java: >> 1339) >> at >> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> at >> >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.jav >> a:107) >> >> >> >> I added the below lines of code for stoping the local cluster at the end, >> the code is same as flink-storm-examples one. >> >> Utils.sleep(10 * 1000); >> >> cluster.shutdown(); >> >> >> >> >> Thanks, >> Naveen >> >> >> >> >> On 12/5/15, 7:54 AM, "Matthias J. Sax" <mj...@apache.org> wrote: >> >> >Hi Naveen, >> > >> >in you previous mail you mention that >> > >> >> Yeah, I did route the ³count² bolt output to a file and I see the >> >>output. >> >> I can see the Storm and Flink output matching. >> > >> >How did you do this? Modifying the "count bolt" code? Or did you use >> >some other bolt that consumes the "count bolt" output? >> > >> >One more thought: how much data do you have and did you terminate you >> >program before looking into the result file? I am asking because >> >BoltFileSink uses a BufferedOutputWriter internally -- if you have only >> >a few records in your result and do not terminate, the data might still >> >be buffered. I would get flushed to disc if you terminate the program. >> > >> >Otherwise, I could not spot any issue with your code. And as Max >> >mentioned that the console output worked for him using you program I am >> >little puzzled what might go wrong in your setup. The program seems to >> >be correct. >> > >> > >> >-Matthias >> > >> > >> >On 12/04/2015 08:55 PM, Madhire, Naveen wrote: >> >> Hi Max, >> >> >> >> I forgot to include flink-storm-examples dependency in the application >> >>to >> >> use BoltFileSink. >> >> >> >> However, the file created by the BoltFileSink is empty. Is there any >> >>other >> >> stuff which I need to do to write it into a file by using BoltFileSink? >> >> >> >> I am using the same code what you mentioned, >> >> >> >> builder.setBolt("file", new BoltFileSink("/tmp/storm", new >> >> OutputFormatter() { >> >> @Override >> >> public String format(Tuple tuple) { >> >> return tuple.toString(); >> >> } >> >> }), 1).shuffleGrouping("count"); >> >> >> >> >> >> >> >> >> >> Thanks, >> >> Naveen >> >> >> >> >> >> >> >> >> >>> >> >>> On 12/4/15, 5:36 AM, "Maximilian Michels" <m...@apache.org> wrote: >> >>> >> >>>> Hi Naveen, >> >>>> >> >>>> Were you using Maven before? The syncing of changes in the master >> >>>> always takes a while for Maven. The documentation happened to be >> >>>> updated before Maven synchronized. Building and installing manually >> >>>> (what you did) solves the problem. >> >>>> >> >>>> Strangely, when I run your code on my machine with the latest >> >>>> 1.0-SNAPSHOT I see a lot of output on my console. >> >>>> >> >>>> Here's the output: https://gist.github.com/mxm/98cd927866b193ce0f89 >> >>>> >> >>>> Could you add bolt which writes the Storm tuples to a file? Is that >> >>>> file also empty? >> >>>> >> >>>> builder.setBolt("file", new BoltFileSink("/tmp/storm", new >> >>>> OutputFormatter() { >> >>>> @Override >> >>>> public String format(Tuple tuple) { >> >>>> return tuple.toString(); >> >>>> } >> >>>> }), 1).shuffleGrouping("count"); >> >>>> >> >>>> >> >>>> Thanks, >> >>>> Max >> >>> >> >>> ________________________________________________________ >> >>> >> >>> The information contained in this e-mail is confidential and/or >> >>> proprietary to Capital One and/or its affiliates and may only be used >> >>> solely in performance of work or services for Capital One. The >> >>> information transmitted herewith is intended only for use by the >> >>> individual or entity to which it is addressed. If the reader of this >> >>> message is not the intended recipient, you are hereby notified that >> any >> >>> review, retransmission, dissemination, distribution, copying or other >> >>>use >> >>> of, or taking of any action in reliance upon this information is >> >>>strictly >> >>> prohibited. If you have received this communication in error, please >> >>> contact the sender and delete the material from your computer. >> >>> >> >> >> >> ________________________________________________________ >> >> >> >> The information contained in this e-mail is confidential and/or >> >>proprietary to Capital One and/or its affiliates and may only be used >> >>solely in performance of work or services for Capital One. The >> >>information transmitted herewith is intended only for use by the >> >>individual or entity to which it is addressed. If the reader of this >> >>message is not the intended recipient, you are hereby notified that any >> >>review, retransmission, dissemination, distribution, copying or other >> >>use of, or taking of any action in reliance upon this information is >> >>strictly prohibited. If you have received this communication in error, >> >>please contact the sender and delete the material from your computer. >> >> >> > >> >> ________________________________________________________ >> >> The information contained in this e-mail is confidential and/or >> proprietary to Capital One and/or its affiliates and may only be used >> solely in performance of work or services for Capital One. The information >> transmitted herewith is intended only for use by the individual or entity >> to which it is addressed. If the reader of this message is not the intended >> recipient, you are hereby notified that any review, retransmission, >> dissemination, distribution, copying or other use of, or taking of any >> action in reliance upon this information is strictly prohibited. If you >> have received this communication in error, please contact the sender and >> delete the material from your computer. >> > >