>
> Multiple inputs per bolt is currently not supported. :(
> FlinkTopologyBuilder has a bug. There is already a JIRA for it:
> https://issues.apache.org/jira/browse/FLINK-2837
>
> I know already how to fix it (hope to can get it into 0.10.1)


I already have this working since a couple of days. Just didn't have time
to open a pull request yet. You can now execute Storm like joins with it.

Concerning your pull request: https://github.com/apache/flink/pull/1387
There are some things which I already fixed which are also contained in
your pull request. So after we merge this, I would like to open mine so we
don't conflict too much with our changes.

On Sat, Nov 21, 2015 at 6:45 PM, Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for your feedback! This is very valuable :)
>
> Please share your experience (positive and negative) when doing more
> complex stuff. And don't hesitate to ask if you have any questions.
>
> -Matthias
>
> On 11/21/2015 06:04 PM, Naveen Madhire wrote:
> > FYI, I just saw this email chain and thought of sharing my exp. I used
> the
> > Storm Flink API few days ago. Just a simple example worked well, however
> I
> > will be testing few more next week.
> >
> > One thing to note is, I had to include all Scala dependencies in the
> storm
> > topology since FlinkLocalCluster.java class has
> LocalFlinkMiniCluster.scala
> >
> >
> > Not sure if this is an issue but after including scala dependencies
> > everything worked well. ;)
> >
> >
> > On Fri, Nov 20, 2015 at 4:12 PM, Matthias J. Sax <mj...@apache.org>
> wrote:
> >
> >> Multiple inputs per bolt is currently not supported. :(
> >> FlinkTopologyBuilder has a bug. There is already a JIRA for it:
> >> https://issues.apache.org/jira/browse/FLINK-2837
> >>
> >> I know already how to fix it (hope to can get it into 0.10.1)
> >>
> >> Removing FlinkTopologyBuilder does make sense (I did not do it because
> >> the members we need to access are private). Your idea to get access via
> >> reflection is good!
> >>
> >> Btw: can you also have a look here:
> >> https://github.com/apache/flink/pull/1387
> >> I would like to merge this ASAP but need some feedback.
> >>
> >> -Matthias
> >>
> >> On 11/20/2015 07:30 PM, Maximilian Michels wrote:
> >>> I thought about the API changes again. It probably does make sense to
> >>> keep the LocalCluster and StormSubmitter equivalent classes. That way,
> >>> we don't break the Storm API too much. Users can stick to the pattern
> >>> of using either FlinkCluster to execute locally or FlinkSubmitter to
> >>> submit remotely. Still, we can save some code by reusing Storm's
> >>> TopologyBuilder.
> >>>
> >>> I'll open a pull request with the changes. This also includes some
> >>> more examples and features (e.g. multiple inputs per Bolt).
> >>>
> >>> On Mon, Nov 16, 2015 at 4:33 PM, Maximilian Michels <m...@apache.org>
> >> wrote:
> >>>> You are right in saying that both API approaches support executing
> >>>> Storm jobs. However, I think the proposed changes make it much easier
> >>>> to reuse Storm topologies. And here is why:
> >>>>
> >>>> 1. No existing classes need to be exchanged.
> >>>>
> >>>> A Storm topology stays like it is. If you already have it defined
> >>>> somewhere, you simply pass it to the FlinkTopologyBuilder to create a
> >>>> StreamExecutionEnvironment.
> >>>>
> >>>> 2. Storm and Flink have different runtime behavior.
> >>>>
> >>>> IMHO makes more sense to make it transparent to the user that the
> >>>> result of the translation is an actual Flink job executed by the Flink
> >>>> runtime. Therefore, it makes sense to stick to the Flink way of
> >>>> executing. Hiding this fact behind Storm dummy classes can create
> >>>> problems for the user.
> >>>>
> >>>> 3. Code reuse
> >>>>
> >>>> As you can see in the proposed changes, it makes the implementation
> >>>> much simpler while retaining the desire functionality. That has also
> >>>> impact of testability and maintainability.
> >>>>
> >>>> I can also understand your perspective. I wonder if we could get some
> >>>> feedback from other people on the mailing list?
> >>>>
> >>>>
> >>>> Let me also address your other comments and suggestions:
> >>>>
> >>>>> * You changed examples to use finite-spouts -- from a testing point
> of
> >>>>> view this makes sense. However, the examples should show how to run
> an
> >>>>> *unmodified* Storm topology in Flink.
> >>>>
> >>>> Good point. As far as I know we only test finite sources in the Flink
> >>>> streaming tests. Using finite sources makes things much easier. I
> >>>> would like to keep the tests simple like this. We can still have
> >>>> separate tests to test the infinite attribute of the regular spouts.
> >>>> The examples can be converted back to using the infinite spout. IMHO
> >>>> the existing approach which involves waiting and killing of the
> >>>> topology doesn't seem to be the cleanest solution.
> >>>>
> >>>>> * we should keep the local copy "unprocessedBolts" when creating a
> >> Flink
> >>>>> program to allow to re-submit the same topology object twice (or
> alter
> >>>>> it after submission). If you don't make the copy,
> >> submitting/translating
> >>>>> the topology into a Flink job alters the object (which should not
> >>>>> happen). And as it is not performance critical, the copying overhead
> >>>>> does not matter.
> >>>>
> >>>> I didn't think about that but we can copy the spouts and bolts before
> >>>> processing them. I've added that to my local branch. However, I didn't
> >>>> see where this was done previously. Can you give me a hint?
> >>>>
> >>>>> * Why did you change the dop from 4 to 1 WordCountTopology ? We
> should
> >>>>> test in parallel fashion...
> >>>>
> >>>> Absolutely. Already reverted this locally.
> >>>>
> >>>>> * Too many reformatting changes ;) You though many classes without
> any
> >>>>> actual code changes.
> >>>>
> >>>> Yes, I ran "Optimize Imports" in IntelliJ. Sorry for that but this
> >>>> only affects the import statements.
> >>>>
> >>>> I would like to open a pull request soon to merge some of the changes.
> >>>> It would be great if some other people commented on the API changes
> >>>> and whether we should integrate direct support for spouts/bolts in
> >>>> DataStream. Next, I would like to test and bundle some more of the
> >>>> examples included in Storm.
> >>>>
> >>>> Cheers,
> >>>> Max
> >>>>
> >>>> On Sat, Nov 14, 2015 at 5:13 PM, Matthias J. Sax <mj...@apache.org>
> >> wrote:
> >>>>> I just had a look at your proposal. It makes a lot of sense. I still
> >>>>> believe that it is a matter of taste if one prefers your or my point
> of
> >>>>> view. Both approaches allows to easily reuse and execute Storm
> >>>>> Topologies on Flink (what is the most important feature we need to
> >> have).
> >>>>>
> >>>>> I hope to get some more feedback from the community, if the
> >>>>> Strom-compatibility should be more "stormy" or more "flinky". Bot
> >>>>> approaches make sense to me.
> >>>>>
> >>>>>
> >>>>> I view minor comments:
> >>>>>
> >>>>> * FileSpout vs FiniteFileSpout
> >>>>>   -> FileSpout was implemented in a Storm way -- to set the
> "finished"
> >>>>> flag here does not make sense from a Storm point of view (there is no
> >>>>> such thing as a finite spout)
> >>>>>   Thus, this example shows how a regular Storm spout can be improved
> >>>>> using FiniteSpout interface -- I would keep it as is (even if seems
> to
> >>>>> be unnecessary complicated -- imagine that you don't have the code of
> >>>>> FileSpout)
> >>>>>
> >>>>> * You changed examples to use finite-spouts -- from a testing point
> of
> >>>>> view this makes sense. However, the examples should show how to run
> an
> >>>>> *unmodified* Storm topology in Flink.
> >>>>>
> >>>>> * we should keep the local copy "unprocessedBolts" when creating a
> >> Flink
> >>>>> program to allow to re-submit the same topology object twice (or
> alter
> >>>>> it after submission). If you don't make the copy,
> >> submitting/translating
> >>>>> the topology into a Flink job alters the object (which should not
> >>>>> happen). And as it is not performance critical, the copying overhead
> >>>>> does not matter.
> >>>>>
> >>>>> * Why did you change the dop from 4 to 1 WordCountTopology ? We
> should
> >>>>> test in parallel fashion...
> >>>>>
> >>>>> * Too many reformatting changes ;) You though many classes without
> any
> >>>>> actual code changes.
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> -------- Forwarded Message --------
> >>>>> Subject: Re: Storm Compatibility
> >>>>> Date: Fri, 13 Nov 2015 12:15:19 +0100
> >>>>> From: Maximilian Michels <m...@apache.org>
> >>>>> To: Matthias J. Sax <mj...@apache.org>
> >>>>> CC: Stephan Ewen <se...@apache.org>, Robert Metzger <
> >> rmetz...@apache.org>
> >>>>>
> >>>>> Hi Matthias,
> >>>>>
> >>>>> Thank you for your remarks.
> >>>>>
> >>>>> I believe the goal of the compatibility layer should not be to mimic
> >>>>> Storm's API but to easily execute Storm typologies using Flink. I see
> >>>>> that it is easy for users to use class names for execution they know
> >>>>> from Storm but I think this makes the API verbose. I've refactored it
> >>>>> a bit to make it more aligned with Flink's execution model. After
> all,
> >>>>> the most important thing is that it makes it easy for people to reuse
> >>>>> Storm typologies while getting all the advantages of Flink.
> >>>>>
> >>>>> Let me explain what I have done so far:
> >>>>> https://github.com/apache/flink/compare/master...mxm:storm-dev
> >>>>>
> >>>>> API
> >>>>> - remove FlinkClient, FlinkSubmitter, FlinkLocalCluster,
> >>>>> FlinkTopology: They are not necessary in my opinion and are
> >>>>> replicating functionality already included in Flink or Storm.
> >>>>>
> >>>>> - Build the topology with the Storm TopologyBuilder (instead of
> >>>>> FlinkTopology) which is then passed to the FlinkTopologyBuilder which
> >>>>> generates the StreamExecutionEnvironment containing the StreamGraph.
> >>>>> You can then simply call execute() like you would usually do in
> Flink.
> >>>>> This lets you reuse your Storm typologies with the ease of Flink
> >>>>> context-based execution mechanism. Note that it works in local and
> >>>>> remote execution mode without changing any code.
> >>>>>
> >>>>> Tests
> >>>>> - replaced StormTestBase.java with StreamingTestBase
> >>>>> - use a Finite source for the tests and changed it a bit
> >>>>>
> >>>>> Examples
> >>>>> - Convert examples to new API
> >>>>> - Remove duplicate examples (local and remote)
> >>>>>
> >>>>> I hope these changes are not too invasive for you. I think it makes
> >>>>> the compatibility layer much easier to use. Let me know what you
> think
> >>>>> about it. Of course, we can iterate on it.
> >>>>>
> >>>>> About the integration of the compatibility layer into DataStream:
> >>>>> Wouldn't it be possible to set storm to provided and let the user
> >>>>> include the jar if he/she wants to use the Storm compatibility?
> That's
> >>>>> also what we do for other libraries like Gelly. You have to package
> >>>>> them into the JAR if you want to run them on the cluster. We should
> >>>>> give a good error message if classes cannot be found.
> >>>>>
> >>>>> +1 for moving the discussion to the dev list.
> >>>>>
> >>>>> Cheers,
> >>>>> Max
> >>>>>
> >>>>> On Fri, Nov 13, 2015 at 7:41 AM, Matthias J. Sax <mj...@apache.org>
> >> wrote:
> >>>>>> One more thing that just came to my mind about (1): I have to
> correct
> >> my
> >>>>>> last reply on it:
> >>>>>>
> >>>>>> We **cannot reuse** TopologyBuilder because the returned
> StormTopology
> >>>>>> from .createTopology() does **not** contain the references to the
> >>>>>> Spout/Bolt object. Internally, those are already serialized into an
> >>>>>> internal Thrift representation (as preparation to get sent to
> Nimbus).
> >>>>>> However, in order to create a Flink job, we need the references of
> >> course...
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 11/11/2015 04:33 PM, Maximilian Michels wrote:
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>> Sorry for getting back to you late. I'm very new to Storm but have
> >>>>>>> familiarized myself a bit the last days. While looking through the
> >>>>>>> Storm examples and the compatibility layer I discovered the
> following
> >>>>>>> issues:
> >>>>>>>
> >>>>>>> 1) The compatibility layer mirrors the Storm API instead of reusing
> >>>>>>> it. Why do we need a FlinkTopologyBuilder, FlinkCluster,
> >>>>>>> FlinkSubmitter, FlinkClient? Couldn't all these user-facing classes
> >> by
> >>>>>>> replaced by e.g. StormExecutionEnvironment which receives the Storm
> >>>>>>> topology and upon getStreamGraph() just traverses it?
> >>>>>>>
> >>>>>>> 2) DRPC is not yet supported. I don't know how crucial this is but
> it
> >>>>>>> seems to be widespread Storm feature. If we wrapped the entire
> Storm
> >>>>>>> topology, we could give appropriate errors when we see such
> >>>>>>> unsupported features.
> >>>>>>>
> >>>>>>> 3) We could simplify embedding Spouts and Bolts directly as
> operator
> >>>>>>> functions. Users shouldn't have to worry about extracting the
> types.
> >>>>>>> Perhaps we could implement a dedicated method to add spouts/bolts
> on
> >>>>>>> DataStream?
> >>>>>>>
> >>>>>>> 5) Performance: The BoltWrapper creates a StormTuple for every
> >>>>>>> incoming record. I think this could be improved. Couldn't we use
> the
> >>>>>>> StormTuple as data type instead of Flink's tuples?
> >>>>>>>
> >>>>>>> 6) Trident Examples. Have you run any?
> >>>>>>>
> >>>>>>> That's it for now. I'm sure you know about many more improvements
> or
> >>>>>>> problems because you're the expert on this. In the meantime, I'll
> try
> >>>>>>> to contact you via IRC.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Max
> >>>>>>>
> >>>>>>> On Fri, Nov 6, 2015 at 6:25 PM, Matthias J. Sax <mj...@apache.org>
> >> wrote:
> >>>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> that sounds great! I am very happy that people are interested in
> it
> >> and
> >>>>>>>> start to use it! Can you give some more details about this? I am
> >> just
> >>>>>>>> aware of a few question at SO. But there was no question about it
> >> on the
> >>>>>>>> mailing list lately... Did you get some more internal
> >> questions/feedback?
> >>>>>>>>
> >>>>>>>> And of course, other people should get involved as well! There is
> so
> >>>>>>>> much too do -- even if I work 40h a week on it, I cannot get
> >> everything
> >>>>>>>> done by myself. The last days were very busy for me. I hope I can
> >> work
> >>>>>>>> on a couple of bugs after the Munich Meetup. I started to look
> into
> >> them
> >>>>>>>> already...
> >>>>>>>>
> >>>>>>>> Should we start a roadmap in the Wiki? This might be helpful if
> more
> >>>>>>>> people get involved.
> >>>>>>>>
> >>>>>>>> And thanks for keeping me in the loop :)
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 11/06/2015 03:49 PM, Stephan Ewen wrote:
> >>>>>>>>> Hi Matthias!
> >>>>>>>>>
> >>>>>>>>> We are seeing a lot of people getting very excited about the
> Storm
> >>>>>>>>> Compatibility layer. I expect that quite a few people will
> >> seriously
> >>>>>>>>> start to work with it.
> >>>>>>>>>
> >>>>>>>>> I would suggest that we also start getting involved in that.
> Since
> >> you
> >>>>>>>>> have of course your priority on your Ph.D., it would be a little
> >> much
> >>>>>>>>> asked from you to dedicate a lot of time to support more
> features,
> >> be
> >>>>>>>>> super responsive with users all the time, etc.
> >>>>>>>>>
> >>>>>>>>> To that end, some people from us will start testing the API,
> adding
> >>>>>>>>> fixes, etc (which also helps us to understand this better when
> >> users ask
> >>>>>>>>> questions).
> >>>>>>>>> We would definitely like for you to stay involved (we don't want
> to
> >>>>>>>>> hijack this), and help with ideas, especially when it comes to
> >> things
> >>>>>>>>> like fault tolerance design, etc.
> >>>>>>>>>
> >>>>>>>>> What do you think?
> >>>>>>>>>
> >>>>>>>>> Greetings,
> >>>>>>>>> Stephan
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>
> >>
> >
>
>

Reply via email to