Your proposal for #1 looks good.

I'm not quite how to reconcile the proposals for #1 and #2. In #1 you add
the stream spec straight onto the runner while in #2 you do it in a
callback. If it is either-or, #1 looks a lot better for my purposes.

For #4 what mechanism are you using to distribute the JARs? Can you use the
same mechanism to distribute the serialized graph?

On Fri, Apr 28, 2017 at 12:14 AM, xinyu liu <xinyuliu...@gmail.com> wrote:

> btw, I will get to SAMZA-1246 as soon as possible.
>
> Thanks,
> Xinyu
>
> On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
>
> > Let me try to capture the updated requirements:
> >
> > 1. Set up input streams outside StreamGraph, and treat graph building as
> a
> > library (*Fluent, Beam*).
> >
> > 2. Improve ease of use for ApplicationRunner to avoid complex
> > configurations such as zkCoordinator, zkCoordinationService.
> (*Standalone*).
> > Provide some programmatic way to tweak them in the API.
> >
> > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We can
> > have one or more implementations but it's hidden from the users.
> >
> > 4. Separate StreamGraph from runtime environment so it can be serialized
> (*Beam,
> > Yarn*)
> >
> > 5. Better life cycle management of application, parity with
> > StreamProcessor (*Standalone, Beam*). Stats should include exception in
> > case of failure (tracked in SAMZA-1246).
> >
> > 6. Support injecting user-defined objects into ApplicationRunner.
> >
> > Prateek and I iterate on the ApplilcationRunner API based on these
> > requirements. To support #1, we can set up input streams on the runner
> > level, which returns the MessageStream and allows graph building
> > afterwards. The code looks like below:
> >
> >   ApplicationRunner runner = ApplicationRunner.local();
> >   runner.input(streamSpec)
> >             .map(..)
> >             .window(...)
> >   runner.run();
> >
> > StreamSpec is the building block for setting up streams here. It can be
> > set up in different ways:
> >
> >   - Direct creation of stream spec, like runner.input(new StreamSpec(id,
> > system, stream))
> >   - Load from streamId from env or config, like
> runner.input(runner.env().
> > getStreamSpec(id))
> >   - Canned Spec which generates the StreamSpec with id, system and stream
> > to minimize the configuration. For example, CollectionSpec.create(new
> > ArrayList[]{1,2,3,4}), which will auto generate the system and stream in
> > the spec.
> >
> > To support #2, we need to be able to set up StreamSpec-related objects
> and
> > factories programmatically in env. Suppose we have the following before
> > runner.input(...):
> >
> >   runner.setup(env /* a writable interface of env*/ -> {
> >     env.setStreamSpec(streamId, streamSpec);
> >     env.setSystem(systemName, systemFactory);
> >   })
> >
> > runner.setup(->) also provides setup for stores and other runtime stuff
> > needed for the execution. The setup should be able to serialized to
> config.
> > For #6, I haven't figured out a good way to inject user-defined objects
> > here yet.
> >
> > With this API, we should be able to also support #4. For remote
> > runner.run(), the operator user classes/lamdas in the StreamGraph need to
> > be serialized. As today, the existing option is to serialize to a stream,
> > either the coordinator stream or the pipeline control stream, which will
> > have the size limit per message. Do you see RPC as an option?
> >
> > For this version of API, seems we don't need the StreamApplication
> wrapper
> > as well as exposing the StreamGraph. Do you think we are on the right
> path?
> >
> > Thanks,
> > Xinyu
> >
> >
> > On Thu, Apr 27, 2017 at 6:09 AM, Chris Pettitt <
> > cpett...@linkedin.com.invalid> wrote:
> >
> >> That should have been:
> >>
> >> For #1, Beam doesn't have a hard requirement...
> >>
> >> On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt <cpett...@linkedin.com>
> >> wrote:
> >>
> >> > For #1, I doesn't have a hard requirement for any change from Samza. A
> >> > very nice to have would be to allow the input systems to be set up at
> >> the
> >> > same time as the rest of the StreamGraph. An even nicer to have would
> >> be to
> >> > do away with the callback based approach and treat graph building as a
> >> > library, a la Beam and Flink.
> >> >
> >> > For the moment I've worked around the two pass requirement (once for
> >> > config, once for StreamGraph) by introducing an IR layer between Beam
> >> and
> >> > the Samza Fluent translation. The IR layer is convenient independent
> of
> >> > this problem because it makes it easier to switch between the Fluent
> and
> >> > low-level APIs.
> >> >
> >> >
> >> > For #4, if we had parity with StreamProcessor for lifecycle we'd be in
> >> > great shape. One additional issue with the status call that I may not
> >> have
> >> > mentioned is that it provides you no way to get at the cause of
> failure.
> >> > The StreamProcessor API does allow this via the callback.
> >> >
> >> >
> >> > Re. #2 and #3, I'm a big fan of getting rid of the extra configuration
> >> > indirection you currently have to jump through (this is also related
> to
> >> > system consumer configuration from #1. It makes it much easier to
> >> discover
> >> > what the configurable parameters are too, if we provide some
> >> programmatic
> >> > way to tweak them in the API - which can turn into config under the
> >> hood.
> >> >
> >> > On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu <xinyuliu...@gmail.com>
> >> wrote:
> >> >
> >> >> Let me give a shot to summarize the requirements for
> ApplicationRunner
> >> we
> >> >> have discussed so far:
> >> >>
> >> >> - Support environment for passing in user-defined objects (streams
> >> >> potentially) into ApplicationRunner (*Beam*)
> >> >>
> >> >> - Improve ease of use for ApplicationRunner to avoid complex
> >> >> configurations
> >> >> such as zkCoordinator, zkCoordinationService. (*Standalone*)
> >> >>
> >> >> - Clean up ApplicationRunner into a single interface (*Fluent*). We
> can
> >> >> have one or more implementations but it's hidden from the users.
> >> >>
> >> >> - Separate StreamGraph from environment so it can be serializable
> >> (*Beam,
> >> >> Yarn*)
> >> >>
> >> >> - Better life cycle management of application, including
> >> >> start/stop/stats (*Standalone,
> >> >> Beam*)
> >> >>
> >> >>
> >> >> One way to address 2 and 3 is to provide pre-packaged runner using
> >> static
> >> >> factory methods, and the return type will be the ApplicationRunner
> >> >> interface. So we can have:
> >> >>
> >> >>   ApplicationRunner runner = ApplicationRunner.zk() /
> >> >> ApplicationRunner.local()
> >> >> / ApplicationRunner.remote() / ApplicationRunner.test().
> >> >>
> >> >> Internally we will package the right configs and run-time environment
> >> with
> >> >> the runner. For example, ApplicationRunner.zk() will define all the
> >> >> configs
> >> >> needed for zk coordination.
> >> >>
> >> >> To support 1 and 4, can we pass in a lambda function in the runner,
> and
> >> >> then we can run the stream graph? Like the following:
> >> >>
> >> >>   ApplicationRunner.zk().env(config ->
> environment).run(streamGraph);
> >> >>
> >> >> Then we need a way to pass the environment into the StreamGraph. This
> >> can
> >> >> be done by either adding an extra parameter to each operator, or
> have a
> >> >> getEnv() function in the MessageStream, which seems to be pretty
> hacky.
> >> >>
> >> >> What do you think?
> >> >>
> >> >> Thanks,
> >> >> Xinyu
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari <
> >> >> pmaheshw...@linkedin.com.invalid> wrote:
> >> >>
> >> >> > Thanks for putting this together Yi!
> >> >> >
> >> >> > I agree with Jake, it does seem like there are a few too many
> moving
> >> >> parts
> >> >> > here. That said, the problem being solved is pretty broad, so let
> me
> >> >> try to
> >> >> > summarize my current understanding of the requirements. Please
> >> correct
> >> >> me
> >> >> > if I'm wrong or missing something.
> >> >> >
> >> >> > ApplicationRunner and JobRunner first, ignoring test environment
> for
> >> the
> >> >> > moment.
> >> >> > ApplicationRunner:
> >> >> > 1. Create execution plan: Same in Standalone and Yarn
> >> >> > 2. Create intermediate streams: Same logic but different leader
> >> election
> >> >> > (ZK-based or pre-configured in standalone, AM in Yarn).
> >> >> > 3. Run jobs: In JVM in standalone. Submit to the cluster in Yarn.
> >> >> >
> >> >> > JobRunner:
> >> >> > 1. Run the StreamProcessors: Same process in Standalone & Test.
> >> Remote
> >> >> host
> >> >> > in Yarn.
> >> >> >
> >> >> > To get a single ApplicationRunner implementation, like Jake
> >> suggested,
> >> >> we
> >> >> > need to make leader election and JobRunner implementation
> pluggable.
> >> >> > There's still the question of whether ApplicationRunner#run API
> >> should
> >> >> be
> >> >> > blocking or non-blocking. It has to be non-blocking in YARN. We
> want
> >> it
> >> >> to
> >> >> > be blocking in standalone, but seems like the main reason is ease
> of
> >> use
> >> >> > when launched from main(). I'd prefer making it consitently
> >> non-blocking
> >> >> > instead, esp. since in embedded standalone mode (where the
> processor
> >> is
> >> >> > running in another container) a blocking API would not be
> >> user-friendly
> >> >> > either. If not, we can add both run and runBlocking.
> >> >> >
> >> >> > Coming to RuntimeEnvironment, which is the least clear to me so
> far:
> >> >> > 1. I don't think RuntimeEnvironment should be responsible for
> >> providing
> >> >> > StreamSpecs for streamIds - they can be obtained with a config/util
> >> >> class.
> >> >> > The StreamProcessor should only know about logical streamIds and
> the
> >> >> > streamId <-> actual stream mapping should happen within the
> >> >> > SystemProducer/Consumer/Admins provided by the RuntimeEnvironment.
> >> >> > 2. There's also other components that the user might be interested
> in
> >> >> > providing implementations of in embedded Standalone mode (i.e., not
> >> >> just in
> >> >> > tests) - MetricsRegistry and JMXServer come to mind.
> >> >> > 3. Most importantly, it's not clear to me who creates and manages
> the
> >> >> > RuntimeEnvironment. It seems like it should be the
> ApplicationRunner
> >> or
> >> >> the
> >> >> > user because of (2) above and because StreamManager also needs
> >> access to
> >> >> > SystemAdmins for creating intermediate streams which users might
> >> want to
> >> >> > mock. But it also needs to be passed down to the StreamProcessor -
> >> how
> >> >> > would this work on Yarn?
> >> >> >
> >> >> > I think we should figure out how to integrate RuntimeEnvironment
> with
> >> >> > ApplicationRunner before we can make a call on one vs. multiple
> >> >> > ApplicationRunner implementations. If we do keep
> >> LocalApplicationRunner
> >> >> and
> >> >> > RemoteApplication (and TestApplicationRunner) separate, agree with
> >> Jake
> >> >> > that we should remove the JobRunners and roll them up into the
> >> >> respective
> >> >> > ApplicationRunners.
> >> >> >
> >> >> > - Prateek
> >> >> >
> >> >> > On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes <jacob.m...@gmail.com
> >
> >> >> wrote:
> >> >> >
> >> >> > > Thanks for the SEP!
> >> >> > >
> >> >> > > +1 on introducing these new components
> >> >> > > -1 on the current definition of their roles (see Design feedback
> >> >> below)
> >> >> > >
> >> >> > > *Design*
> >> >> > >
> >> >> > >    - If LocalJobRunner and RemoteJobRunner handle the different
> >> >> methods
> >> >> > of
> >> >> > >    launching a Job, what additional value do the different types
> of
> >> >> > >    ApplicationRunner and RuntimeEnvironment provide? It seems
> like
> >> a
> >> >> red
> >> >> > > flag
> >> >> > >    that all 3 would need to change from environment to
> >> environment. It
> >> >> > >    indicates that they don't have proper modularity. The
> >> >> > > call-sequence-figures
> >> >> > >    support this; LocalApplicationRunner and
> RemoteApplicationRunner
> >> >> make
> >> >> > > the
> >> >> > >    same calls and the diagram only varies after jobRunner.start()
> >> >> > >    - As far as I can tell, the only difference between Local and
> >> >> Remote
> >> >> > >    ApplicationRunner is that one is blocking and the other is
> >> >> > > non-blocking. If
> >> >> > >    that's all they're for then either the names should be changed
> >> to
> >> >> > > reflect
> >> >> > >    this, or they should be combined into one ApplicationRunner
> and
> >> >> just
> >> >> > > expose
> >> >> > >    separate methods for run() and runBlocking()
> >> >> > >    - There isn't much detail on why the main() methods for
> >> >> Local/Remote
> >> >> > >    have such different implementations, how they receive the
> >> >> Application
> >> >> > >    (direct vs config), and concretely how the deployment scripts,
> >> if
> >> >> any,
> >> >> > >    should interact with them.
> >> >> > >
> >> >> > >
> >> >> > > *Style*
> >> >> > >
> >> >> > >    - nit: None of the 11 uses of the word "actual" in the doc are
> >> >> > > *actually*
> >> >> > >    needed. :-)
> >> >> > >    - nit: Colors of the runtime blocks in the diagrams are
> >> >> unconventional
> >> >> > >    and a little distracting. Reminds me of nai won bao. Now I'm
> >> >> hungry.
> >> >> > :-)
> >> >> > >    - Prefer the name "ExecutionEnvironment" over
> >> "RuntimeEnvironment".
> >> >> > The
> >> >> > >    term "execution environment" is used
> >> >> > >    - The code comparisons for the ApplicationRunners are not
> >> >> > apples-apples.
> >> >> > >    The local runner example is an application that USES the local
> >> >> runner.
> >> >> > > The
> >> >> > >    remote runner example is the just the runner code itself. So,
> >> it's
> >> >> not
> >> >> > >    readily apparent that we're comparing the main() methods and
> not
> >> >> the
> >> >> > >    application itself.
> >> >> > >
> >> >> > >
> >> >> > > On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan <nickpa...@gmail.com>
> >> wrote:
> >> >> > >
> >> >> > > > Made some updates to clarify the role and functions of
> >> >> > RuntimeEnvironment
> >> >> > > > in SEP-2.
> >> >> > > >
> >> >> > > > On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan <nickpa...@gmail.com>
> >> >> wrote:
> >> >> > > >
> >> >> > > > > Hi, everyone,
> >> >> > > > >
> >> >> > > > > In light of new features such as fluent API and standalone
> that
> >> >> > > introduce
> >> >> > > > > new deployment / application launch models in Samza, I
> created
> >> a
> >> >> new
> >> >> > > > SEP-2
> >> >> > > > > to address the new use cases. SEP-2 link:
> https://cwiki.apache
> >> .
> >> >> > > > > org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+
> Design
> >> >> > > > >
> >> >> > > > > Please take a look and give feedbacks!
> >> >> > > > >
> >> >> > > > > Thanks!
> >> >> > > > >
> >> >> > > > > -Yi
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> >> >
> >> >
> >>
> >
> >
>

Reply via email to