Thanks for summarizing and looping me back into the discussion Xinyu.
Apparently I've been missing some emails from this list at my work address
and some of my replies from personal email aren't being delivered either.

Agree that we should replace the callback-based graph building in
StreamApplication with a library-like model. The callback based approach
made sense for the low-level fluent API, but the library based approach is
much cleaner for a high-level API.

Agree that we should serialize the logical graph and user-defined functions
and distribute them to the containers for execution. This will also give us
an intermediate representation to convert beam/sql plans to. I'd prefer a
clear REST/RPC API b/w the leader and follower JobCoordinators for
exchanging configs/models/graphs instead of putting them in configs as
well.

Agree that we should try to make the ApplicationRunner lifecycle similar to
StreamProcessor's (async + callbacks). Xinyu and Jake have a better idea
about whether this can be reasonably done for both local and remote
execution.

Agree that we should separate the runtime environment creation from the
logical graph description. This will help with graph serialization, testing
and lifecycle management of user-provided objects. Also agree that
environment creation logic should be serializable.

There are several pluggable components (and their configs) in Samza, so a
clean and user-friendly API for environment creation would be my highest
priority. My feeling is that convenience APIs for #2 and #3 will be much
easier to create once we get the base environment API right.

Similarly, I'd also like to get a better sense of how the environment
(including systems) will be set-up by users and wired internally before
working on different ways of describing the inputs/specs.

Thanks,
Prateek

On Thu, Apr 27, 2017 at 9:14 PM, xinyu liu <xinyuliu...@gmail.com> wrote:

