One more thing on the above: the methods on KafkaStreams should be changed
to something like:

Collection<KafkaStreamsMetadata> allMetadata()

Collection<KafkaStreamsMetadata> allMetadataForStore(final String storeName)

KafkaStreamsMetadata metadataWithKey(final String storeName,
                                     final K key,
                                     final Serializer<K> keySerializer)
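
For illustration, a minimal sketch of how these could hang together (the
interface name StreamsMetadataProvider and the explicit <K> type parameter
are placeholders of mine, not part of the proposal; KafkaStreamsMetadata is
the class described below):

import java.util.Collection;
import org.apache.kafka.common.serialization.Serializer;

public interface StreamsMetadataProvider {
    // all instances of this Kafka Streams application
    Collection<KafkaStreamsMetadata> allMetadata();

    // only the instances that host the given store
    Collection<KafkaStreamsMetadata> allMetadataForStore(final String storeName);

    // the single instance that hosts the given key of the given store
    <K> KafkaStreamsMetadata metadataWithKey(final String storeName,
                                             final K key,
                                             final Serializer<K> keySerializer);
}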


Thanks,
Damian

On Tue, 12 Jul 2016 at 11:14 Damian Guy <damian....@gmail.com> wrote:

> Hi,
>
> I agree with point 1: application.server is a better name for the config
> (we'll change this). However, on point 2 I think we should stick mostly
> with what we already have. I've tried both ways of doing this when working
> on the JIRA and building examples, and I find the current approach more
> intuitive and easier to use than the Map-based approach.
> However, there is probably a naming issue. We should rename
> KafkaStreamsInstance to KafkaStreamsMetadata. This class is very simple,
> but provides all the information a developer needs to be able to find the
> instance(s) of a Streams application that a particular store is running
> on, i.e.:
>
> public class KafkaStreamsMetadata {
>     private final HostInfo hostInfo;
>     private final Set<String> stateStoreNames;
>     private final Set<TopicPartition> topicPartitions;
>     ...
> }
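>
> For completeness, a minimal sketch of the accessors the examples below
> assume (accessor names are illustrative, not final):
>
> public String host() { return hostInfo.host(); }
> public int port() { return hostInfo.port(); }
> public Set<String> stateStoreNames() { return stateStoreNames; }
> public Set<TopicPartition> topicPartitions() { return topicPartitions; }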
>
> So using the API to route to a new host is fairly simple, particularly in
> the case when you want to find the host for a particular key, i.e.,
>
> final KafkaStreams kafkaStreams = createKafkaStreams();
> final KafkaStreamsMetadata streamsMetadata =
>     kafkaStreams.instanceWithKey("word-count", "hello", Serdes.String().serializer());
> http.get("http://" + streamsMetadata.host() + ":" + streamsMetadata.port()
>     + "/get/word-count/hello");
>
>
> And if you want to do a scatter-gather approach:
>
> final KafkaStreams kafkaStreams = createKafkaStreams();
> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
>     kafkaStreams.allInstancesWithStore("word-count");
> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
>     http.get("http://" + streamsMetadata.host() + ":" + streamsMetadata.port()
>         + "/get/word-count/hello");
>     ...
> }
>
>
> And if you iterate over all instances:
>
> final KafkaStreams kafkaStreams = createKafkaStreams();
> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
>     kafkaStreams.allInstances();
> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
>     if (streamsMetadata.stateStoreNames().contains("word-count")) {
>         http.get("http://" + streamsMetadata.host() + ":" + streamsMetadata.port()
>             + "/get/word-count/hello");
>         ...
>     }
> }
>
>
> If we were to change this to use Map<HostInfo, Set<TaskMetadata>>, for the
> most part users would need to iterate over the entry or key set. Examples:
>
> Finding an instance by key is a little odd:
>
> final KafkaStreams kafkaStreams = createKafkaStreams();
> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>     kafkaStreams.instanceWithKey("word-count", "hello", Serdes.String().serializer());
> // this is a bit odd as I only expect one:
> for (HostInfo hostInfo : streamsMetadata.keySet()) {
>     http.get("http://" + hostInfo.host() + ":" + hostInfo.port()
>         + "/get/word-count/hello");
> }
>
>
> The scatter/gather by store is fairly similar to the previous example:
>
> final KafkaStreams kafkaStreams = createKafkaStreams();
> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>     kafkaStreams.allInstancesWithStore("word-count");
> for (HostInfo hostInfo : streamsMetadata.keySet()) {
>     http.get("http://" + hostInfo.host() + ":" + hostInfo.port()
>         + "/get/word-count/hello");
>     ...
> }
>
> And iterating over all instances:
>
> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>     kafkaStreams.allInstances();
> for (Map.Entry<HostInfo, Set<TaskMetadata>> entry : streamsMetadata.entrySet()) {
>     for (TaskMetadata taskMetadata : entry.getValue()) {
>         if (taskMetadata.stateStoreNames().contains("word-count")) {
>             http.get("http://" + entry.getKey().host() + ":" + entry.getKey().port()
>                 + "/get/word-count/hello");
>             ...
>         }
>     }
> }
>
>
> IMO having a class we return is the better approach, as it nicely wraps the
> related things, i.e., host:port, store names, and topic partitions, into
> an object that is easy to use. Further, we could add some behaviour to
> this class if we felt it necessary, e.g., hasStore(storeName).
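>
> For example, hasStore could be a one-liner against the fields above
> (sketch):
>
> public boolean hasStore(final String storeName) {
>     return stateStoreNames.contains(storeName);
> }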
>
> Anyway, I'm interested in your thoughts.
>
> Thanks,
> Damian
>
> On Mon, 11 Jul 2016 at 13:47 Guozhang Wang <wangg...@gmail.com> wrote:
>
>> 1. Re StreamsConfig.USER_ENDPOINT_CONFIG:
>>
>> I agree with Neha that Kafka Streams can provide the bare minimum APIs
>> just for host/port, and the user's implementation layer can provide
>> whatever URL / proxy address they want to build on top of it.
>>
>>
>> 2. Re Improving KafkaStreamsInstance interface:
>>
>> Users are indeed aware of the "TaskId" class, which is not part of the
>> internal packages and is exposed in the PartitionGrouper interface that
>> users can instantiate, and which is assigned the input topic partitions.
>> So we can probably change the APIs to:
>>
>> Map<HostState, Set<TaskMetadata>> KafkaStreams.getAllTasks() where
>> TaskMetadata has fields such as taskId, list of assigned partitions, list
>> of state store names; and HostState can include hostname / port. The port
>> is the listening port of a user-defined listener that users provide to
>> listen for queries (e.g., using REST APIs).
>>
>> Map<HostState, Set<TaskMetadata>> KafkaStreams.getTasksWithStore(String /*
>> storeName */) would return only the hosts and their assigned tasks if at
>> least one of the tasks includes the given store name.
>>
>> Map<HostState, Set<TaskMetadata>> KafkaStreams.getTaskWithStoreAndKey(Key
>> k, String /* storeName */, StreamPartitioner partitioner) would return
>> only the host and its assigned task if the store with the given name
>> contains the particular key, according to the partitioner behavior.
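>>
>> For concreteness, a rough sketch of what those two classes might hold
>> (field names are illustrative only):
>>
>> public class HostState {
>>     private final String host;
>>     private final int port;
>>
>>     public HostState(final String host, final int port) {
>>         this.host = host;
>>         this.port = port;
>>     }
>> }
>>
>> public class TaskMetadata {
>>     private final TaskId taskId;
>>     private final Set<TopicPartition> assignedPartitions;
>>     private final Set<String> stateStoreNames;
>>
>>     public TaskMetadata(final TaskId taskId,
>>                         final Set<TopicPartition> assignedPartitions,
>>                         final Set<String> stateStoreNames) {
>>         this.taskId = taskId;
>>         this.assignedPartitions = assignedPartitions;
>>         this.stateStoreNames = stateStoreNames;
>>     }
>> }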
>>
>>
>>
>> Guozhang
>>
>>
>> On Sun, Jul 10, 2016 at 11:21 AM, Neha Narkhede <n...@confluent.io>
>> wrote:
>>
>> > A few thoughts that became apparent after observing example code of
>> > what an application architecture and code might look like with these
>> > changes. Apologies, hence, for the late realization.
>> >
>> > 1. "user.endpoint" will be defined very differently for the respective
>> > applications. I don't think Kafka Streams should generalize to accept
>> > any connection URL, as we expect to only expose metadata expressed as
>> > HostInfo (which is defined by host & port) and hence need to interpret
>> > the "user.endpoint" as host & port. Applications will have their own
>> > endpoint configs that will take many forms, and they will be
>> > responsible for parsing out host and port and configuring Kafka Streams
>> > accordingly.
>> >
>> > If we are in fact limiting to host and port, I wonder if we should
>> > change the name of "user.endpoint" into something more specific. We
>> > have clients expose host/port pairs as "bootstrap.servers". Should this
>> > be "application.server"?
>> >
>> > 2. I don't think we should expose another abstraction called
>> > KafkaStreamsInstance to the user. This is related to the discussion of
>> > the right abstraction that we want to expose to an application. The
>> > abstraction discussion itself should probably be part of the KIP
>> > itself; let me give a quick summary of my thoughts here:
>> > 1. The person implementing an application using Queryable State has
>> > likely already made some choices for the service layer: a REST
>> > framework, Thrift, or whatever. We don't really want to add another RPC
>> > framework to this mix, nor do we want to try to make Kafka's RPC
>> > mechanism general purpose.
>> > 2. Likewise, it should be clear that the API you want to expose to the
>> > front-end/client service is not necessarily the API you'd need
>> > internally, as there may be additional filtering/processing in the
>> > router.
>> >
>> > Given these constraints, what we prefer to add is a fairly low-level
>> > "toolbox" that would let you do anything you want, but requires you to
>> > route and perform any aggregation or processing yourself. This pattern
>> > is not recommended for all kinds of services/apps, but there is
>> > definitely a category of things where it is a big win, and other
>> > advanced applications are out of scope.
>> >
>> > The APIs we expose should take the following things into consideration:
>> > 1. Make it clear to the user that they will do the routing,
>> > aggregation, and processing themselves. So the bare minimum that we
>> > want to expose is store and partition metadata per application server,
>> > identified by the host and port.
>> > 2. Ensure that the API exposes abstractions that are known to the user
>> > or are intuitive to the user.
>> > 3. Avoid exposing internal objects or implementation details to the
>> > user.
>> >
>> > So, tying all this into answering the question of what we should
>> > expose through the APIs:
>> >
>> > In Kafka Streams, the user is aware of the concept of tasks and
>> > partitions, since the application scales with the number of partitions
>> > and tasks are the construct for logical parallelism. The user is also
>> > aware of the concept of state stores, though until now they were not
>> > user accessible. With Queryable State, the bare minimum abstractions
>> > that we need to expose are state stores and the location of state
>> > store partitions.
>> >
>> > For exposing the state stores, the getStore() APIs look good, but I
>> > think for locating the state store partitions, we should go back to
>> > the original proposal of simply exposing some sort of
>> > getPartitionMetadata() that returns a PartitionMetadata or
>> > TaskMetadata object keyed by HostInfo.
>> >
>> > The application will convert the HostInfo (host and port) into some
>> > connection URL to talk to the other app instances via its own RPC
>> > mechanism, depending on whether it needs to scatter-gather or just
>> > query. The application will know how a key maps to a partition, and
>> > through PartitionMetadata it will know how to locate the server that
>> > hosts the store that has the partition hosting that key.
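>> >
>> > For example, a minimal sketch of that conversion (the URL layout is
>> > the application's own convention, not something Kafka Streams defines):
>> >
>> > static String storeUrl(final HostInfo hostInfo, final String store, final String key) {
>> >     return "http://" + hostInfo.host() + ":" + hostInfo.port()
>> >         + "/get/" + store + "/" + key;
>> > }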
>> >
>> > On Fri, Jul 8, 2016 at 9:40 AM, Michael Noll <mich...@confluent.io>
>> wrote:
>> >
>> > > Addendum in case my previous email wasn't clear:
>> > >
>> > > > So for any given instance of a streams application there will
>> > > > never be both a v1 and v2 alive at the same time
>> > >
>> > > That's right.  But the current live instance will be able to tell
>> > > other instances, via its endpoint setting, whether it wants to be
>> > > contacted at v1 or at v2.  The other instances can't guess that.
>> > > Think: if an older instance were to manually compose the "rest" of an
>> > > endpoint URI, having only the host and port from the endpoint
>> > > setting, it might not know that the new instances have a different
>> > > endpoint suffix, for example.
>> > >
>> > >
>> > > On Fri, Jul 8, 2016 at 6:37 PM, Michael Noll <mich...@confluent.io>
>> > wrote:
>> > >
>> > > > Damian,
>> > > >
>> > > > about the rolling upgrade comment:  An instance A will contact
>> > > > another instance B by the latter's endpoint, right?  So if A has no
>> > > > further information available than B's host and port, then how
>> > > > should instance A know whether it should call B at /v1/ or at
>> > > > /v2/?  I agree that my suggestion isn't foolproof, but it is afaict
>> > > > better than the host:port approach.
>> > > >
>> > > >
>> > > >
>> > > > On Fri, Jul 8, 2016 at 5:15 PM, Damian Guy <damian....@gmail.com>
>> > wrote:
>> > > >
>> > > >> Michael - I'm OK with changing it to a string. Anyone else have a
>> > > >> strong opinion on this?
>> > > >>
>> > > >> FWIW - I don't think it will work fine as is during the rolling
>> > > >> upgrade scenario, as the service that is listening on the port
>> > > >> needs to be embedded within each instance. So for any given
>> > > >> instance of a streams application there will never be both a v1
>> > > >> and v2 alive at the same time (unless of course the process didn't
>> > > >> shut down properly, but then you have another problem...).
>> > > >>
>> > > >> On Fri, 8 Jul 2016 at 15:26 Michael Noll <mich...@confluent.io> wrote:
>> > > >>
>> > > >> > I have one further comment about
>> > `StreamsConfig.USER_ENDPOINT_CONFIG`.
>> > > >> >
>> > > >> > I think we should consider not restricting the value of this
>> > > >> > setting to only `host:port` pairs.  By design, this setting is
>> > > >> > capturing user-driven metadata to define an endpoint, so why
>> > > >> > restrict the creativity or flexibility of our users?  I can
>> > > >> > imagine, for example, that users would like to set values such
>> > > >> > as `https://host:port/api/rest/v1/` in this field (e.g. being
>> > > >> > able to distinguish between `.../v1/` and `.../v2/` may help in
>> > > >> > scenarios such as rolling upgrades, where, during the upgrade,
>> > > >> > older instances may need to coexist with newer instances).
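>> > > >> >
>> > > >> > Parsing such a value back into host and port would still be
>> > > >> > trivial, e.g. (a sketch using java.net.URI):
>> > > >> >
>> > > >> > import java.net.URI;
>> > > >> >
>> > > >> > final URI endpoint = URI.create("https://myhost:8080/api/rest/v1/");
>> > > >> > final String host = endpoint.getHost();  // "myhost"
>> > > >> > final int port = endpoint.getPort();     // 8080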
>> > > >> >
>> > > >> > That said, I don't have a strong opinion here.
>> > > >> >
>> > > >> > -Michael
>> > > >> >
>> > > >> >
>> > > >> >
>> > > >> > > On Fri, Jul 8, 2016 at 2:55 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>> > > >> >
>> > > >> > > +1
>> > > >> > >
>> > > >> > > On 07/08/2016 11:03 AM, Eno Thereska wrote:
>> > > >> > > > +1 (non-binding)
>> > > >> > > >
>> > > >> > > >> On 7 Jul 2016, at 18:31, Sriram Subramanian <r...@confluent.io> wrote:
>> > > >> > > >>
>> > > >> > > >> +1
>> > > >> > > >>
>> > > >> > > >> On Thu, Jul 7, 2016 at 9:53 AM, Henry Cai <h...@pinterest.com.invalid> wrote:
>> > > >> > > >>
>> > > >> > > >>> +1
>> > > >> > > >>>
>> > > >> > > >>>> On Thu, Jul 7, 2016 at 6:48 AM, Michael Noll <mich...@confluent.io> wrote:
>> > > >> > > >>>
>> > > >> > > >>>> +1 (non-binding)
>> > > >> > > >>>>
>> > > >> > > >>>>> On Thu, Jul 7, 2016 at 10:24 AM, Damian Guy <damian....@gmail.com> wrote:
>> > > >> > > >>>>
>> > > >> > > >>>>> Thanks Henry - we've updated the KIP with an example and
>> > > >> > > >>>>> the new config parameter required. FWIW the user doesn't
>> > > >> > > >>>>> register a listener; they provide a host:port in config.
>> > > >> > > >>>>> It is expected they will start a service running on that
>> > > >> > > >>>>> host:port that they can use to connect to the running
>> > > >> > > >>>>> KafkaStreams instance.
>> > > >> > > >>>>>
>> > > >> > > >>>>> Thanks,
>> > > >> > > >>>>> Damian
>> > > >> > > >>>>>
>> > > >> > > >>>>> On Thu, 7 Jul 2016 at 06:06 Henry Cai <h...@pinterest.com.invalid> wrote:
>> > > >> > > >>>>>
>> > > >> > > >>>>>> It wasn't quite clear to me how the user program
>> > > >> > > >>>>>> interacts with the discovery API, especially the
>> > > >> > > >>>>>> user-supplied listener part: how does the user program
>> > > >> > > >>>>>> supply that listener to KafkaStreams, and how does
>> > > >> > > >>>>>> KafkaStreams know which port the user listener is
>> > > >> > > >>>>>> running on? Maybe give a more complete end-to-end
>> > > >> > > >>>>>> example, including the steps on registering the user
>> > > >> > > >>>>>> listener and whether the user listener needs to be
>> > > >> > > >>>>>> involved with task reassignment.
>> > > >> > > >>>>>>
>> > > >> > > >>>>>>
>> > > >> > > >>>>>> On Wed, Jul 6, 2016 at 9:13 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > > >> > > >>>>>>
>> > > >> > > >>>>>>> +1
>> > > >> > > >>>>>>>
>> > > >> > > >>>>>>> On Wed, Jul 6, 2016 at 12:44 PM, Damian Guy <damian....@gmail.com> wrote:
>> > > >> > > >>>>>>>
>> > > >> > > >>>>>>>> Hi all,
>> > > >> > > >>>>>>>>
>> > > >> > > >>>>>>>> I'd like to initiate the voting process for KIP-67
>> > > >> > > >>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams>
>> > > >> > > >>>>>>>>
>> > > >> > > >>>>>>>> KAFKA-3909 <https://issues.apache.org/jira/browse/KAFKA-3909> is the
>> > > >> > > >>>>>>>> top level JIRA for this effort.
>> > > >> > > >>>>>>>>
>> > > >> > > >>>>>>>> Initial PRs for Step 1 of the process are:
>> > > >> > > >>>>>>>> Expose State Store Names <https://github.com/apache/kafka/pull/1526> and
>> > > >> > > >>>>>>>> Query Local State Stores <https://github.com/apache/kafka/pull/1565>
>> > > >> > > >>>>>>>>
>> > > >> > > >>>>>>>> Thanks,
>> > > >> > > >>>>>>>> Damian
>> > > >> > > >>>>>>>>
>> > > >> > > >>>>>>>
>> > > >> > > >>>>>>>
>> > > >> > > >>>>>>>
>> > > >> > > >>>>>>> --
>> > > >> > > >>>>>>> -- Guozhang
>> > > >> > > >>>>>>>
>> > > >> > > >>>>>>
>> > > >> > > >>>>>
>> > > >> > > >>>>
>> > > >> > > >>>>
>> > > >> > > >>>>
>> > > >> > > >>>> --
>> > > >> > > >>>> Best regards,
>> > > >> > > >>>> Michael Noll
>> > > >> > > >>>>
>> > > >> > > >>>>
>> > > >> > > >>>>
>> > > >> > > >>>> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
>> > > >> > > >>>> Download Apache Kafka and Confluent Platform: www.confluent.io/download
>> > > >> > > >>>> <http://www.confluent.io/download>*
>> > > >> > > >>>>
>> > > >> > > >>>
>> > > >> > > >
>> > > >> > >
>> > > >> > >
>> > > >> >
>> > > >> >
>> > > >> > --
>> > > >> > Best regards,
>> > > >> > Michael Noll
>> > > >> >
>> > > >> >
>> > > >> >
>> > > >> > *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
>> > > >> > Download Apache Kafka and Confluent Platform: www.confluent.io/download
>> > > >> > <http://www.confluent.io/download>*
>> > > >> >
>> > > >>
>> > > >
>> > > >
>> > > >
>> > > > --
>> > > > Best regards,
>> > > > Michael Noll
>> > > >
>> > > >
>> > > >
>> > > > *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
>> > > > Download Apache Kafka and Confluent Platform: www.confluent.io/download
>> > > > <http://www.confluent.io/download>*
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > Best regards,
>> > > Michael Noll
>> > >
>> > >
>> > >
>> > > *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
>> > > Download Apache Kafka and Confluent Platform: www.confluent.io/download
>> > > <http://www.confluent.io/download>*
>> > >
>> >
>> >
>> >
>> > --
>> > Thanks,
>> > Neha
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
