> > Multiple inputs per bolt is currently not supported. :(
> > FlinkTopologyBuilder has a bug. There is already a JIRA for it:
> > https://issues.apache.org/jira/browse/FLINK-2837
> >
> > I know already how to fix it (hope I can get it into 0.10.1)

I have already had this working for a couple of days. I just didn't have
time to open a pull request yet. You can now execute Storm-like joins with it.

Concerning your pull request: https://github.com/apache/flink/pull/1387
There are some things I have already fixed which are also contained in your
pull request. So after we merge this, I would like to open mine so that our
changes don't conflict too much.

On Sat, Nov 21, 2015 at 6:45 PM, Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for your feedback! This is very valuable :)
>
> Please share your experience (positive and negative) when doing more
> complex stuff. And don't hesitate to ask if you have any questions.
>
> -Matthias
>
> On 11/21/2015 06:04 PM, Naveen Madhire wrote:
> > FYI, I just saw this email chain and thought of sharing my experience.
> > I used the Storm Flink API a few days ago. Just a simple example worked
> > well, however I will be testing a few more next week.
> >
> > One thing to note is, I had to include all Scala dependencies in the
> > storm topology since the FlinkLocalCluster.java class has
> > LocalFlinkMiniCluster.scala
> >
> > Not sure if this is an issue but after including scala dependencies
> > everything worked well. ;)
> >
> > On Fri, Nov 20, 2015 at 4:12 PM, Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Multiple inputs per bolt is currently not supported. :(
> >> FlinkTopologyBuilder has a bug. There is already a JIRA for it:
> >> https://issues.apache.org/jira/browse/FLINK-2837
> >>
> >> I know already how to fix it (hope I can get it into 0.10.1)
> >>
> >> Removing FlinkTopologyBuilder does make sense (I did not do it because
> >> the members we need to access are private). Your idea to get access via
> >> reflection is good!
> >>
> >> Btw: can you also have a look here:
> >> https://github.com/apache/flink/pull/1387
> >> I would like to merge this ASAP but need some feedback.
> >>
> >> -Matthias
> >>
> >> On 11/20/2015 07:30 PM, Maximilian Michels wrote:
> >>> I thought about the API changes again. It probably does make sense to
> >>> keep the LocalCluster and StormSubmitter equivalent classes. That way,
> >>> we don't break the Storm API too much. Users can stick to the pattern
> >>> of using either FlinkCluster to execute locally or FlinkSubmitter to
> >>> submit remotely. Still, we can save some code by reusing Storm's
> >>> TopologyBuilder.
> >>>
> >>> I'll open a pull request with the changes. This also includes some
> >>> more examples and features (e.g. multiple inputs per Bolt).
> >>>
> >>> On Mon, Nov 16, 2015 at 4:33 PM, Maximilian Michels <m...@apache.org> wrote:
> >>>> You are right in saying that both API approaches support executing
> >>>> Storm jobs. However, I think the proposed changes make it much easier
> >>>> to reuse Storm topologies. And here is why:
> >>>>
> >>>> 1. No existing classes need to be exchanged.
> >>>>
> >>>> A Storm topology stays like it is. If you already have it defined
> >>>> somewhere, you simply pass it to the FlinkTopologyBuilder to create a
> >>>> StreamExecutionEnvironment.
> >>>>
> >>>> 2. Storm and Flink have different runtime behavior.
> >>>>
> >>>> IMHO it makes more sense to make it transparent to the user that the
> >>>> result of the translation is an actual Flink job executed by the Flink
> >>>> runtime. Therefore, it makes sense to stick to the Flink way of
> >>>> executing. Hiding this fact behind Storm dummy classes can create
> >>>> problems for the user.
> >>>>
> >>>> 3. Code reuse
> >>>>
> >>>> As you can see in the proposed changes, it makes the implementation
> >>>> much simpler while retaining the desired functionality. That also has
> >>>> an impact on testability and maintainability.
> >>>>
> >>>> I can also understand your perspective. I wonder if we could get some
> >>>> feedback from other people on the mailing list?
> >>>>
> >>>> Let me also address your other comments and suggestions:
> >>>>
> >>>>> * You changed examples to use finite-spouts -- from a testing point of
> >>>>> view this makes sense. However, the examples should show how to run an
> >>>>> *unmodified* Storm topology in Flink.
> >>>>
> >>>> Good point. As far as I know we only test finite sources in the Flink
> >>>> streaming tests. Using finite sources makes things much easier. I
> >>>> would like to keep the tests simple like this. We can still have
> >>>> separate tests to test the infinite attribute of the regular spouts.
> >>>> The examples can be converted back to using the infinite spout. IMHO
> >>>> the existing approach, which involves waiting and then killing the
> >>>> topology, doesn't seem to be the cleanest solution.
> >>>>
> >>>>> * we should keep the local copy "unprocessedBolts" when creating a Flink
> >>>>> program to allow re-submitting the same topology object twice (or
> >>>>> altering it after submission). If you don't make the copy,
> >>>>> submitting/translating the topology into a Flink job alters the object
> >>>>> (which should not happen). And as it is not performance critical, the
> >>>>> copying overhead does not matter.
> >>>>
> >>>> I didn't think about that but we can copy the spouts and bolts before
> >>>> processing them. I've added that to my local branch. However, I didn't
> >>>> see where this was done previously. Can you give me a hint?
> >>>>
> >>>>> * Why did you change the dop from 4 to 1 in WordCountTopology? We should
> >>>>> test in parallel fashion...
> >>>>
> >>>> Absolutely. Already reverted this locally.
> >>>>
> >>>>> * Too many reformatting changes ;) You touched many classes without any
> >>>>> actual code changes.
> >>>>
> >>>> Yes, I ran "Optimize Imports" in IntelliJ. Sorry for that but this
> >>>> only affects the import statements.
> >>>>
> >>>> I would like to open a pull request soon to merge some of the changes.
> >>>> It would be great if some other people commented on the API changes
> >>>> and whether we should integrate direct support for spouts/bolts in
> >>>> DataStream. Next, I would like to test and bundle some more of the
> >>>> examples included in Storm.
> >>>>
> >>>> Cheers,
> >>>> Max
> >>>>
> >>>> On Sat, Nov 14, 2015 at 5:13 PM, Matthias J. Sax <mj...@apache.org> wrote:
> >>>>> I just had a look at your proposal. It makes a lot of sense. I still
> >>>>> believe that it is a matter of taste if one prefers your or my point of
> >>>>> view. Both approaches allow users to easily reuse and execute Storm
> >>>>> topologies on Flink (which is the most important feature we need to
> >>>>> have).
> >>>>>
> >>>>> I hope to get some more feedback from the community on whether the
> >>>>> Storm compatibility should be more "stormy" or more "flinky". Both
> >>>>> approaches make sense to me.
> >>>>>
> >>>>> A few minor comments:
> >>>>>
> >>>>> * FileSpout vs FiniteFileSpout
> >>>>> -> FileSpout was implemented in a Storm way -- to set the "finished"
> >>>>> flag here does not make sense from a Storm point of view (there is no
> >>>>> such thing as a finite spout)
> >>>>> Thus, this example shows how a regular Storm spout can be improved
> >>>>> using the FiniteSpout interface -- I would keep it as is (even if it
> >>>>> seems to be unnecessarily complicated -- imagine that you don't have
> >>>>> the code of FileSpout)
> >>>>>
> >>>>> * You changed examples to use finite-spouts -- from a testing point of
> >>>>> view this makes sense. However, the examples should show how to run an
> >>>>> *unmodified* Storm topology in Flink.
> >>>>>
> >>>>> * we should keep the local copy "unprocessedBolts" when creating a Flink
> >>>>> program to allow re-submitting the same topology object twice (or
> >>>>> altering it after submission). If you don't make the copy,
> >>>>> submitting/translating the topology into a Flink job alters the object
> >>>>> (which should not happen).
> >>>>> And as it is not performance critical, the copying overhead does not
> >>>>> matter.
> >>>>>
> >>>>> * Why did you change the dop from 4 to 1 in WordCountTopology? We should
> >>>>> test in parallel fashion...
> >>>>>
> >>>>> * Too many reformatting changes ;) You touched many classes without any
> >>>>> actual code changes.
> >>>>>
> >>>>>
> >>>>> -------- Forwarded Message --------
> >>>>> Subject: Re: Storm Compatibility
> >>>>> Date: Fri, 13 Nov 2015 12:15:19 +0100
> >>>>> From: Maximilian Michels <m...@apache.org>
> >>>>> To: Matthias J. Sax <mj...@apache.org>
> >>>>> CC: Stephan Ewen <se...@apache.org>, Robert Metzger <rmetz...@apache.org>
> >>>>>
> >>>>> Hi Matthias,
> >>>>>
> >>>>> Thank you for your remarks.
> >>>>>
> >>>>> I believe the goal of the compatibility layer should not be to mimic
> >>>>> Storm's API but to easily execute Storm topologies using Flink. I see
> >>>>> that it is easy for users to use class names for execution they know
> >>>>> from Storm but I think this makes the API verbose. I've refactored it
> >>>>> a bit to make it more aligned with Flink's execution model. After all,
> >>>>> the most important thing is that it makes it easy for people to reuse
> >>>>> Storm topologies while getting all the advantages of Flink.
> >>>>>
> >>>>> Let me explain what I have done so far:
> >>>>> https://github.com/apache/flink/compare/master...mxm:storm-dev
> >>>>>
> >>>>> API
> >>>>> - remove FlinkClient, FlinkSubmitter, FlinkLocalCluster,
> >>>>> FlinkTopology: They are not necessary in my opinion and are
> >>>>> replicating functionality already included in Flink or Storm.
> >>>>>
> >>>>> - Build the topology with the Storm TopologyBuilder (instead of
> >>>>> FlinkTopology) which is then passed to the FlinkTopologyBuilder which
> >>>>> generates the StreamExecutionEnvironment containing the StreamGraph.
> >>>>> You can then simply call execute() like you would usually do in Flink.
> >>>>> This lets you reuse your Storm topologies with the ease of Flink's
> >>>>> context-based execution mechanism. Note that it works in local and
> >>>>> remote execution mode without changing any code.
> >>>>>
> >>>>> Tests
> >>>>> - replaced StormTestBase.java with StreamingTestBase
> >>>>> - use a Finite source for the tests and changed it a bit
> >>>>>
> >>>>> Examples
> >>>>> - Convert examples to new API
> >>>>> - Remove duplicate examples (local and remote)
> >>>>>
> >>>>> I hope these changes are not too invasive for you. I think it makes
> >>>>> the compatibility layer much easier to use. Let me know what you think
> >>>>> about it. Of course, we can iterate on it.
> >>>>>
> >>>>> About the integration of the compatibility layer into DataStream:
> >>>>> Wouldn't it be possible to set the Storm dependency to provided scope
> >>>>> and let the user include the jar if he/she wants to use the Storm
> >>>>> compatibility? That's also what we do for other libraries like Gelly.
> >>>>> You have to package them into the JAR if you want to run them on the
> >>>>> cluster. We should give a good error message if classes cannot be found.
> >>>>>
> >>>>> +1 for moving the discussion to the dev list.
> >>>>>
> >>>>> Cheers,
> >>>>> Max
> >>>>>
> >>>>> On Fri, Nov 13, 2015 at 7:41 AM, Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>> One more thing that just came to my mind about (1): I have to correct
> >>>>>> my last reply on it:
> >>>>>>
> >>>>>> We **cannot reuse** TopologyBuilder because the returned StormTopology
> >>>>>> from .createTopology() does **not** contain the references to the
> >>>>>> Spout/Bolt object. Internally, those are already serialized into an
> >>>>>> internal Thrift representation (as preparation to get sent to Nimbus).
> >>>>>> However, in order to create a Flink job, we need the references of
> >>>>>> course...
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 11/11/2015 04:33 PM, Maximilian Michels wrote:
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>> Sorry for getting back to you late. I'm very new to Storm but have
> >>>>>>> familiarized myself a bit over the last few days. While looking
> >>>>>>> through the Storm examples and the compatibility layer I discovered
> >>>>>>> the following issues:
> >>>>>>>
> >>>>>>> 1) The compatibility layer mirrors the Storm API instead of reusing
> >>>>>>> it. Why do we need a FlinkTopologyBuilder, FlinkCluster,
> >>>>>>> FlinkSubmitter, FlinkClient? Couldn't all these user-facing classes
> >>>>>>> be replaced by e.g. StormExecutionEnvironment which receives the Storm
> >>>>>>> topology and upon getStreamGraph() just traverses it?
> >>>>>>>
> >>>>>>> 2) DRPC is not yet supported. I don't know how crucial this is but it
> >>>>>>> seems to be a widely used Storm feature. If we wrapped the entire
> >>>>>>> Storm topology, we could give appropriate errors when we see such
> >>>>>>> unsupported features.
> >>>>>>>
> >>>>>>> 3) We could simplify embedding Spouts and Bolts directly as operator
> >>>>>>> functions. Users shouldn't have to worry about extracting the types.
> >>>>>>> Perhaps we could implement a dedicated method to add spouts/bolts on
> >>>>>>> DataStream?
> >>>>>>>
> >>>>>>> 5) Performance: The BoltWrapper creates a StormTuple for every
> >>>>>>> incoming record. I think this could be improved. Couldn't we use the
> >>>>>>> StormTuple as data type instead of Flink's tuples?
> >>>>>>>
> >>>>>>> 6) Trident Examples. Have you run any?
> >>>>>>>
> >>>>>>> That's it for now. I'm sure you know about many more improvements or
> >>>>>>> problems because you're the expert on this. In the meantime, I'll try
> >>>>>>> to contact you via IRC.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Max
> >>>>>>>
> >>>>>>> On Fri, Nov 6, 2015 at 6:25 PM, Matthias J.
> >>>>>>> Sax <mj...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> that sounds great! I am very happy that people are interested in it
> >>>>>>>> and start to use it! Can you give some more details about this? I am
> >>>>>>>> only aware of a few questions on SO. But there was no question about
> >>>>>>>> it on the mailing list lately... Did you get some more internal
> >>>>>>>> questions/feedback?
> >>>>>>>>
> >>>>>>>> And of course, other people should get involved as well! There is so
> >>>>>>>> much to do -- even if I work 40h a week on it, I cannot get
> >>>>>>>> everything done by myself. The last days were very busy for me. I
> >>>>>>>> hope I can work on a couple of bugs after the Munich Meetup. I
> >>>>>>>> started to look into them already...
> >>>>>>>>
> >>>>>>>> Should we start a roadmap in the Wiki? This might be helpful if more
> >>>>>>>> people get involved.
> >>>>>>>>
> >>>>>>>> And thanks for keeping me in the loop :)
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 11/06/2015 03:49 PM, Stephan Ewen wrote:
> >>>>>>>>> Hi Matthias!
> >>>>>>>>>
> >>>>>>>>> We are seeing a lot of people getting very excited about the Storm
> >>>>>>>>> Compatibility layer. I expect that quite a few people will
> >>>>>>>>> seriously start to work with it.
> >>>>>>>>>
> >>>>>>>>> I would suggest that we also start getting involved in that. Since
> >>>>>>>>> you have of course your priority on your Ph.D., it would be asking
> >>>>>>>>> a bit much of you to dedicate a lot of time to support more
> >>>>>>>>> features, be super responsive with users all the time, etc.
> >>>>>>>>>
> >>>>>>>>> To that end, some of us will start testing the API, adding
> >>>>>>>>> fixes, etc (which also helps us to understand this better when
> >>>>>>>>> users ask questions).
> >>>>>>>>> We would definitely like for you to stay involved (we don't want to
> >>>>>>>>> hijack this), and help with ideas, especially when it comes to
> >>>>>>>>> things like fault tolerance design, etc.
> >>>>>>>>>
> >>>>>>>>> What do you think?
> >>>>>>>>>
> >>>>>>>>> Greetings,
> >>>>>>>>> Stephan