Thanks for the contribution, Boris!! I've been playing around with the basic model for a while and loved it. +1 and really looking forward to having this feature merged back into Flink ML.
-- Rong

On Mon, Oct 1, 2018 at 7:55 AM Fabian Hueske <fhue...@gmail.com> wrote:

Hi everybody,

The question of how to serve ML models in Flink applications came up in several conversations I had with Flink users over the last months. Recently, Boris approached me and told me that he'd like to revive the efforts around FLIP-23 [1].

In the last days, Boris extended the proposal with speculative model evaluation, which allows for evaluating multiple models of varying complexity to meet certain SLAs. The code already exists in a GitHub repository [2].

Due to the frequent user requests and the fact that the code is already present, I think this would be a great feature for Flink to have. Since this is a library on top of Flink's existing APIs, it should not be too hard to review.

What do others think?

Best, Fabian

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving
[2] https://github.com/FlinkML/flink-speculative-modelServer

On Mon, Feb 5, 2018 at 13:11, Stavros Kontopoulos <st.kontopou...@gmail.com> wrote:

Thanx @Fabian. I will update the document accordingly wrt metrics. I agree there are pros and cons.

Best,
Stavros

On Wed, Jan 31, 2018 at 1:07 AM, Fabian Hueske <fhue...@gmail.com> wrote:

OK, I think there was plenty of time to comment on this FLIP. I'll move it to the ACCEPTED status.

@Stavros, please consider the feedback regarding the metrics. I agree with Chesnay that metrics should be primarily exposed via the metrics system. Storing them in state makes them fault-tolerant and queryable if the state is properly configured.

Thanks,
Fabian

2018-01-22 17:19 GMT+01:00 Chesnay Schepler <ches...@apache.org>:

I'm currently looking over it, but one thing that stood out was that the FLIP proposes to use queryable state as a monitoring solution. Given that we have a metric system that integrates with plenty of commonly used metric backends, this doesn't really make sense to me.

Storing the metrics in state still has value in terms of fault tolerance though, since that is something the metric system doesn't provide by itself.

On 18.01.2018 13:57, Fabian Hueske wrote:

Are there any more comments on the FLIP?

Otherwise, I'd suggest moving the FLIP to the accepted FLIPs [1] and continuing with the implementation.

Also, is there a committer who'd like to shepherd the FLIP and review the corresponding PRs? Of course, everybody is welcome to review the code, but we need at least one committer who will eventually merge the changes.

Best,
Fabian

[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

2017-12-04 10:54 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

Hi,

Sorry for the late follow-up.

I think I understand the motivation for choosing ProtoBuf as the representation and serialization format, and this makes sense to me.

However, it might be a good idea to provide tooling to convert Flink types (described as TypeInformation) to ProtoBuf.
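Such tooling could hook in exactly where Flink's own serializer stack already turns a record into bytes. A minimal sketch of the idea (the Wine type is made up, and this still writes Flink's internal binary format at the point where real tooling would emit a ProtoBuf message):

import java.io.ByteArrayOutputStream

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.scala._
import org.apache.flink.core.memory.DataOutputViewStreamWrapper

// Illustrative record type; any case class described by a TypeInformation
// would work here.
case class Wine(fixedAcidity: Double, volatileAcidity: Double)

def toBytes(wine: Wine): Array[Byte] = {
  // Flink already knows how to serialize the type ...
  val serializer =
    createTypeInformation[Wine].createSerializer(new ExecutionConfig)
  val out = new ByteArrayOutputStream()
  serializer.serialize(wine, new DataOutputViewStreamWrapper(out))
  // ... the proposed tooling would emit a ProtoBuf message here instead of
  // Flink's internal binary format.
  out.toByteArray
}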
Without such tooling, users of the model serving library would need to manually convert their data types (say, Scala tuples, case classes, or Avro POJOs) into ProtoBuf messages. I don't think that this needs to be included in the first version, but it might be a good extension to make the library easier to use.

Best,
Fabian

2017-11-28 17:22 GMT+01:00 Boris Lublinsky <boris.lublin...@lightbend.com>:

Thanks Fabian,
More below

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

On Nov 28, 2017, at 8:21 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Boris and Stavros,
>
> Thanks for the responses.
>
> Ad 1) Thanks for the clarification. I think I misunderstood this part of the proposal. I interpreted the argument for choosing ProtoBuf for the network encoding ("the ability to represent different data types") as meaning that a model pipeline should work on different data types. I agree that it should be possible to give records of the same type (but with different keys) to different models. The key-based join approach looks good to me.
>
> Ad 2) I understand that ProtoBuf is a good choice to serialize models, for the given reasons. However, the choice of ProtoBuf serialization for the records might make the integration with existing libraries and also regular DataStream programs more difficult. They all use Flink's TypeSerializer system to serialize and deserialize records by default. Hence, we would need to add a conversion step before records can be passed to a model serving operator. Are you expecting some common format that all records follow (such as a Row or Vector type), or do you plan to support arbitrary records such as POJOs? If you plan for a specific type, you could add a TypeInformation for this type with a TypeSerializer that is based on ProtoBuf.

The way I look at it is slightly different. The common format for records, supported by Flink, is a byte array with a small header that describes the data type and is used for routing. The actual unmarshalling is done by the model implementation itself. This provides maximum flexibility and gives users the freedom to create their own types without breaking the underlying framework.

> Ad 4) @Boris: I made this point not about the serialization format but about how the library would integrate with Flink's DataStream API. I thought I had seen a code snippet that showed a new method on the DataStream object but cannot find this anymore. So, I just wanted to make the point that we should not change the DataStream API (unless it lacks support for some features) and build the model serving library on top of it.
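That is the design. The library entry point is a plain function from DataStreams to a DataStream, the same way CEP sits on top of the DataStream API. A simplified sketch (not the actual code; the types, names, and the placeholder scoring are illustrative):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ModelServing {
  // Records and models arrive as (dataType, payload) pairs; both streams
  // are keyed by the data type and joined in a CoProcessFunction.
  def score(
      data: DataStream[(String, Array[Byte])],
      models: DataStream[(String, Array[Byte])]): DataStream[Double] =
    data
      .connect(models)
      .keyBy(_._1, _._1)
      .process(new ScoringFunction)
}

class ScoringFunction
    extends CoProcessFunction[(String, Array[Byte]), (String, Array[Byte]), Double] {

  // The latest model for the current key, kept in fault-tolerant keyed state.
  private lazy val modelState: ValueState[Array[Byte]] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[Array[Byte]]("model", classOf[Array[Byte]]))

  override def processElement1(
      record: (String, Array[Byte]),
      ctx: CoProcessFunction[(String, Array[Byte]), (String, Array[Byte]), Double]#Context,
      out: Collector[Double]): Unit = {
    val model = modelState.value()
    if (model != null) {
      // Placeholder: a real implementation unmarshals the model once and
      // evaluates it on the record payload.
      out.collect(record._2.length.toDouble)
    }
  }

  override def processElement2(
      model: (String, Array[Byte]),
      ctx: CoProcessFunction[(String, Array[Byte]), (String, Array[Byte]), Double]#Context,
      out: Collector[Double]): Unit =
    modelState.update(model._2) // swap in the new model for this data type
}

The actual implementation additionally caches the unmarshalled, executable model per key, so updating the model for one data type does not disturb the others.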
> But I get from Stavros' answer that this is your design anyway.
>
> Ad 5) The metrics system is the default way to expose system and job metrics in Flink. Due to the pluggable reporter interface and the various reporters, they can be easily integrated into many production environments. A solution based on queryable state will always need custom code to access the information. Of course, this can be an optional feature.
>
> What do others think about this proposal?

We had agreement among the work group - Eron, Bas, Andrea, etc. - but you are the first one outside of it. My book https://www.lightbend.com/blog/serving-machine-learning-models-free-oreilly-ebook-from-lightbend has reasonably good reviews, so we are hoping this will work.

> Best, Fabian

2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos <st.kontopou...@gmail.com>:

Hi Fabian thanx!

> 1) Is it a strict requirement that an ML pipeline must be able to handle different input types? I understand that it makes sense to have different models for different instances of the same type, i.e., same data type but different keys. Hence, the key-based joins make sense to me. However, couldn't completely different types be handled by different ML pipelines, or would there be major drawbacks?

Could you elaborate more on this? Right now we only use keys when we do the join. A given pipeline can handle only a well-defined type (the type can be a simple string with a custom value, no need to be a class type) which serves as a key.

> 2) I think from an API point of view it would be better to not require input records to be encoded as ProtoBuf messages. Instead, the model server could accept strongly-typed objects (Java/Scala) and (if necessary) convert them to ProtoBuf messages internally. In case we need to support different types of records (see my first point), we can introduce a Union type (i.e., an n-ary Either type). I see that we need some kind of binary encoding format for the models, but maybe this too can be designed to be pluggable such that other encodings can be added later.

We do use Scala classes (strongly typed classes); protobuf is only used on the wire. For on-the-wire encoding we prefer protobufs for size, expressiveness, and the ability to represent different data types.
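Concretely, records arrive as protobuf-encoded bytes and are turned into strongly typed classes at the boundary, which also covers the union idea. A self-contained sketch (the Record hierarchy and the parser are illustrative stand-ins for ScalaPB-generated code):

import org.apache.flink.streaming.api.scala._

// Illustrative union of supported record types; strongly typed inside the
// pipeline, bytes only on the wire.
sealed trait Record
final case class WineRecord(features: Array[Double]) extends Record

object RecordCodec {
  // A generated ScalaPB message would offer WineRecord.parseFrom(bytes);
  // this stand-in keeps the sketch self-contained.
  def parseFrom(bytes: Array[Byte]): Record =
    WineRecord(new String(bytes, "UTF-8").split(',').map(_.toDouble))
}

def decode(wire: DataStream[Array[Byte]]): DataStream[Record] =
  wire.map(bytes => RecordCodec.parseFrom(bytes))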
> 3) I think the DataStream Java API should be supported as a first-class citizen for this library.

I agree. It should be either first priority or the next thing to do.

> 4) For the integration with the DataStream API, we could provide an API that receives (typed) DataStream objects, internally constructs the DataStream operators, and returns one (or more) result DataStreams. The benefit is that we don't need to change the DataStream API directly, but put a library on top. The other libraries (CEP, Table, Gelly) follow this approach.

We will provide a DSL which will do just this. But even without the DSL, this is what we do with low-level joins.

> 5) I'm skeptical about using queryable state to expose metrics. Did you consider using Flink's metrics system [1]? It is easily configurable, and we provide several reporters that export the metrics.

This of course is an option. The choice of queryable state was mostly driven by the simplicity of real-time integration. Any reason why the metrics system is better?

Best,
Stavros

On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Stavros,

Thanks for the detailed FLIP! Model serving is an important use case, and it's great to see efforts to add a library for this to Flink!

I've read the FLIP and would like to ask a few questions and make some suggestions.

1) Is it a strict requirement that an ML pipeline must be able to handle different input types? I understand that it makes sense to have different models for different instances of the same type, i.e., same data type but different keys. Hence, the key-based joins make sense to me. However, couldn't completely different types be handled by different ML pipelines, or would there be major drawbacks?

2) I think from an API point of view it would be better to not require input records to be encoded as ProtoBuf messages. Instead, the model server could accept strongly-typed objects (Java/Scala) and (if necessary) convert them to ProtoBuf messages internally. In case we need to support different types of records (see my first point), we can introduce a Union type (i.e., an n-ary Either type). I see that we need some kind of binary encoding format for the models, but maybe this too can be designed to be pluggable such that other encodings can be added later.

3) I think the DataStream Java API should be supported as a first-class citizen for this library.

4) For the integration with the DataStream API, we could provide an API that receives (typed) DataStream objects, internally constructs the DataStream operators, and returns one (or more) result DataStreams. The benefit is that we don't need to change the DataStream API directly, but put a library on top. The other libraries (CEP, Table, Gelly) follow this approach.

5) I'm skeptical about using queryable state to expose metrics. Did you consider using Flink's metrics system [1]? It is easily configurable, and we provide several reporters that export the metrics.
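Exposing a metric from a user function takes only a few lines; for illustration (the class and metric names are made up):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

// Sketch: exposing a served-records counter through Flink's metrics system
// instead of queryable state.
class ScoringMapper extends RichMapFunction[Array[Byte], Double] {

  @transient private var scoredRecords: Counter = _

  override def open(parameters: Configuration): Unit =
    scoredRecords = getRuntimeContext.getMetricGroup.counter("scoredRecords")

  override def map(record: Array[Byte]): Double = {
    scoredRecords.inc() // picked up by any configured metrics reporter
    record.length.toDouble // placeholder scoring
  }
}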
What do you think?

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html

2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos <st.kontopou...@gmail.com>:

Hi guys,

Let's discuss the new FLIP proposal for model serving over Flink. The idea is to combine the previous efforts there and provide a library on top of Flink for serving models.

https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving

Code from previous efforts can be found here: https://github.com/FlinkML

Best,
Stavros