btw, I will get to SAMZA-1246 as soon as possible.

Thanks,
Xinyu

On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu <xinyuliu...@gmail.com> wrote:

> Let me try to capture the updated requirements:
>
> 1. Set up input streams outside StreamGraph, and treat graph building as a
> library (*Fluent, Beam*).
>
> 2. Improve ease of use for ApplicationRunner to avoid complex
> configurations such as zkCoordinator, zkCoordinationService. (*Standalone*).
> Provide some programmatic way to tweak them in the API.
>
> 3. Clean up ApplicationRunner into a single interface (*Fluent*). We can
> have one or more implementations but it's hidden from the users.
>
> 4. Separate StreamGraph from runtime environment so it can be serialized 
> (*Beam,
> Yarn*)
>
> 5. Better life cycle management of application, parity with
> StreamProcessor (*Standalone, Beam*). Stats should include exception in
> case of failure (tracked in SAMZA-1246).
>
> 6. Support injecting user-defined objects into ApplicationRunner.
>
> Prateek and I iterate on the ApplilcationRunner API based on these
> requirements. To support #1, we can set up input streams on the runner
> level, which returns the MessageStream and allows graph building
> afterwards. The code looks like below:
>
>   ApplicationRunner runner = ApplicationRunner.local();
>   runner.input(streamSpec)
>             .map(..)
>             .window(...)
>   runner.run();
>
> StreamSpec is the building block for setting up streams here. It can be
> set up in different ways:
>
>   - Direct creation of stream spec, like runner.input(new StreamSpec(id,
> system, stream))
>   - Load from streamId from env or config, like runner.input(runner.env().
> getStreamSpec(id))
>   - Canned Spec which generates the StreamSpec with id, system and stream
> to minimize the configuration. For example, CollectionSpec.create(new
> ArrayList[]{1,2,3,4}), which will auto generate the system and stream in
> the spec.
>
> To support #2, we need to be able to set up StreamSpec-related objects and
> factories programmatically in env. Suppose we have the following before
> runner.input(...):
>
>   runner.setup(env /* a writable interface of env*/ -> {
>     env.setStreamSpec(streamId, streamSpec);
>     env.setSystem(systemName, systemFactory);
>   })
>
> runner.setup(->) also provides setup for stores and other runtime stuff
> needed for the execution. The setup should be able to serialized to config.
> For #6, I haven't figured out a good way to inject user-defined objects
> here yet.
>
> With this API, we should be able to also support #4. For remote
> runner.run(), the operator user classes/lamdas in the StreamGraph need to
> be serialized. As today, the existing option is to serialize to a stream,
> either the coordinator stream or the pipeline control stream, which will
> have the size limit per message. Do you see RPC as an option?
>
> For this version of API, seems we don't need the StreamApplication wrapper
> as well as exposing the StreamGraph. Do you think we are on the right path?
>
> Thanks,
> Xinyu
>
>
> On Thu, Apr 27, 2017 at 6:09 AM, Chris Pettitt <
> cpett...@linkedin.com.invalid> wrote:
>
>> That should have been:
>>
>> For #1, Beam doesn't have a hard requirement...
>>
>> On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt <cpett...@linkedin.com>
>> wrote:
>>
>> > For #1, I doesn't have a hard requirement for any change from Samza. A
>> > very nice to have would be to allow the input systems to be set up at
>> the
>> > same time as the rest of the StreamGraph. An even nicer to have would
>> be to
>> > do away with the callback based approach and treat graph building as a
>> > library, a la Beam and Flink.
>> >
>> > For the moment I've worked around the two pass requirement (once for
>> > config, once for StreamGraph) by introducing an IR layer between Beam
>> and
>> > the Samza Fluent translation. The IR layer is convenient independent of
>> > this problem because it makes it easier to switch between the Fluent and
>> > low-level APIs.
>> >
>> >
>> > For #4, if we had parity with StreamProcessor for lifecycle we'd be in
>> > great shape. One additional issue with the status call that I may not
>> have
>> > mentioned is that it provides you no way to get at the cause of failure.
>> > The StreamProcessor API does allow this via the callback.
>> >
>> >
>> > Re. #2 and #3, I'm a big fan of getting rid of the extra configuration
>> > indirection you currently have to jump through (this is also related to
>> > system consumer configuration from #1. It makes it much easier to
>> discover
>> > what the configurable parameters are too, if we provide some
>> programmatic
>> > way to tweak them in the API - which can turn into config under the
>> hood.
>> >
>> > On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu <xinyuliu...@gmail.com>
>> wrote:
>> >
>> >> Let me give a shot to summarize the requirements for ApplicationRunner
>> we
>> >> have discussed so far:
>> >>
>> >> - Support environment for passing in user-defined objects (streams
>> >> potentially) into ApplicationRunner (*Beam*)
>> >>
>> >> - Improve ease of use for ApplicationRunner to avoid complex
>> >> configurations
>> >> such as zkCoordinator, zkCoordinationService. (*Standalone*)
>> >>
>> >> - Clean up ApplicationRunner into a single interface (*Fluent*). We can
>> >> have one or more implementations but it's hidden from the users.
>> >>
>> >> - Separate StreamGraph from environment so it can be serializable
>> (*Beam,
>> >> Yarn*)
>> >>
>> >> - Better life cycle management of application, including
>> >> start/stop/stats (*Standalone,
>> >> Beam*)
>> >>
>> >>
>> >> One way to address 2 and 3 is to provide pre-packaged runner using
>> static
>> >> factory methods, and the return type will be the ApplicationRunner
>> >> interface. So we can have:
>> >>
>> >>   ApplicationRunner runner = ApplicationRunner.zk() /
>> >> ApplicationRunner.local()
>> >> / ApplicationRunner.remote() / ApplicationRunner.test().
>> >>
>> >> Internally we will package the right configs and run-time environment
>> with
>> >> the runner. For example, ApplicationRunner.zk() will define all the
>> >> configs
>> >> needed for zk coordination.
>> >>
>> >> To support 1 and 4, can we pass in a lambda function in the runner, and
>> >> then we can run the stream graph? Like the following:
>> >>
>> >>   ApplicationRunner.zk().env(config -> environment).run(streamGraph);
>> >>
>> >> Then we need a way to pass the environment into the StreamGraph. This
>> can
>> >> be done by either adding an extra parameter to each operator, or have a
>> >> getEnv() function in the MessageStream, which seems to be pretty hacky.
>> >>
>> >> What do you think?
>> >>
>> >> Thanks,
>> >> Xinyu
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari <
>> >> pmaheshw...@linkedin.com.invalid> wrote:
>> >>
>> >> > Thanks for putting this together Yi!
>> >> >
>> >> > I agree with Jake, it does seem like there are a few too many moving
>> >> parts
>> >> > here. That said, the problem being solved is pretty broad, so let me
>> >> try to
>> >> > summarize my current understanding of the requirements. Please
>> correct
>> >> me
>> >> > if I'm wrong or missing something.
>> >> >
>> >> > ApplicationRunner and JobRunner first, ignoring test environment for
>> the
>> >> > moment.
>> >> > ApplicationRunner:
>> >> > 1. Create execution plan: Same in Standalone and Yarn
>> >> > 2. Create intermediate streams: Same logic but different leader
>> election
>> >> > (ZK-based or pre-configured in standalone, AM in Yarn).
>> >> > 3. Run jobs: In JVM in standalone. Submit to the cluster in Yarn.
>> >> >
>> >> > JobRunner:
>> >> > 1. Run the StreamProcessors: Same process in Standalone & Test.
>> Remote
>> >> host
>> >> > in Yarn.
>> >> >
>> >> > To get a single ApplicationRunner implementation, like Jake
>> suggested,
>> >> we
>> >> > need to make leader election and JobRunner implementation pluggable.
>> >> > There's still the question of whether ApplicationRunner#run API
>> should
>> >> be
>> >> > blocking or non-blocking. It has to be non-blocking in YARN. We want
>> it
>> >> to
>> >> > be blocking in standalone, but seems like the main reason is ease of
>> use
>> >> > when launched from main(). I'd prefer making it consitently
>> non-blocking
>> >> > instead, esp. since in embedded standalone mode (where the processor
>> is
>> >> > running in another container) a blocking API would not be
>> user-friendly
>> >> > either. If not, we can add both run and runBlocking.
>> >> >
>> >> > Coming to RuntimeEnvironment, which is the least clear to me so far:
>> >> > 1. I don't think RuntimeEnvironment should be responsible for
>> providing
>> >> > StreamSpecs for streamIds - they can be obtained with a config/util
>> >> class.
>> >> > The StreamProcessor should only know about logical streamIds and the
>> >> > streamId <-> actual stream mapping should happen within the
>> >> > SystemProducer/Consumer/Admins provided by the RuntimeEnvironment.
>> >> > 2. There's also other components that the user might be interested in
>> >> > providing implementations of in embedded Standalone mode (i.e., not
>> >> just in
>> >> > tests) - MetricsRegistry and JMXServer come to mind.
>> >> > 3. Most importantly, it's not clear to me who creates and manages the
>> >> > RuntimeEnvironment. It seems like it should be the ApplicationRunner
>> or
>> >> the
>> >> > user because of (2) above and because StreamManager also needs
>> access to
>> >> > SystemAdmins for creating intermediate streams which users might
>> want to
>> >> > mock. But it also needs to be passed down to the StreamProcessor -
>> how
>> >> > would this work on Yarn?
>> >> >
>> >> > I think we should figure out how to integrate RuntimeEnvironment with
>> >> > ApplicationRunner before we can make a call on one vs. multiple
>> >> > ApplicationRunner implementations. If we do keep
>> LocalApplicationRunner
>> >> and
>> >> > RemoteApplication (and TestApplicationRunner) separate, agree with
>> Jake
>> >> > that we should remove the JobRunners and roll them up into the
>> >> respective
>> >> > ApplicationRunners.
>> >> >
>> >> > - Prateek
>> >> >
>> >> > On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes <jacob.m...@gmail.com>
>> >> wrote:
>> >> >
>> >> > > Thanks for the SEP!
>> >> > >
>> >> > > +1 on introducing these new components
>> >> > > -1 on the current definition of their roles (see Design feedback
>> >> below)
>> >> > >
>> >> > > *Design*
>> >> > >
>> >> > >    - If LocalJobRunner and RemoteJobRunner handle the different
>> >> methods
>> >> > of
>> >> > >    launching a Job, what additional value do the different types of
>> >> > >    ApplicationRunner and RuntimeEnvironment provide? It seems like
>> a
>> >> red
>> >> > > flag
>> >> > >    that all 3 would need to change from environment to
>> environment. It
>> >> > >    indicates that they don't have proper modularity. The
>> >> > > call-sequence-figures
>> >> > >    support this; LocalApplicationRunner and RemoteApplicationRunner
>> >> make
>> >> > > the
>> >> > >    same calls and the diagram only varies after jobRunner.start()
>> >> > >    - As far as I can tell, the only difference between Local and
>> >> Remote
>> >> > >    ApplicationRunner is that one is blocking and the other is
>> >> > > non-blocking. If
>> >> > >    that's all they're for then either the names should be changed
>> to
>> >> > > reflect
>> >> > >    this, or they should be combined into one ApplicationRunner and
>> >> just
>> >> > > expose
>> >> > >    separate methods for run() and runBlocking()
>> >> > >    - There isn't much detail on why the main() methods for
>> >> Local/Remote
>> >> > >    have such different implementations, how they receive the
>> >> Application
>> >> > >    (direct vs config), and concretely how the deployment scripts,
>> if
>> >> any,
>> >> > >    should interact with them.
>> >> > >
>> >> > >
>> >> > > *Style*
>> >> > >
>> >> > >    - nit: None of the 11 uses of the word "actual" in the doc are
>> >> > > *actually*
>> >> > >    needed. :-)
>> >> > >    - nit: Colors of the runtime blocks in the diagrams are
>> >> unconventional
>> >> > >    and a little distracting. Reminds me of nai won bao. Now I'm
>> >> hungry.
>> >> > :-)
>> >> > >    - Prefer the name "ExecutionEnvironment" over
>> "RuntimeEnvironment".
>> >> > The
>> >> > >    term "execution environment" is used
>> >> > >    - The code comparisons for the ApplicationRunners are not
>> >> > apples-apples.
>> >> > >    The local runner example is an application that USES the local
>> >> runner.
>> >> > > The
>> >> > >    remote runner example is the just the runner code itself. So,
>> it's
>> >> not
>> >> > >    readily apparent that we're comparing the main() methods and not
>> >> the
>> >> > >    application itself.
>> >> > >
>> >> > >
>> >> > > On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan <nickpa...@gmail.com>
>> wrote:
>> >> > >
>> >> > > > Made some updates to clarify the role and functions of
>> >> > RuntimeEnvironment
>> >> > > > in SEP-2.
>> >> > > >
>> >> > > > On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan <nickpa...@gmail.com>
>> >> wrote:
>> >> > > >
>> >> > > > > Hi, everyone,
>> >> > > > >
>> >> > > > > In light of new features such as fluent API and standalone that
>> >> > > introduce
>> >> > > > > new deployment / application launch models in Samza, I created
>> a
>> >> new
>> >> > > > SEP-2
>> >> > > > > to address the new use cases. SEP-2 link: https://cwiki.apache
>> .
>> >> > > > > org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+Design
>> >> > > > >
>> >> > > > > Please take a look and give feedbacks!
>> >> > > > >
>> >> > > > > Thanks!
>> >> > > > >
>> >> > > > > -Yi
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>> >
>> >
>>
>
>

Reply via email to