btw, I will get to SAMZA-1246 as soon as possible.

Thanks,
Xinyu

On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu <xinyuliu...@gmail.com> wrote:

> Let me try to capture the updated requirements:
>
> 1. Set up input streams outside StreamGraph, and treat graph building as a
> library (*Fluent, Beam*).
>
> 2. Improve ease of use for ApplicationRunner to avoid complex
> configurations such as zkCoordinator, zkCoordinationService (*Standalone*).
> Provide some programmatic way to tweak them in the API.
>
> 3. Clean up ApplicationRunner into a single interface (*Fluent*). We can
> have one or more implementations, but they are hidden from the users.
>
> 4. Separate StreamGraph from the runtime environment so it can be
> serialized (*Beam, Yarn*).
>
> 5. Better life cycle management of the application, at parity with
> StreamProcessor (*Standalone, Beam*). Stats should include the exception in
> case of failure (tracked in SAMZA-1246).
>
> 6. Support injecting user-defined objects into ApplicationRunner.
>
> Prateek and I iterated on the ApplicationRunner API based on these
> requirements. To support #1, we can set up input streams at the runner
> level, which returns the MessageStream and allows graph building
> afterwards. The code looks like this:
>
>   ApplicationRunner runner = ApplicationRunner.local();
>   runner.input(streamSpec)
>       .map(...)
>       .window(...);
>   runner.run();
>
> StreamSpec is the building block for setting up streams here. It can be
> set up in different ways:
>
> - Direct creation of a stream spec, like runner.input(new StreamSpec(id,
>   system, stream))
> - Loading by streamId from env or config, like
>   runner.input(runner.env().getStreamSpec(id))
> - A canned spec, which generates the StreamSpec with id, system and stream
>   to minimize the configuration. For example,
>   CollectionSpec.create(Arrays.asList(1, 2, 3, 4)) would auto-generate the
>   system and stream in the spec.
>
> To support #2, we need to be able to set up StreamSpec-related objects and
> factories programmatically in env.
> Suppose we have the following before runner.input(...):
>
>   runner.setup(env /* a writable interface of env */ -> {
>     env.setStreamSpec(streamId, streamSpec);
>     env.setSystem(systemName, systemFactory);
>   });
>
> runner.setup(...) also provides setup for stores and other runtime pieces
> needed for the execution. The setup should be serializable to config.
> For #6, I haven't figured out a good way to inject user-defined objects
> here yet.
>
> With this API, we should also be able to support #4. For a remote
> runner.run(), the user operator classes/lambdas in the StreamGraph need to
> be serialized. Today, the existing option is to serialize to a stream,
> either the coordinator stream or the pipeline control stream, which has a
> per-message size limit. Do you see RPC as an option?
>
> For this version of the API, it seems we need neither the
> StreamApplication wrapper nor to expose the StreamGraph. Do you think we
> are on the right path?
>
> Thanks,
> Xinyu
>
>
> On Thu, Apr 27, 2017 at 6:09 AM, Chris Pettitt <cpett...@linkedin.com.invalid> wrote:
>
>> That should have been:
>>
>> For #1, Beam doesn't have a hard requirement...
>>
>> On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt <cpett...@linkedin.com>
>> wrote:
>>
>> > For #1, I doesn't have a hard requirement for any change from Samza. A
>> > very nice-to-have would be to allow the input systems to be set up at the
>> > same time as the rest of the StreamGraph. An even nicer-to-have would be
>> > to do away with the callback-based approach and treat graph building as
>> > a library, a la Beam and Flink.
>> >
>> > For the moment I've worked around the two-pass requirement (once for
>> > config, once for StreamGraph) by introducing an IR layer between Beam and
>> > the Samza Fluent translation. The IR layer is convenient independent of
>> > this problem because it makes it easier to switch between the Fluent and
>> > low-level APIs.
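A minimal sketch of the runner.setup(env -> {...}) idea from Xinyu's Apr 27, 9:11 PM message: programmatic settings are recorded on a writable environment and then flattened to config, so the setup stays serializable. StreamSpecSketch, WritableEnvSketch and SetupRunnerSketch are hypothetical stand-ins, not Samza classes; the config key names are modeled on Samza's systems.* / streams.* naming and should be treated as assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for a stream spec: a logical stream id bound to a
// system and a physical stream name.
final class StreamSpecSketch {
  final String id;
  final String system;
  final String physicalName;

  StreamSpecSketch(String id, String system, String physicalName) {
    this.id = id;
    this.system = system;
    this.physicalName = physicalName;
  }
}

// Hypothetical writable environment that only records settings, so it can be
// turned into plain key/value config later.
final class WritableEnvSketch {
  private final Map<String, StreamSpecSketch> streams = new LinkedHashMap<>();
  private final Map<String, String> systemFactories = new LinkedHashMap<>();

  void setStreamSpec(String streamId, StreamSpecSketch spec) {
    streams.put(streamId, spec);
  }

  // Takes a factory class name rather than a factory instance, so the setup stays serializable.
  void setSystem(String systemName, String factoryClassName) {
    systemFactories.put(systemName, factoryClassName);
  }

  StreamSpecSketch getStreamSpec(String streamId) {
    return streams.get(streamId);
  }

  // Flattens the recorded setup; key names are assumptions modeled on Samza's config naming.
  Map<String, String> toConfig() {
    Map<String, String> config = new LinkedHashMap<>();
    systemFactories.forEach((system, factory) ->
        config.put("systems." + system + ".samza.factory", factory));
    streams.forEach((streamId, spec) -> {
      config.put("streams." + streamId + ".samza.system", spec.system);
      config.put("streams." + streamId + ".samza.physical.name", spec.physicalName);
    });
    return config;
  }
}

// Hypothetical runner exposing only the setup step discussed in the thread.
final class SetupRunnerSketch {
  private final WritableEnvSketch env = new WritableEnvSketch();

  SetupRunnerSketch setup(Consumer<WritableEnvSketch> block) {
    block.accept(env);
    return this;
  }

  WritableEnvSketch env() {
    return env;
  }
}
```

Recording the system factory as a class name instead of an object is what keeps this setup expressible as config; an arbitrary user-defined object (#6) would not fit that scheme, which mirrors the open question in the message.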
>> >
>> > For #4, if we had parity with StreamProcessor for lifecycle we'd be in
>> > great shape. One additional issue with the status call that I may not
>> > have mentioned is that it provides no way to get at the cause of failure.
>> > The StreamProcessor API does allow this via the callback.
>> >
>> > Re. #2 and #3, I'm a big fan of getting rid of the extra configuration
>> > indirection you currently have to jump through (this is also related to
>> > system consumer configuration from #1). It also makes it much easier to
>> > discover what the configurable parameters are if we provide some
>> > programmatic way to tweak them in the API, which can turn into config
>> > under the hood.
>> >
>> > On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu <xinyuliu...@gmail.com>
>> > wrote:
>> >
>> >> Let me take a shot at summarizing the requirements for ApplicationRunner
>> >> we have discussed so far:
>> >>
>> >> - Support an environment for passing user-defined objects (potentially
>> >>   streams) into ApplicationRunner (*Beam*)
>> >>
>> >> - Improve ease of use for ApplicationRunner to avoid complex
>> >>   configurations such as zkCoordinator, zkCoordinationService
>> >>   (*Standalone*)
>> >>
>> >> - Clean up ApplicationRunner into a single interface (*Fluent*). We can
>> >>   have one or more implementations, but they are hidden from the users.
>> >>
>> >> - Separate StreamGraph from the environment so it can be serialized
>> >>   (*Beam, Yarn*)
>> >>
>> >> - Better life cycle management of the application, including
>> >>   start/stop/stats (*Standalone, Beam*)
>> >>
>> >> One way to address 2 and 3 is to provide pre-packaged runners using
>> >> static factory methods, where the return type is the ApplicationRunner
>> >> interface. So we can have:
>> >>
>> >>   ApplicationRunner runner = ApplicationRunner.zk() /
>> >>       ApplicationRunner.local() / ApplicationRunner.remote() /
>> >>       ApplicationRunner.test().
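A minimal sketch of the pre-packaged runner idea above: one interface with static factory methods, backed by an implementation class that callers never name, each flavor carrying its own packaged config. RunnerFactorySketch, PackagedRunner, the zkConnect parameter and the app.runner.kind key are hypothetical; the job.coordinator.factory and job.coordinator.zk.connect keys are used on the assumption that they match Samza's standalone ZK settings.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical single runner interface: users only ever see this type.
interface RunnerFactorySketch {
  // Config this pre-packaged runner was built with (read-only view).
  Map<String, String> packagedConfig();

  // ZK-coordinated standalone runner; the connect-string parameter is an assumption.
  static RunnerFactorySketch zk(String zkConnect) {
    Map<String, String> cfg = PackagedRunner.base("zk");
    // Key names assumed to match Samza's standalone ZK coordination settings.
    cfg.put("job.coordinator.factory", "org.apache.samza.zk.ZkJobCoordinatorFactory");
    cfg.put("job.coordinator.zk.connect", zkConnect);
    return new PackagedRunner(cfg);
  }

  static RunnerFactorySketch local() {
    return new PackagedRunner(PackagedRunner.base("local"));
  }

  static RunnerFactorySketch remote() {
    return new PackagedRunner(PackagedRunner.base("remote"));
  }

  static RunnerFactorySketch test() {
    return new PackagedRunner(PackagedRunner.base("test"));
  }
}

// Package-private implementation, hidden behind the static factories.
final class PackagedRunner implements RunnerFactorySketch {
  private final Map<String, String> config;

  PackagedRunner(Map<String, String> config) {
    this.config = Collections.unmodifiableMap(new HashMap<>(config));
  }

  // Hypothetical marker key recording which pre-packaged flavor was chosen.
  static Map<String, String> base(String kind) {
    Map<String, String> cfg = new HashMap<>();
    cfg.put("app.runner.kind", kind);
    return cfg;
  }

  @Override
  public Map<String, String> packagedConfig() {
    return config;
  }
}
```

Because callers only hold the interface type, the implementations behind zk()/local()/remote()/test() can be merged or split later without touching application code, which is the intent of requirement 3.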
>> >>
>> >> Internally we will package the right configs and run-time environment
>> >> with the runner. For example, ApplicationRunner.zk() will define all the
>> >> configs needed for zk coordination.
>> >>
>> >> To support 1 and 4, can we pass a lambda function into the runner, and
>> >> then run the stream graph? Like the following:
>> >>
>> >>   ApplicationRunner.zk().env(config -> environment).run(streamGraph);
>> >>
>> >> Then we need a way to pass the environment into the StreamGraph. This
>> >> can be done either by adding an extra parameter to each operator, or by
>> >> having a getEnv() function in the MessageStream, which seems pretty
>> >> hacky.
>> >>
>> >> What do you think?
>> >>
>> >> Thanks,
>> >> Xinyu
>> >>
>> >> On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari <pmaheshw...@linkedin.com.invalid> wrote:
>> >>
>> >> > Thanks for putting this together Yi!
>> >> >
>> >> > I agree with Jake, it does seem like there are a few too many moving
>> >> > parts here. That said, the problem being solved is pretty broad, so let
>> >> > me try to summarize my current understanding of the requirements.
>> >> > Please correct me if I'm wrong or missing something.
>> >> >
>> >> > ApplicationRunner and JobRunner first, ignoring the test environment
>> >> > for the moment.
>> >> >
>> >> > ApplicationRunner:
>> >> > 1. Create execution plan: same in Standalone and Yarn.
>> >> > 2. Create intermediate streams: same logic but different leader
>> >> >    election (ZK-based or pre-configured in standalone, AM in Yarn).
>> >> > 3. Run jobs: in JVM in standalone; submit to the cluster in Yarn.
>> >> >
>> >> > JobRunner:
>> >> > 1. Run the StreamProcessors: same process in Standalone & Test; remote
>> >> >    host in Yarn.
>> >> >
>> >> > To get a single ApplicationRunner implementation, like Jake suggested,
>> >> > we need to make the leader election and JobRunner implementations
>> >> > pluggable.
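A minimal sketch of Prateek's suggestion above: a single ApplicationRunner whose three steps are the same in every environment, with leader election and job launching injected. LeaderElectorSketch, JobRunnerSketch and SingleApplicationRunnerSketch are hypothetical names; the execution plan is reduced to a list of job names, and "creating" an intermediate stream is reduced to recording its name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plug-in point: ZK-based or pre-configured in standalone, the AM in Yarn.
interface LeaderElectorSketch {
  boolean isLeader();
}

// Hypothetical plug-in point: run in-JVM in standalone, submit to the cluster in Yarn.
interface JobRunnerSketch {
  void run(String jobName);
}

final class SingleApplicationRunnerSketch {
  private final LeaderElectorSketch leaderElector;
  private final JobRunnerSketch jobRunner;
  private final List<String> createdStreams = new ArrayList<>();

  SingleApplicationRunnerSketch(LeaderElectorSketch leaderElector, JobRunnerSketch jobRunner) {
    this.leaderElector = leaderElector;
    this.jobRunner = jobRunner;
  }

  void run(List<String> jobsInPlan, List<String> intermediateStreams) {
    // Step 1 (execution plan) is identical everywhere; here the plan is just a job list.
    // Step 2: only the elected leader creates intermediate streams. A real runner would
    // also make non-leaders wait until the streams exist; that is omitted here.
    if (leaderElector.isLeader()) {
      createdStreams.addAll(intermediateStreams);
    }
    // Step 3: how each job is launched is entirely up to the injected JobRunner.
    for (String job : jobsInPlan) {
      jobRunner.run(job);
    }
  }

  List<String> createdStreams() {
    return new ArrayList<>(createdStreams);
  }
}
```

In this shape the Local/Remote distinction reduces to which elector and job runner are passed in, which is the kind of modularity Jake asks for further down the thread.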
>> >> >
>> >> > There's still the question of whether the ApplicationRunner#run API
>> >> > should be blocking or non-blocking. It has to be non-blocking in YARN.
>> >> > We want it to be blocking in standalone, but it seems the main reason
>> >> > is ease of use when launched from main(). I'd prefer making it
>> >> > consistently non-blocking instead, especially since in embedded
>> >> > standalone mode (where the processor is running in another container)
>> >> > a blocking API would not be user-friendly either. If not, we can add
>> >> > both run and runBlocking.
>> >> >
>> >> > Coming to RuntimeEnvironment, which is the least clear to me so far:
>> >> > 1. I don't think RuntimeEnvironment should be responsible for providing
>> >> >    StreamSpecs for streamIds; they can be obtained with a config/util
>> >> >    class. The StreamProcessor should only know about logical streamIds,
>> >> >    and the streamId <-> actual stream mapping should happen within the
>> >> >    SystemProducer/Consumer/Admins provided by the RuntimeEnvironment.
>> >> > 2. There are also other components that the user might be interested
>> >> >    in providing implementations of in embedded Standalone mode (i.e.,
>> >> >    not just in tests); MetricsRegistry and JMXServer come to mind.
>> >> > 3. Most importantly, it's not clear to me who creates and manages the
>> >> >    RuntimeEnvironment. It seems like it should be the ApplicationRunner
>> >> >    or the user, because of (2) above and because StreamManager also
>> >> >    needs access to SystemAdmins for creating intermediate streams, which
>> >> >    users might want to mock. But it also needs to be passed down to the
>> >> >    StreamProcessor; how would this work on Yarn?
>> >> >
>> >> > I think we should figure out how to integrate RuntimeEnvironment with
>> >> > ApplicationRunner before we can make a call on one vs. multiple
>> >> > ApplicationRunner implementations.
>> >> > If we do keep LocalApplicationRunner and RemoteApplicationRunner (and
>> >> > TestApplicationRunner) separate, I agree with Jake that we should
>> >> > remove the JobRunners and roll them up into the respective
>> >> > ApplicationRunners.
>> >> >
>> >> > - Prateek
>> >> >
>> >> > On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes <jacob.m...@gmail.com>
>> >> > wrote:
>> >> >
>> >> > > Thanks for the SEP!
>> >> > >
>> >> > > +1 on introducing these new components
>> >> > > -1 on the current definition of their roles (see Design feedback
>> >> > > below)
>> >> > >
>> >> > > *Design*
>> >> > >
>> >> > > - If LocalJobRunner and RemoteJobRunner handle the different methods
>> >> > >   of launching a Job, what additional value do the different types of
>> >> > >   ApplicationRunner and RuntimeEnvironment provide? It seems like a
>> >> > >   red flag that all 3 would need to change from environment to
>> >> > >   environment. It indicates that they don't have proper modularity.
>> >> > >   The call-sequence figures support this; LocalApplicationRunner and
>> >> > >   RemoteApplicationRunner make the same calls and the diagram only
>> >> > >   varies after jobRunner.start().
>> >> > > - As far as I can tell, the only difference between Local and Remote
>> >> > >   ApplicationRunner is that one is blocking and the other is
>> >> > >   non-blocking. If that's all they're for, then either the names
>> >> > >   should be changed to reflect this, or they should be combined into
>> >> > >   one ApplicationRunner that just exposes separate methods for run()
>> >> > >   and runBlocking().
>> >> > > - There isn't much detail on why the main() methods for Local/Remote
>> >> > >   have such different implementations, how they receive the
>> >> > >   Application (direct vs config), and concretely how the deployment
>> >> > >   scripts, if any, should interact with them.
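A minimal sketch of the "one ApplicationRunner with run() and runBlocking()" option, assuming run() is the non-blocking primitive and runBlocking() only waits on it. Following Chris's point about the status call and SAMZA-1246, the final status keeps the failure cause. LifecycleRunnerSketch and AppStatusSketch are hypothetical; the application is modeled as a plain Runnable executed via CompletableFuture.runAsync on the common pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical lifecycle states for a running application.
enum AppStatusSketch { NEW, RUNNING, SUCCEEDED, FAILED }

final class LifecycleRunnerSketch {
  private volatile AppStatusSketch status = AppStatusSketch.NEW;
  private volatile Throwable failureCause;

  // Non-blocking: starts the application and returns a future that completes
  // only after the final status (and failureCause, on error) has been recorded.
  synchronized CompletableFuture<Void> run(Runnable app) {
    if (status != AppStatusSketch.NEW) {
      throw new IllegalStateException("already started");
    }
    status = AppStatusSketch.RUNNING;
    return CompletableFuture.runAsync(app).whenComplete((ignored, error) -> {
      if (error == null) {
        status = AppStatusSketch.SUCCEEDED;
      } else {
        // runAsync wraps the application's exception in a CompletionException.
        failureCause = (error instanceof CompletionException && error.getCause() != null)
            ? error.getCause() : error;
        status = AppStatusSketch.FAILED;  // written after failureCause so readers see both
      }
    });
  }

  // Blocking convenience for main(): waits for completion and returns the final status.
  AppStatusSketch runBlocking(Runnable app) {
    try {
      run(app).join();
    } catch (CompletionException e) {
      // The failure is already recorded; callers inspect failureCause().
    }
    return status;
  }

  AppStatusSketch status() {
    return status;
  }

  Throwable failureCause() {
    return failureCause;
  }
}
```

Making run() the primitive keeps YARN (where it must be non-blocking) and standalone on the same code path; runBlocking() adds nothing except the wait.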
>> >> > >
>> >> > > *Style*
>> >> > >
>> >> > > - nit: None of the 11 uses of the word "actual" in the doc are
>> >> > >   *actually* needed. :-)
>> >> > > - nit: Colors of the runtime blocks in the diagrams are
>> >> > >   unconventional and a little distracting. Reminds me of nai won bao.
>> >> > >   Now I'm hungry. :-)
>> >> > > - Prefer the name "ExecutionEnvironment" over "RuntimeEnvironment".
>> >> > >   The term "execution environment" is used
>> >> > > - The code comparisons for the ApplicationRunners are not
>> >> > >   apples-to-apples. The local runner example is an application that
>> >> > >   USES the local runner. The remote runner example is just the runner
>> >> > >   code itself. So, it's not readily apparent that we're comparing the
>> >> > >   main() methods and not the application itself.
>> >> > >
>> >> > > On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan <nickpa...@gmail.com> wrote:
>> >> > >
>> >> > > > Made some updates to clarify the role and functions of
>> >> > > > RuntimeEnvironment in SEP-2.
>> >> > > >
>> >> > > > On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan <nickpa...@gmail.com> wrote:
>> >> > > >
>> >> > > > > Hi, everyone,
>> >> > > > >
>> >> > > > > In light of new features such as the fluent API and standalone,
>> >> > > > > which introduce new deployment / application launch models in
>> >> > > > > Samza, I created a new SEP-2 to address the new use cases. SEP-2
>> >> > > > > link:
>> >> > > > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+Design
>> >> > > > >
>> >> > > > > Please take a look and give feedback!
>> >> > > > >
>> >> > > > > Thanks!
>> >> > > > >
>> >> > > > > -Yi