1. Re StreamsConfig.USER_ENDPOINT_CONFIG:

I agree with Neha that Kafka Streams can provide the bare-minimum APIs for
just host/port, and the user's own layer can build whatever URL / proxy
address it wants on top of that.
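
As a rough illustration of that split, here is a minimal sketch of the
application-side layer. Everything in it is hypothetical and not part of
Kafka Streams: the EndpointSketch and HostPort names, the "host:port" string
format, and the "http" scheme and "/api/v1/" prefix are all choices the
user's service would make for itself.

```java
import java.net.URI;

public class EndpointSketch {

    // Hypothetical value type: the only two things Kafka Streams would carry.
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Split a "host:port" string (assumed format) into its two parts.
    static HostPort parse(String endpoint) {
        int idx = endpoint.lastIndexOf(':');
        if (idx <= 0 || idx == endpoint.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got " + endpoint);
        }
        return new HostPort(endpoint.substring(0, idx),
                            Integer.parseInt(endpoint.substring(idx + 1)));
    }

    // Application-side decision: the scheme and path prefix come from the
    // user's own service layer, not from Kafka Streams.
    static URI toQueryUri(HostPort hp, String pathPrefix) {
        return URI.create("http://" + hp.host + ":" + hp.port + pathPrefix);
    }

    public static void main(String[] args) {
        HostPort hp = parse("streams-host-1:7070");
        System.out.println(toQueryUri(hp, "/api/v1/"));
    }
}
```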


2. Re Improving KafkaStreamsInstance interface:

Users are indeed aware of the "TaskId" class: it is not part of the internal
packages, and it is exposed in the PartitionGrouper interface, where users
can instantiate it and where it is assigned input topic partitions.
So we can probably change the APIs as follows:

Map<HostState, Set<TaskMetadata>> KafkaStreams.getAllTasks() where
TaskMetadata has fields such as taskId, list of assigned partitions, list
of state store names; and HostState can include hostname / port. The port
is the listening port of a user-defined listener that users provide to
listen for queries (e.g., using REST APIs).

Map<HostState, Set<TaskMetadata>> KafkaStreams.getTasksWithStore(String /*
storeName */) would return only the hosts, with their assigned tasks, for
which at least one of the tasks includes the given store name.

Map<HostState, Set<TaskMetadata>> KafkaStreams.getTaskWithStoreAndKey(Key
k, String /* storeName */, StreamPartitioner partitioner) would return only
the host and its assigned task whose store with the given name holds the
particular key, as determined by the partitioner.
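
To make the proposed shapes concrete, here is a minimal in-memory sketch of
the two filtering calls. It is only an illustration under stated
assumptions, not the actual implementation: HostState and TaskMetadata are
the hypothetical types described above, taskId is a plain String, partitions
are bare integers rather than topic partitions, a ToIntFunction stands in
for StreamPartitioner, and the full assignment is passed in as a map instead
of coming from KafkaStreams.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.ToIntFunction;

public class TaskDiscoverySketch {

    // Hypothetical: host and port of the user-defined query listener.
    static final class HostState {
        final String host;
        final int port;
        HostState(String host, int port) { this.host = host; this.port = port; }
        @Override public boolean equals(Object o) {
            return o instanceof HostState
                    && host.equals(((HostState) o).host) && port == ((HostState) o).port;
        }
        @Override public int hashCode() { return Objects.hash(host, port); }
        @Override public String toString() { return host + ":" + port; }
    }

    // Hypothetical: task id, assigned partitions, and state store names.
    static final class TaskMetadata {
        final String taskId;
        final List<Integer> partitions;
        final List<String> storeNames;
        TaskMetadata(String taskId, List<Integer> partitions, List<String> storeNames) {
            this.taskId = taskId;
            this.partitions = partitions;
            this.storeNames = storeNames;
        }
    }

    // Keep a host (with all of its tasks) if at least one task has the store.
    static Map<HostState, Set<TaskMetadata>> getTasksWithStore(
            Map<HostState, Set<TaskMetadata>> allTasks, String storeName) {
        Map<HostState, Set<TaskMetadata>> result = new HashMap<>();
        for (Map.Entry<HostState, Set<TaskMetadata>> e : allTasks.entrySet()) {
            for (TaskMetadata t : e.getValue()) {
                if (t.storeNames.contains(storeName)) {
                    result.put(e.getKey(), e.getValue());
                    break;
                }
            }
        }
        return result;
    }

    // Map the key to a partition, then return the single host and task that
    // own that partition of the named store (empty map if none does).
    static <K> Map<HostState, Set<TaskMetadata>> getTaskWithStoreAndKey(
            Map<HostState, Set<TaskMetadata>> allTasks, K key, String storeName,
            ToIntFunction<K> partitioner) {
        int partition = partitioner.applyAsInt(key);
        for (Map.Entry<HostState, Set<TaskMetadata>> e : allTasks.entrySet()) {
            for (TaskMetadata t : e.getValue()) {
                if (t.storeNames.contains(storeName) && t.partitions.contains(partition)) {
                    return Collections.singletonMap(e.getKey(), Collections.singleton(t));
                }
            }
        }
        return Collections.emptyMap();
    }

    static Set<TaskMetadata> tasks(TaskMetadata... ts) {
        return new HashSet<>(Arrays.asList(ts));
    }

    // Made-up assignment: two hosts, two stores, two partitions of "word-counts".
    static Map<HostState, Set<TaskMetadata>> sampleAssignment() {
        Map<HostState, Set<TaskMetadata>> all = new HashMap<>();
        all.put(new HostState("host-a", 7070), tasks(
                new TaskMetadata("0_0", Arrays.asList(0), Arrays.asList("word-counts")),
                new TaskMetadata("1_0", Arrays.asList(0), Arrays.asList("page-views"))));
        all.put(new HostState("host-b", 7070), tasks(
                new TaskMetadata("0_1", Arrays.asList(1), Arrays.asList("word-counts"))));
        return all;
    }

    public static void main(String[] args) {
        Map<HostState, Set<TaskMetadata>> all = sampleAssignment();
        // Hypothetical partitioner: hash the key over two partitions.
        ToIntFunction<String> partitioner = k -> Math.floorMod(k.hashCode(), 2);
        System.out.println(getTasksWithStore(all, "page-views").keySet());
        System.out.println(
                getTaskWithStoreAndKey(all, "kafka", "word-counts", partitioner).keySet());
    }
}
```

A caller would then turn the returned HostState into whatever connection it
uses (e.g., a REST call) to query the store on that host.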



Guozhang


On Sun, Jul 10, 2016 at 11:21 AM, Neha Narkhede <n...@confluent.io> wrote:

> A few thoughts that became apparent after looking at example code of what an
> application architecture and code might look like with these changes.
> Apologies for the late realization.
>
> 1. "user.endpoint" will be very differently defined for respective
> applications. I don't think Kafka Streams should generalize to accept any
> connection URL as we expect to only expose metadata expressed as HostInfo
> (which is defined by host & port) and hence need to interpret the
> "user.endpoint" as host & port. Applications will have their own endpoint
> configs that will take many forms and they will be responsible for parsing
> out host and port and configuring Kafka Streams accordingly.
>
> If we are in fact limiting to host and port, I wonder if we should change
> the name of "user.endpoint" into something more specific. We have clients
> expose host/port pairs as "bootstrap.servers". Should this be
> "application.server"?
>
> 2. I don't think we should expose another abstraction called
> KafkaStreamsInstance to the user. This is related to the discussion of the
> right abstraction that we want to expose to an application. The abstraction
> discussion should probably be part of the KIP itself; let me give a
> quick summary of my thoughts here:
> 1. The person implementing an application using Queryable State has likely
> already made some choices for the service layer: a REST framework, Thrift,
> or whatever. We don't really want to add another RPC framework to this mix,
> nor do we want to try to make Kafka's RPC mechanism general purpose.
> 2. Likewise, it should be clear that the API you want to expose to the
> front-end/client service is not necessarily the API you'd need internally
> as there may be additional filtering/processing in the router.
>
> Given these constraints, what we prefer to add is a fairly low-level
> "toolbox" that would let you do anything you want, but requires you to do
> the routing and perform any aggregation or processing yourself. This
> pattern is not recommended for all kinds of services/apps, but there is
> definitely a category of things where it is a big win, and other advanced
> applications are out of scope.
>
> The APIs we expose should take the following things into consideration:
> 1. Make it clear to the user that they will do the routing, aggregation,
> processing themselves. So the bare minimum that we want to expose is store
> and partition metadata per application server identified by the host and
> port.
> 2. Ensure that the API exposes abstractions that are known to the user or
> are intuitive to the user.
> 3. Avoid exposing internal objects or implementation details to the user.
>
> So tying all this into answering the question of what we should expose
> through the APIs -
>
> In Kafka Streams, the user is aware of the concept of tasks and partitions
> since the application scales with the number of partitions and tasks are
> the construct for logical parallelism. The user is also aware of the
> concept of state stores though until now they were not user accessible.
> With Queryable State, the bare minimum abstractions that we need to expose
> are state stores and the location of state store partitions.
>
> For exposing the state stores, the getStore() APIs look good but I think
> for locating the state store partitions, we should go back to the original
> proposal of simply exposing some sort of getPartitionMetadata() that
> returns a PartitionMetadata or TaskMetadata object keyed by HostInfo.
>
> The application will convert the HostInfo (host and port) into some
> connection URL to talk to the other app instances via its own RPC mechanism
> depending on whether it needs to scatter-gather or just query. The
> application will know how a key maps to a partition and through
> PartitionMetadata it will know how to locate the server that hosts the
> store that has the partition hosting that key.
>
> On Fri, Jul 8, 2016 at 9:40 AM, Michael Noll <mich...@confluent.io> wrote:
>
> > Addendum in case my previous email wasn't clear:
> >
> > > So for any given instance of a streams application there will never be
> > > both a v1 and v2 alive at the same time
> >
> > That's right.  But the current live instance will be able to tell other
> > instances, via its endpoint setting, whether it wants to be contacted at
> > v1 or at v2.  The other instances can't guess that.  Think: if an older
> > instance manually composed the "rest" of an endpoint URI, having only
> > the host and port from the endpoint setting, it might not know that the
> > new instances have a different endpoint suffix, for example.
> >
> >
> > On Fri, Jul 8, 2016 at 6:37 PM, Michael Noll <mich...@confluent.io> wrote:
> >
> > > Damian,
> > >
> > > about the rolling upgrade comment:  An instance A will contact another
> > > instance B by the latter's endpoint, right?  So if A has no further
> > > information available than B's host and port, then how should instance
> > > A know whether it should call B at /v1/ or at /v2/?  I agree that my
> > > suggestion isn't foolproof, but it is afaict better than the host:port
> > > approach.
> > >
> > >
> > >
> > > On Fri, Jul 8, 2016 at 5:15 PM, Damian Guy <damian....@gmail.com> wrote:
> > >
> > >> Michael - I'm ok with changing it to a string. Anyone else have a
> > >> strong opinion on this?
> > >>
> > >> FWIW - I don't think it will work fine as is during the rolling upgrade
> > >> scenario, as the service that is listening on the port needs to be
> > >> embedded within each instance. So for any given instance of a streams
> > >> application there will never be both a v1 and v2 alive at the same time
> > >> (unless of course the process didn't shut down properly, but then you
> > >> have another problem...).
> > >>
> > >> On Fri, 8 Jul 2016 at 15:26 Michael Noll <mich...@confluent.io> wrote:
> > >>
> > >> > I have one further comment about `StreamsConfig.USER_ENDPOINT_CONFIG`.
> > >> >
> > >> > I think we should consider not restricting the value of this setting
> > >> > to only `host:port` pairs.  By design, this setting is capturing
> > >> > user-driven metadata to define an endpoint, so why restrict the
> > >> > creativity or flexibility of our users?  I can imagine, for example,
> > >> > that users would like to set values such as
> > >> > `https://host:port/api/rest/v1/` in this field (e.g. being able to
> > >> > distinguish between `.../v1/` and `.../v2/` may help in scenarios such
> > >> > as rolling upgrades, where, during the upgrade, older instances may
> > >> > need to coexist with newer instances).
> > >> >
> > >> > That said, I don't have a strong opinion here.
> > >> >
> > >> > -Michael
> > >> >
> > >> >
> > >> >
> > >> > On Fri, Jul 8, 2016 at 2:55 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >> >
> > >> > > +1
> > >> > >
> > >> > > On 07/08/2016 11:03 AM, Eno Thereska wrote:
> > >> > > > +1 (non-binding)
> > >> > > >
> > >> > > >> On 7 Jul 2016, at 18:31, Sriram Subramanian <r...@confluent.io> wrote:
> > >> > > >>
> > >> > > >> +1
> > >> > > >>
> > >> > > >> On Thu, Jul 7, 2016 at 9:53 AM, Henry Cai <h...@pinterest.com.invalid> wrote:
> > >> > > >>
> > >> > > >>> +1
> > >> > > >>>
> > >> > > >>> On Thu, Jul 7, 2016 at 6:48 AM, Michael Noll <mich...@confluent.io> wrote:
> > >> > > >>>
> > >> > > >>>> +1 (non-binding)
> > >> > > >>>>
> > >> > > >>>> On Thu, Jul 7, 2016 at 10:24 AM, Damian Guy <damian....@gmail.com> wrote:
> > >> > > >>>>
> > >> > > >>>>> Thanks Henry - we've updated the KIP with an example and the new
> > >> > > >>>>> config parameter required. FWIW the user doesn't register a
> > >> > > >>>>> listener; they provide a host:port in config. It is expected they
> > >> > > >>>>> will start a service running on that host:port that they can use
> > >> > > >>>>> to connect to the running KafkaStreams instance.
> > >> > > >>>>>
> > >> > > >>>>> Thanks,
> > >> > > >>>>> Damian
> > >> > > >>>>>
> > >> > > >>>>> On Thu, 7 Jul 2016 at 06:06 Henry Cai <h...@pinterest.com.invalid> wrote:
> > >> > > >>>>>
> > >> > > >>>>>> It wasn't quite clear to me how the user program interacts with
> > >> > > >>>>>> the discovery API, especially the user-supplied listener part:
> > >> > > >>>>>> how does the user program supply that listener to KafkaStreams,
> > >> > > >>>>>> and how does KafkaStreams know which port the user listener is
> > >> > > >>>>>> running on? Maybe a more complete end-to-end example would help,
> > >> > > >>>>>> including the steps for registering the user listener and
> > >> > > >>>>>> whether the user listener needs to be involved in task
> > >> > > >>>>>> reassignment.
> > >> > > >>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>> On Wed, Jul 6, 2016 at 9:13 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >> > > >>>>>>
> > >> > > >>>>>>> +1
> > >> > > >>>>>>>
> > >> > > >>>>>>> On Wed, Jul 6, 2016 at 12:44 PM, Damian Guy <damian....@gmail.com> wrote:
> > >> > > >>>>>>>
> > >> > > >>>>>>>> Hi all,
> > >> > > >>>>>>>>
> > >> > > >>>>>>>> I'd like to initiate the voting process for KIP-67
> > >> > > >>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams>
> > >> > > >>>>>>>>
> > >> > > >>>>>>>> KAFKA-3909 <https://issues.apache.org/jira/browse/KAFKA-3909> is the
> > >> > > >>>>>>>> top level JIRA for this effort.
> > >> > > >>>>>>>>
> > >> > > >>>>>>>> Initial PRs for Step 1 of the process are:
> > >> > > >>>>>>>> Expose State Store Names <https://github.com/apache/kafka/pull/1526>
> > >> > > >>>>>>>> and Query Local State Stores <https://github.com/apache/kafka/pull/1565>
> > >> > > >>>>>>>>
> > >> > > >>>>>>>> Thanks,
> > >> > > >>>>>>>> Damian
> > >> > > >>>>>>>>
> > >> > > >>>>>>>
> > >> > > >>>>>>>
> > >> > > >>>>>>>
> > >> > > >>>>>>> --
> > >> > > >>>>>>> -- Guozhang
> > >> > > >>>>>>>
> > >> > > >>>>>>
> > >> > > >>>>>
> > >> > > >>>>
> > >> > > >>>>
> > >> > > >>>>
> > >> > > >>>> --
> > >> > > >>>> Best regards,
> > >> > > >>>> Michael Noll
> > >> > > >>>>
> > >> > > >>>>
> > >> > > >>>>
> > >> > > >>>> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
> > >> > > >>>> Download Apache Kafka and Confluent Platform:
> > >> > > >>>> www.confluent.io/download <http://www.confluent.io/download>*
> > >> > > >>>>
> > >> > > >>>
> > >> > > >
> > >> > >
> > >> > >
> > >> >
> > >> >
> > >> > --
> > >> > Best regards,
> > >> > Michael Noll
> > >> >
> > >>
> > >
> > >
> > >
> > > --
> > > Best regards,
> > > Michael Noll
> > >
> >
> >
> >
> > --
> > Best regards,
> > Michael Noll
> >
>
>
>
> --
> Thanks,
> Neha
>



-- 
-- Guozhang