> btw, I will get to SAMZA-1246 as soon as possible.
>
> Thanks,
> Xinyu
>
> On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu <xinyuliu...@gmail.com> wrote:
>
> > Let me try to capture the updated requirements:
> >
> > 1. Set up input streams outside StreamGraph, and treat graph building as
> a
> > library (*Fluent, Beam*).
> >
> > 2. Improve ease of use for ApplicationRunner to avoid complex
> > configurations such as zkCoordinator, zkCoordinationService.
> (*Standalone*).
> > Provide some programmatic way to tweak them in the API.
> >
> > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We can
> > have one or more implementations but it's hidden from the users.
> >
> > 4. Separate StreamGraph from runtime environment so it can be serialized
> (*Beam,
> > Yarn*)
> >
> > 5. Better life cycle management of application, parity with
> > StreamProcessor (*Standalone, Beam*). Stats should include exception in
> > case of failure (tracked in SAMZA-1246).
> >
> > 6. Support injecting user-defined objects into ApplicationRunner.
> >
> > Prateek and I iterate on the ApplilcationRunner API based on these
> > requirements. To support #1, we can set up input streams on the runner
> > level, which returns the MessageStream and allows graph building
> > afterwards. The code looks like below:
> >
> >   ApplicationRunner runner = ApplicationRunner.local();
> >   runner.input(streamSpec)
> >             .map(..)
> >             .window(...)
> >   runner.run();
> >
> > StreamSpec is the building block for setting up streams here. It can be
> > set up in different ways:
> >
> >   - Direct creation of stream spec, like runner.input(new StreamSpec(id,
> > system, stream))
> >   - Load from streamId from env or config, like
> runner.input(runner.env().
> > getStreamSpec(id))
> >   - Canned Spec which generates the StreamSpec with id, system and stream
> > to minimize the configuration. For example, CollectionSpec.create(new
> > ArrayList[]{1,2,3,4}), which will auto generate the system and stream in
> > the spec.
> >
> > To support #2, we need to be able to set up StreamSpec-related objects
> and
> > factories programmatically in env. Suppose we have the following before
> > runner.input(...):
> >
> >   runner.setup(env /* a writable interface of env*/ -> {
> >     env.setStreamSpec(streamId, streamSpec);
> >     env.setSystem(systemName, systemFactory);
> >   })
> >
> > runner.setup(->) also provides setup for stores and other runtime stuff
> > needed for the execution. The setup should be able to serialized to
> config.
> > For #6, I haven't figured out a good way to inject user-defined objects
> > here yet.
> >
> > With this API, we should be able to also support #4. For remote
> > runner.run(), the operator user classes/lamdas in the StreamGraph need to
> > be serialized. As today, the existing option is to serialize to a stream,
> > either the coordinator stream or the pipeline control stream, which will
> > have the size limit per message. Do you see RPC as an option?
> >
> > For this version of API, seems we don't need the StreamApplication
> wrapper
> > as well as exposing the StreamGraph. Do you think we are on the right
> path?
> >
> > Thanks,
> > Xinyu
> >
> >
> > On Thu, Apr 27, 2017 at 6:09 AM, Chris Pettitt <
> > cpett...@linkedin.com.invalid> wrote:
> >
> >> That should have been:
> >>
> >> For #1, Beam doesn't have a hard requirement...
> >>
> >> On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt <cpett...@linkedin.com>
> >> wrote:
> >>
> >> > For #1, I doesn't have a hard requirement for any change from Samza. A
> >> > very nice to have would be to allow the input systems to be set up at
> >> the
> >> > same time as the rest of the StreamGraph. An even nicer to have would
> >> be to
> >> > do away with the callback based approach and treat graph building as a
> >> > library, a la Beam and Flink.
> >> >
> >> > For the moment I've worked around the two pass requirement (once for
> >> > config, once for StreamGraph) by introducing an IR layer between Beam
> >> and
> >> > the Samza Fluent translation. The IR layer is convenient independent
> of
> >> > this problem because it makes it easier to switch between the Fluent
> and
> >> > low-level APIs.
> >> >
> >> >
> >> > For #4, if we had parity with StreamProcessor for lifecycle we'd be in
> >> > great shape. One additional issue with the status call that I may not
> >> have
> >> > mentioned is that it provides you no way to get at the cause of
> failure.
> >> > The StreamProcessor API does allow this via the callback.
> >> >
> >> >
> >> > Re. #2 and #3, I'm a big fan of getting rid of the extra configuration
> >> > indirection you currently have to jump through (this is also related
> to
> >> > system consumer configuration from #1. It makes it much easier to
> >> discover
> >> > what the configurable parameters are too, if we provide some
> >> programmatic
> >> > way to tweak them in the API - which can turn into config under the
> >> hood.
> >> >
> >> > On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu <xinyuliu...@gmail.com>
> >> wrote:
> >> >
> >> >> Let me give a shot to summarize the requirements for
> ApplicationRunner
> >> we
> >> >> have discussed so far:
> >> >>
> >> >> - Support environment for passing in user-defined objects (streams
> >> >> potentially) into ApplicationRunner (*Beam*)
> >> >>
> >> >> - Improve ease of use for ApplicationRunner to avoid complex
> >> >> configurations
> >> >> such as zkCoordinator, zkCoordinationService. (*Standalone*)
> >> >>
> >> >> - Clean up ApplicationRunner into a single interface (*Fluent*). We
> can
> >> >> have one or more implementations but it's hidden from the users.
> >> >>
> >> >> - Separate StreamGraph from environment so it can be serializable
> >> (*Beam,
> >> >> Yarn*)
> >> >>
> >> >> - Better life cycle management of application, including
> >> >> start/stop/stats (*Standalone,
> >> >> Beam*)
> >> >>
> >> >>
> >> >> One way to address 2 and 3 is to provide pre-packaged runner using
> >> static
> >> >> factory methods, and the return type will be the ApplicationRunner
> >> >> interface. So we can have:
> >> >>
> >> >>   ApplicationRunner runner = ApplicationRunner.zk() /
> >> >> ApplicationRunner.local()
> >> >> / ApplicationRunner.remote() / ApplicationRunner.test().
> >> >>
> >> >> Internally we will package the right configs and run-time environment
> >> with
> >> >> the runner. For example, ApplicationRunner.zk() will define all the
> >> >> configs
> >> >> needed for zk coordination.
> >> >>
> >> >> To support 1 and 4, can we pass in a lambda function in the runner,
> and
> >> >> then we can run the stream graph? Like the following:
> >> >>
> >> >>   ApplicationRunner.zk().env(config ->
> environment).run(streamGraph);
> >> >>
> >> >> Then we need a way to pass the environment into the StreamGraph. This
> >> can
> >> >> be done by either adding an extra parameter to each operator, or
> have a
> >> >> getEnv() function in the MessageStream, which seems to be pretty
> hacky.
> >> >>
> >> >> What do you think?
> >> >>
> >> >> Thanks,
> >> >> Xinyu
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari <
> >> >> pmaheshw...@linkedin.com.invalid> wrote:
> >> >>
> >> >> > Thanks for putting this together Yi!
> >> >> >
> >> >> > I agree with Jake, it does seem like there are a few too many
> moving
> >> >> parts
> >> >> > here. That said, the problem being solved is pretty broad, so let
> me
> >> >> try to
> >> >> > summarize my current understanding of the requirements. Please
> >> correct
> >> >> me
> >> >> > if I'm wrong or missing something.
> >> >> >
> >> >> > ApplicationRunner and JobRunner first, ignoring test environment
> for
> >> the
> >> >> > moment.
> >> >> > ApplicationRunner:
> >> >> > 1. Create execution plan: Same in Standalone and Yarn
> >> >> > 2. Create intermediate streams: Same logic but different leader
> >> election
> >> >> > (ZK-based or pre-configured in standalone, AM in Yarn).
> >> >> > 3. Run jobs: In JVM in standalone. Submit to the cluster in Yarn.
> >> >> >
> >> >> > JobRunner:
> >> >> > 1. Run the StreamProcessors: Same process in Standalone & Test.
> >> Remote
> >> >> host
> >> >> > in Yarn.
> >> >> >
> >> >> > To get a single ApplicationRunner implementation, like Jake
> >> suggested,
> >> >> we
> >> >> > need to make leader election and JobRunner implementation
> pluggable.
> >> >> > There's still the question of whether ApplicationRunner#run API
> >> should
> >> >> be
> >> >> > blocking or non-blocking. It has to be non-blocking in YARN. We
> want
> >> it
> >> >> to
> >> >> > be blocking in standalone, but seems like the main reason is ease
> of
> >> use
> >> >> > when launched from main(). I'd prefer making it consitently
> >> non-blocking
> >> >> > instead, esp. since in embedded standalone mode (where the
> processor
> >> is
> >> >> > running in another container) a blocking API would not be
> >> user-friendly
> >> >> > either. If not, we can add both run and runBlocking.
> >> >> >
> >> >> > Coming to RuntimeEnvironment, which is the least clear to me so
> far:
> >> >> > 1. I don't think RuntimeEnvironment should be responsible for
> >> providing
> >> >> > StreamSpecs for streamIds - they can be obtained with a config/util
> >> >> class.
> >> >> > The StreamProcessor should only know about logical streamIds and
> the
> >> >> > streamId <-> actual stream mapping should happen within the
> >> >> > SystemProducer/Consumer/Admins provided by the RuntimeEnvironment.
> >> >> > 2. There's also other components that the user might be interested
> in
> >> >> > providing implementations of in embedded Standalone mode (i.e., not
> >> >> just in
> >> >> > tests) - MetricsRegistry and JMXServer come to mind.
> >> >> > 3. Most importantly, it's not clear to me who creates and manages
> the
> >> >> > RuntimeEnvironment. It seems like it should be the
> ApplicationRunner
> >> or
> >> >> the
> >> >> > user because of (2) above and because StreamManager also needs
> >> access to
> >> >> > SystemAdmins for creating intermediate streams which users might
> >> want to
> >> >> > mock. But it also needs to be passed down to the StreamProcessor -
> >> how
> >> >> > would this work on Yarn?
> >> >> >
> >> >> > I think we should figure out how to integrate RuntimeEnvironment
> with
> >> >> > ApplicationRunner before we can make a call on one vs. multiple
> >> >> > ApplicationRunner implementations. If we do keep
> >> LocalApplicationRunner
> >> >> and
> >> >> > RemoteApplication (and TestApplicationRunner) separate, agree with
> >> Jake
> >> >> > that we should remove the JobRunners and roll them up into the
> >> >> respective
> >> >> > ApplicationRunners.
> >> >> >
> >> >> > - Prateek
> >> >> >
> >> >> > On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes <jacob.m...@gmail.com
> >
> >> >> wrote:
> >> >> >
> >> >> > > Thanks for the SEP!
> >> >> > >
> >> >> > > +1 on introducing these new components
> >> >> > > -1 on the current definition of their roles (see Design feedback
> >> >> below)
> >> >> > >
> >> >> > > *Design*
> >> >> > >
> >> >> > >    - If LocalJobRunner and RemoteJobRunner handle the different
> >> >> methods
> >> >> > of
> >> >> > >    launching a Job, what additional value do the different types
> of
> >> >> > >    ApplicationRunner and RuntimeEnvironment provide? It seems
> like
> >> a
> >> >> red
> >> >> > > flag
> >> >> > >    that all 3 would need to change from environment to
> >> environment. It
> >> >> > >    indicates that they don't have proper modularity. The
> >> >> > > call-sequence-figures
> >> >> > >    support this; LocalApplicationRunner and
> RemoteApplicationRunner
> >> >> make
> >> >> > > the
> >> >> > >    same calls and the diagram only varies after jobRunner.start()
> >> >> > >    - As far as I can tell, the only difference between Local and
> >> >> Remote
> >> >> > >    ApplicationRunner is that one is blocking and the other is
> >> >> > > non-blocking. If
> >> >> > >    that's all they're for then either the names should be changed
> >> to
> >> >> > > reflect
> >> >> > >    this, or they should be combined into one ApplicationRunner
> and
> >> >> just
> >> >> > > expose
> >> >> > >    separate methods for run() and runBlocking()
> >> >> > >    - There isn't much detail on why the main() methods for
> >> >> Local/Remote
> >> >> > >    have such different implementations, how they receive the
> >> >> Application
> >> >> > >    (direct vs config), and concretely how the deployment scripts,
> >> if
> >> >> any,
> >> >> > >    should interact with them.
> >> >> > >
> >> >> > >
> >> >> > > *Style*
> >> >> > >
> >> >> > >    - nit: None of the 11 uses of the word "actual" in the doc are
> >> >> > > *actually*
> >> >> > >    needed. :-)
> >> >> > >    - nit: Colors of the runtime blocks in the diagrams are
> >> >> unconventional
> >> >> > >    and a little distracting. Reminds me of nai won bao. Now I'm
> >> >> hungry.
> >> >> > :-)
> >> >> > >    - Prefer the name "ExecutionEnvironment" over
> >> "RuntimeEnvironment".
> >> >> > The
> >> >> > >    term "execution environment" is used
> >> >> > >    - The code comparisons for the ApplicationRunners are not
> >> >> > apples-apples.
> >> >> > >    The local runner example is an application that USES the local
> >> >> runner.
> >> >> > > The
> >> >> > >    remote runner example is the just the runner code itself. So,
> >> it's
> >> >> not
> >> >> > >    readily apparent that we're comparing the main() methods and
> not
> >> >> the
> >> >> > >    application itself.
> >> >> > >
> >> >> > >
> >> >> > > On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan <nickpa...@gmail.com>
> >> wrote:
> >> >> > >
> >> >> > > > Made some updates to clarify the role and functions of
> >> >> > RuntimeEnvironment
> >> >> > > > in SEP-2.
> >> >> > > >
> >> >> > > > On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan <nickpa...@gmail.com>
> >> >> wrote:
> >> >> > > >
> >> >> > > > > Hi, everyone,
> >> >> > > > >
> >> >> > > > > In light of new features such as fluent API and standalone
> that
> >> >> > > introduce
> >> >> > > > > new deployment / application launch models in Samza, I
> created
> >> a
> >> >> new
> >> >> > > > SEP-2
> >> >> > > > > to address the new use cases. SEP-2 link:
> https://cwiki.apache
> >> .
> >> >> > > > > org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+
> Design
> >> >> > > > >
> >> >> > > > > Please take a look and give feedbacks!
> >> >> > > > >
> >> >> > > > > Thanks!
> >> >> > > > >
> >> >> > > > > -Yi
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> >> >
> >> >
> >>
> >
> >
>

Reply via email to