Hi Naveen,

Turns out I had changed the pom.xml after I checked out your code while
trying to get your example working. I have found the real issue of your
problem. Please make sure you have the following dependency in your pom.xml
(in addition to the storm modules).

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

The quickstart also contains this. It shouldn't be necessary but it's a
workaround for a bug which we just discovered with your help. Thank you for
reporting!

Best regards,
Max

On Tue, Dec 8, 2015 at 2:56 PM, Maximilian Michels <m...@apache.org> wrote:

> Hi Naveen,
>
> In your code on GitHub, please remove the following from the WordCount
> file:
>
>
> OutputStream o;
> try {
>     o = new FileOutputStream("/tmp/wordcount1.txt", true);
>     o.write((word + " " + count.toString() + "\n").getBytes());
>     o.close();
> } catch (IOException e) {
>     e.printStackTrace();
> }
>
>
> It is not necessary because you already have a bolt which prints to a
> file. What this code did, is overwriting the wordcount1.txt file on every
> incoming tuple.
>
> You were not seeing console output because you didn't set up a
> log4j.properties file. Put the following in a file called log4j.properties
> in a folder "resources" under src/main/resources:
>
> log4j.rootLogger=INFO, console
>
> log4j.appender.console=org.apache.log4j.ConsoleAppender
> log4j.appender.console.layout=org.apache.log4j.PatternLayout
> log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c 
> %x - %m%n
>
> Then you will also see console output. We will fix the submission code of
> Storm such that this won't be necessary in the future. By the way, the
> recommended template for Flink Jobs on Storm is to start off with the Flink
> Quickstart project:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/quickstart/java_api_quickstart.html
> This would already contain the log4.properties file.
>
> Best,
> Max
>
>
> On Mon, Dec 7, 2015 at 11:05 PM, Madhire, Naveen <
> naveen.madh...@capitalone.com> wrote:
>
>> Hi Matthias, Sorry for the confusion. I just used a simple code in the
>> Count Bolt to write the bolt output into a file and was not using
>> BiltFileSink.
>>
>> OutputStream o;
>> try {
>>     o = new FileOutputStream("/tmp/wordcount.txt", true);
>>     o.write((word + " " + count.toString() + "\n").getBytes());
>>     o.close();
>> } catch (IOException e) {
>>     e.printStackTrace();
>> }
>>
>>
>>
>>
>> Coming to BoltFileSink, I tried using cluster.shutdown at the end which
>> stops the local cluster but getting the below exception,
>>
>> java.lang.Exception: TaskManager is shutting down.
>>         at
>>
>> org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala
>> :216)
>>         at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
>>         at
>>
>> org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager
>> .scala:119)
>>         at
>>
>> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$fi
>> nishTerminate(FaultHandling.scala:210)
>>         at
>> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>>         at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>>         at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>>         at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>>         at
>> akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>         at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at
>>
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:
>> 1339)
>>         at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>         at
>>
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.jav
>> a:107)
>>
>>
>>
>> I added the below lines of code for stoping the local cluster at the end,
>> the code is same as flink-storm-examples one.
>>
>> Utils.sleep(10 * 1000);
>>
>> cluster.shutdown();
>>
>>
>>
>>
>> Thanks,
>> Naveen
>>
>>
>>
>>
>> On 12/5/15, 7:54 AM, "Matthias J. Sax" <mj...@apache.org> wrote:
>>
>> >Hi Naveen,
>> >
>> >in you previous mail you mention that
>> >
>> >> Yeah, I did route the ³count² bolt output to a file and I see the
>> >>output.
>> >> I can see the Storm and Flink output matching.
>> >
>> >How did you do this? Modifying the "count bolt" code? Or did you use
>> >some other bolt that consumes the "count bolt" output?
>> >
>> >One more thought: how much data do you have and did you terminate you
>> >program before looking into the result file? I am asking because
>> >BoltFileSink uses a BufferedOutputWriter internally -- if you have only
>> >a few records in your result and do not terminate, the data might still
>> >be buffered. I would get flushed to disc if you terminate the program.
>> >
>> >Otherwise, I could not spot any issue with your code. And as Max
>> >mentioned that the console output worked for him using you program I am
>> >little puzzled what might go wrong in your setup. The program seems to
>> >be correct.
>> >
>> >
>> >-Matthias
>> >
>> >
>> >On 12/04/2015 08:55 PM, Madhire, Naveen wrote:
>> >> Hi Max,
>> >>
>> >> I forgot to include flink-storm-examples dependency in the application
>> >>to
>> >> use BoltFileSink.
>> >>
>> >> However, the file created by the BoltFileSink is empty. Is there any
>> >>other
>> >> stuff which I need to do to write it into a file by using BoltFileSink?
>> >>
>> >> I am using the same code what you mentioned,
>> >>
>> >> builder.setBolt("file", new BoltFileSink("/tmp/storm", new
>> >> OutputFormatter() {
>> >>    @Override
>> >>    public String format(Tuple tuple) {
>> >>       return tuple.toString();
>> >>    }
>> >> }), 1).shuffleGrouping("count");
>> >>
>> >>
>> >>
>> >>
>> >> Thanks,
>> >> Naveen
>> >>
>> >>
>> >>
>> >>
>> >>>
>> >>> On 12/4/15, 5:36 AM, "Maximilian Michels" <m...@apache.org> wrote:
>> >>>
>> >>>> Hi Naveen,
>> >>>>
>> >>>> Were you using Maven before? The syncing of changes in the master
>> >>>> always takes a while for Maven. The documentation happened to be
>> >>>> updated before Maven synchronized. Building and installing manually
>> >>>> (what you did) solves the problem.
>> >>>>
>> >>>> Strangely, when I run your code on my machine with the latest
>> >>>> 1.0-SNAPSHOT I see a lot of output on my console.
>> >>>>
>> >>>> Here's the output: https://gist.github.com/mxm/98cd927866b193ce0f89
>> >>>>
>> >>>> Could you add bolt which writes the Storm tuples to a file? Is that
>> >>>> file also empty?
>> >>>>
>> >>>> builder.setBolt("file", new BoltFileSink("/tmp/storm", new
>> >>>> OutputFormatter() {
>> >>>>   @Override
>> >>>>   public String format(Tuple tuple) {
>> >>>>      return tuple.toString();
>> >>>>   }
>> >>>> }), 1).shuffleGrouping("count");
>> >>>>
>> >>>>
>> >>>> Thanks,
>> >>>> Max
>> >>>
>> >>> ________________________________________________________
>> >>>
>> >>> The information contained in this e-mail is confidential and/or
>> >>> proprietary to Capital One and/or its affiliates and may only be used
>> >>> solely in performance of work or services for Capital One. The
>> >>> information transmitted herewith is intended only for use by the
>> >>> individual or entity to which it is addressed. If the reader of this
>> >>> message is not the intended recipient, you are hereby notified that
>> any
>> >>> review, retransmission, dissemination, distribution, copying or other
>> >>>use
>> >>> of, or taking of any action in reliance upon this information is
>> >>>strictly
>> >>> prohibited. If you have received this communication in error, please
>> >>> contact the sender and delete the material from your computer.
>> >>>
>> >>
>> >> ________________________________________________________
>> >>
>> >> The information contained in this e-mail is confidential and/or
>> >>proprietary to Capital One and/or its affiliates and may only be used
>> >>solely in performance of work or services for Capital One. The
>> >>information transmitted herewith is intended only for use by the
>> >>individual or entity to which it is addressed. If the reader of this
>> >>message is not the intended recipient, you are hereby notified that any
>> >>review, retransmission, dissemination, distribution, copying or other
>> >>use of, or taking of any action in reliance upon this information is
>> >>strictly prohibited. If you have received this communication in error,
>> >>please contact the sender and delete the material from your computer.
>> >>
>> >
>>
>> ________________________________________________________
>>
>> The information contained in this e-mail is confidential and/or
>> proprietary to Capital One and/or its affiliates and may only be used
>> solely in performance of work or services for Capital One. The information
>> transmitted herewith is intended only for use by the individual or entity
>> to which it is addressed. If the reader of this message is not the intended
>> recipient, you are hereby notified that any review, retransmission,
>> dissemination, distribution, copying or other use of, or taking of any
>> action in reliance upon this information is strictly prohibited. If you
>> have received this communication in error, please contact the sender and
>> delete the material from your computer.
>>
>
>

Reply via email to