Your proposal for #1 looks good. I'm not quite sure how to reconcile the proposals for #1 and #2. In #1 you add the stream spec straight onto the runner, while in #2 you do it in a callback. If it is either-or, #1 looks a lot better for my purposes.
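One possible reading, offered only as a hypothetical sketch, is that #1 and #2 need not be either-or. setup(...) would only register specs and factories in a writable environment, and input(...) would then accept either an explicit StreamSpec or a streamId resolved from whatever setup registered. The toy Java model below assumes that reading. `RunnerSketch`, `WritableEnv`, `Runner`, the `pageViews`/`clicks` ids, and the `kafka`/`PageViewEvent` names are all illustrative and are not Samza APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical toy model of the proposal in this thread; these are NOT Samza's real classes.
class RunnerSketch {

  // Stand-in for StreamSpec: a logical stream id plus its physical system and stream name.
  static final class StreamSpec {
    final String id, system, stream;
    StreamSpec(String id, String system, String stream) {
      this.id = id;
      this.system = system;
      this.stream = stream;
    }
  }

  // Writable view of the environment that the setup(...) callback receives (#2).
  interface WritableEnv {
    void setStreamSpec(String streamId, StreamSpec spec);
  }

  // Toy stream that only records which operators were applied to it.
  static final class MessageStream {
    final StreamSpec source;
    final List<String> ops = new ArrayList<>();
    MessageStream(StreamSpec source) { this.source = source; }
    MessageStream map(String opName) { ops.add("map:" + opName); return this; }
  }

  static final class Runner {
    private final Map<String, StreamSpec> specs = new HashMap<>();
    final List<MessageStream> inputs = new ArrayList<>();

    // #2: optional programmatic setup; the callback runs immediately, before any input(...).
    Runner setup(Consumer<WritableEnv> fn) {
      fn.accept(specs::put); // Map.put's return value is simply discarded
      return this;
    }

    // #1: build the graph directly off the runner from an explicit spec.
    MessageStream input(StreamSpec spec) {
      MessageStream s = new MessageStream(spec);
      inputs.add(s);
      return s;
    }

    // #1 and #2 together: look up a spec that setup(...) registered under a logical id.
    MessageStream input(String streamId) {
      StreamSpec spec = specs.get(streamId);
      if (spec == null) {
        throw new IllegalArgumentException("no StreamSpec registered for " + streamId);
      }
      return input(spec);
    }
  }
}
```

Under this assumption, the callback from #2 is only an optional way to populate the environment before graph building, so the runner-first style of #1 stays the primary entry point, e.g. `runner.setup(env -> env.setStreamSpec("pageViews", spec)); runner.input("pageViews").map("parse");`.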
For #4, what mechanism are you using to distribute the JARs? Can you use the same mechanism to distribute the serialized graph?

On Fri, Apr 28, 2017 at 12:14 AM, xinyu liu <xinyuliu...@gmail.com> wrote:
> btw, I will get to SAMZA-1246 as soon as possible.
>
> Thanks,
> Xinyu
>
> On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
>
> > Let me try to capture the updated requirements:
> >
> > 1. Set up input streams outside StreamGraph, and treat graph building as a library (*Fluent, Beam*).
> >
> > 2. Improve ease of use for ApplicationRunner to avoid complex configurations such as zkCoordinator and zkCoordinationService (*Standalone*). Provide some programmatic way to tweak them in the API.
> >
> > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We can have one or more implementations, but they are hidden from the users.
> >
> > 4. Separate StreamGraph from the runtime environment so it can be serialized (*Beam, Yarn*).
> >
> > 5. Better life cycle management of the application, at parity with StreamProcessor (*Standalone, Beam*). Stats should include the exception in case of failure (tracked in SAMZA-1246).
> >
> > 6. Support injecting user-defined objects into ApplicationRunner.
> >
> > Prateek and I iterated on the ApplicationRunner API based on these requirements. To support #1, we can set up input streams at the runner level, which returns the MessageStream and allows graph building afterwards. The code looks like this:
> >
> > ApplicationRunner runner = ApplicationRunner.local();
> > runner.input(streamSpec)
> >       .map(...)
> >       .window(...);
> > runner.run();
> >
> > StreamSpec is the building block for setting up streams here. It can be set up in different ways:
> >
> > - Direct creation of a stream spec, like runner.input(new StreamSpec(id, system, stream))
> > - Loading by streamId from env or config, like runner.input(runner.env().getStreamSpec(id))
> > - A canned spec, which generates the StreamSpec with id, system and stream to minimize the configuration. For example, CollectionSpec.create(Arrays.asList(1, 2, 3, 4)) will auto-generate the system and stream in the spec.
> >
> > To support #2, we need to be able to set up StreamSpec-related objects and factories programmatically in env. Suppose we have the following before runner.input(...):
> >
> > runner.setup(env /* a writable interface of env */ -> {
> >   env.setStreamSpec(streamId, streamSpec);
> >   env.setSystem(systemName, systemFactory);
> > });
> >
> > runner.setup(...) also provides setup for stores and other runtime stuff needed for the execution. The setup should be serializable to config. For #6, I haven't figured out a good way to inject user-defined objects here yet.
> >
> > With this API, we should also be able to support #4. For a remote runner.run(), the operator user classes/lambdas in the StreamGraph need to be serialized. As of today, the existing option is to serialize to a stream, either the coordinator stream or the pipeline control stream, which has a per-message size limit. Do you see RPC as an option?
> >
> > For this version of the API, it seems we need neither the StreamApplication wrapper nor an exposed StreamGraph. Do you think we are on the right path?
> >
> > Thanks,
> > Xinyu
> >
> > On Thu, Apr 27, 2017 at 6:09 AM, Chris Pettitt <cpett...@linkedin.com.invalid> wrote:
> >
> >> That should have been:
> >>
> >> For #1, Beam doesn't have a hard requirement...
> >>
> >> On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt <cpett...@linkedin.com> wrote:
> >>
> >> > For #1, I doesn't have a hard requirement for any change from Samza. A very nice to have would be to allow the input systems to be set up at the same time as the rest of the StreamGraph.
> >> > An even nicer to have would be to do away with the callback-based approach and treat graph building as a library, a la Beam and Flink.
> >> >
> >> > For the moment I've worked around the two-pass requirement (once for config, once for StreamGraph) by introducing an IR layer between Beam and the Samza Fluent translation. The IR layer is useful independently of this problem because it makes it easier to switch between the Fluent and low-level APIs.
> >> >
> >> > For #4, if we had parity with StreamProcessor for lifecycle we'd be in great shape. One additional issue with the status call that I may not have mentioned is that it provides no way to get at the cause of failure. The StreamProcessor API does allow this via the callback.
> >> >
> >> > Re. #2 and #3, I'm a big fan of getting rid of the extra configuration indirection you currently have to jump through (this is also related to system consumer configuration from #1). It also makes it much easier to discover what the configurable parameters are if we provide some programmatic way to tweak them in the API, which can turn into config under the hood.
> >> >
> >> > On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
> >> >
> >> >> Let me take a shot at summarizing the requirements for ApplicationRunner we have discussed so far:
> >> >>
> >> >> - Support an environment for passing user-defined objects (potentially streams) into ApplicationRunner (*Beam*)
> >> >>
> >> >> - Improve ease of use for ApplicationRunner to avoid complex configurations such as zkCoordinator and zkCoordinationService (*Standalone*)
> >> >>
> >> >> - Clean up ApplicationRunner into a single interface (*Fluent*). We can have one or more implementations, but they are hidden from the users.
> >> >>
> >> >> - Separate StreamGraph from the environment so it can be serialized (*Beam, Yarn*)
> >> >>
> >> >> - Better life cycle management of the application, including start/stop/stats (*Standalone, Beam*)
> >> >>
> >> >> One way to address 2 and 3 is to provide pre-packaged runners using static factory methods, where the return type is the ApplicationRunner interface. So we can have:
> >> >>
> >> >> ApplicationRunner runner = ApplicationRunner.zk(); // or local(), remote(), test()
> >> >>
> >> >> Internally we will package the right configs and runtime environment with the runner. For example, ApplicationRunner.zk() will define all the configs needed for ZK coordination.
> >> >>
> >> >> To support 1 and 4, can we pass a lambda function to the runner and then run the stream graph? Like the following:
> >> >>
> >> >> ApplicationRunner.zk().env(config -> environment).run(streamGraph);
> >> >>
> >> >> Then we need a way to pass the environment into the StreamGraph. This can be done either by adding an extra parameter to each operator, or by having a getEnv() function in MessageStream, which seems pretty hacky.
> >> >>
> >> >> What do you think?
> >> >>
> >> >> Thanks,
> >> >> Xinyu
> >> >>
> >> >> On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari <pmaheshw...@linkedin.com.invalid> wrote:
> >> >>
> >> >> > Thanks for putting this together, Yi!
> >> >> >
> >> >> > I agree with Jake; it does seem like there are a few too many moving parts here. That said, the problem being solved is pretty broad, so let me try to summarize my current understanding of the requirements. Please correct me if I'm wrong or missing something.
> >> >> >
> >> >> > ApplicationRunner and JobRunner first, ignoring the test environment for the moment.
> >> >> >
> >> >> > ApplicationRunner:
> >> >> > 1. Create execution plan: same in Standalone and Yarn.
> >> >> > 2. Create intermediate streams: same logic but different leader election (ZK-based or pre-configured in standalone, AM in Yarn).
> >> >> > 3. Run jobs: in the JVM in standalone; submitted to the cluster in Yarn.
> >> >> >
> >> >> > JobRunner:
> >> >> > 1. Run the StreamProcessors: same process in Standalone and Test; remote host in Yarn.
> >> >> >
> >> >> > To get a single ApplicationRunner implementation, as Jake suggested, we need to make leader election and the JobRunner implementation pluggable. There's still the question of whether the ApplicationRunner#run API should be blocking or non-blocking. It has to be non-blocking in YARN. We want it to be blocking in standalone, but it seems the main reason is ease of use when launched from main(). I'd prefer making it consistently non-blocking instead, especially since in embedded standalone mode (where the processor is running in another container) a blocking API would not be user-friendly either. If not, we can add both run and runBlocking.
> >> >> >
> >> >> > Coming to RuntimeEnvironment, which is the least clear to me so far:
> >> >> > 1. I don't think RuntimeEnvironment should be responsible for providing StreamSpecs for streamIds; they can be obtained with a config/util class. The StreamProcessor should only know about logical streamIds, and the streamId <-> actual stream mapping should happen within the SystemProducer/Consumer/Admins provided by the RuntimeEnvironment.
> >> >> > 2. There are also other components that the user might be interested in providing implementations of in embedded Standalone mode (i.e., not just in tests); MetricsRegistry and JMXServer come to mind.
> >> >> > 3. Most importantly, it's not clear to me who creates and manages the RuntimeEnvironment. It seems like it should be the ApplicationRunner or the user because of (2) above, and because StreamManager also needs access to SystemAdmins for creating intermediate streams, which users might want to mock. But it also needs to be passed down to the StreamProcessor; how would this work on Yarn?
> >> >> >
> >> >> > I think we should figure out how to integrate RuntimeEnvironment with ApplicationRunner before we can make a call on one vs. multiple ApplicationRunner implementations. If we do keep LocalApplicationRunner and RemoteApplicationRunner (and TestApplicationRunner) separate, I agree with Jake that we should remove the JobRunners and roll them up into the respective ApplicationRunners.
> >> >> >
> >> >> > - Prateek
> >> >> >
> >> >> > On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes <jacob.m...@gmail.com> wrote:
> >> >> >
> >> >> > > Thanks for the SEP!
> >> >> > >
> >> >> > > +1 on introducing these new components
> >> >> > > -1 on the current definition of their roles (see Design feedback below)
> >> >> > >
> >> >> > > *Design*
> >> >> > >
> >> >> > > - If LocalJobRunner and RemoteJobRunner handle the different methods of launching a Job, what additional value do the different types of ApplicationRunner and RuntimeEnvironment provide? It seems like a red flag that all 3 would need to change from environment to environment. It indicates that they don't have proper modularity.
> >> >> > >   The call-sequence figures support this; LocalApplicationRunner and RemoteApplicationRunner make the same calls, and the diagram only varies after jobRunner.start().
> >> >> > > - As far as I can tell, the only difference between the Local and Remote ApplicationRunner is that one is blocking and the other is non-blocking. If that's all they're for, then either the names should be changed to reflect this, or they should be combined into one ApplicationRunner that exposes separate methods for run() and runBlocking().
> >> >> > > - There isn't much detail on why the main() methods for Local/Remote have such different implementations, how they receive the Application (direct vs. config), and concretely how the deployment scripts, if any, should interact with them.
> >> >> > >
> >> >> > > *Style*
> >> >> > >
> >> >> > > - nit: None of the 11 uses of the word "actual" in the doc are *actually* needed. :-)
> >> >> > > - nit: Colors of the runtime blocks in the diagrams are unconventional and a little distracting. Reminds me of nai won bao. Now I'm hungry. :-)
> >> >> > > - Prefer the name "ExecutionEnvironment" over "RuntimeEnvironment". The term "execution environment" is used
> >> >> > > - The code comparisons for the ApplicationRunners are not apples-to-apples. The local runner example is an application that USES the local runner. The remote runner example is just the runner code itself. So, it's not readily apparent that we're comparing the main() methods and not the application itself.
> >> >> > >
> >> >> > > On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan <nickpa...@gmail.com> wrote:
> >> >> > >
> >> >> > > > Made some updates to clarify the role and functions of RuntimeEnvironment in SEP-2.
> >> >> > > >
> >> >> > > > On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan <nickpa...@gmail.com> wrote:
> >> >> > > >
> >> >> > > > > Hi, everyone,
> >> >> > > > >
> >> >> > > > > In light of new features such as the fluent API and standalone, which introduce new deployment / application launch models in Samza, I created a new SEP-2 to address the new use cases. SEP-2 link: https://cwiki.apache.org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+Design
> >> >> > > > >
> >> >> > > > > Please take a look and give feedback!
> >> >> > > > >
> >> >> > > > > Thanks!
> >> >> > > > >
> >> >> > > > > -Yi
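Three points from the thread above can be combined into one shape: Prateek's idea of a single ApplicationRunner with pluggable leader election and job launching, the run()/runBlocking() pair that Jake and Prateek mention, and Chris's point that a status call should expose the cause of failure. The Java sketch below is a hypothetical illustration of that combination, not Samza's implementation. `SingleRunnerSketch`, `AppRunner`, `LeaderElector`, `JobLauncher`, `of(...)`, and `lastRunWasLeader` are invented names, and `local()` only mirrors the `ApplicationRunner.local()` factory proposed in the thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch: one runner class whose deployment differences live in pluggable parts.
// None of these types are Samza APIs.
class SingleRunnerSketch {

  // Pluggable: decides whether this instance is the leader (e.g. ZK-based, pre-configured, or AM).
  interface LeaderElector { boolean electLeader(); }

  // Pluggable: launches the jobs of a plan (in-JVM or submitted to a cluster).
  interface JobLauncher { CompletableFuture<Void> launch(String plan); }

  static final class AppRunner {
    private final LeaderElector elector;
    private final JobLauncher launcher;
    volatile boolean lastRunWasLeader; // exposed only so the sketch is observable

    private AppRunner(LeaderElector elector, JobLauncher launcher) {
      this.elector = elector;
      this.launcher = launcher;
    }

    // Stand-in for ApplicationRunner.local(): always leader, runs the work asynchronously in this JVM.
    static AppRunner local() {
      return new AppRunner(
          () -> true,
          plan -> CompletableFuture.runAsync(() -> { /* run StreamProcessors in-process */ }));
    }

    static AppRunner of(LeaderElector elector, JobLauncher launcher) {
      return new AppRunner(elector, launcher);
    }

    // Non-blocking: returns immediately; the future completes exceptionally with the failure cause.
    CompletableFuture<Void> run(String plan) {
      // A real runner would have the leader create intermediate streams here;
      // this sketch only records the election result.
      lastRunWasLeader = elector.electLeader();
      return launcher.launch(plan);
    }

    // Blocking convenience for main(): waits, and rethrows the original unchecked cause if any.
    void runBlocking(String plan) {
      try {
        run(plan).join();
      } catch (CompletionException e) {
        Throwable cause = e.getCause();
        if (cause instanceof RuntimeException) {
          throw (RuntimeException) cause;
        }
        throw e; // checked causes stay wrapped in the CompletionException
      }
    }
  }
}
```

In this sketch, returning a CompletableFuture from run() means a failure keeps its original exception, which is the piece Chris notes the status call lacks. runBlocking() just joins that future, so a standalone main() stays simple, while a YARN-style launcher could complete its future as soon as submission is done.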