Yes, you get compile-time errors.

On Wed, 13 Jul 2016 at 16:22 Damian Guy <damian....@gmail.com> wrote:
You won't get a runtime error, as you wouldn't find a store of that type. The API would return null.

On Wed, 13 Jul 2016 at 16:22 Jay Kreps <j...@confluent.io> wrote:

But if "my-store" is not of type MyStoreType don't you still get a runtime error that in effect is the same as the class cast would be? Basically the question I'm asking is whether this added complexity is actually moving runtime errors to compile-time errors.

-Jay

On Wed, Jul 13, 2016 at 4:16 PM, Damian Guy <damian....@gmail.com> wrote:

You create your custom store, i.e.:

/**
 * An interface your custom store provides
 * @param <K>
 * @param <V>
 */
interface MyStoreType<K, V> {
    V get(K key);
    void put(K key, V value);
}

/**
 * Implement your store
 * @param <K>
 * @param <V>
 */
public class MyStoreImpl<K, V> implements StateStore, MyStoreType<K, V> {
    // implementation of the store goes here
}

Provide an implementation of QueryableStoreType to find stores that match your custom store:

/**
 * Implement QueryableStoreType to find stores that match your custom store
 * @param <K>
 * @param <V>
 */
public class MyQueryableType<K, V> implements QueryableStoreType<MyStoreType<K, V>> {
    @Override
    public boolean accepts(final StateStore stateStore) {
        // accept any underlying StateStore that implements the custom interface
        return stateStore instanceof MyStoreType;
    }

    @Override
    public MyStoreType<K, V> create(final StateStoreProvider storeProvider, final String storeName) {
        return new MyCompositeStore<>(storeName, storeProvider);
    }
}

Create a composite type to wrap the potentially many underlying instances of the store, i.e., there will be one per partition:

/**
 * Provide a wrapper over the underlying store instances.
 */
public class MyCompositeStore<K, V> implements MyStoreType<K, V> {
    private final String storeName;
    private final StateStoreProvider provider;

    public MyCompositeStore(final String storeName, final StateStoreProvider provider) {
        this.storeName = storeName;
        this.provider = provider;
    }

    @Override
    public V get(final K key) {
        final List<MyStoreType<K, V>> stores = provider.getStores(storeName, new MyQueryableType<K, V>());
        // iterate over the underlying stores looking for the key
        for (final MyStoreType<K, V> store : stores) {
            final V value = store.get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    @Override
    public void put(final K key, V value) {
        // the composite queryable view is effectively read-only, so writes are not supported here
    }
}

Look up your new store from KafkaStreams:

final MyStoreType store = kafkaStreams.store("my-store", new MyQueryableType<>());

So we get type safety, and we can constrain the interfaces returned to read-only versions (which is what we are doing for KeyValue and Window stores).

HTH,
Damian

On Wed, 13 Jul 2016 at 15:30 Jay Kreps <j...@confluent.io> wrote:

But to avoid the cast you introduce a bunch of magic that doesn't really bring type safety, right? Or possibly I'm misunderstanding: how do I plug in a new store type and get access to it? Can you give the steps for that?

-Jay
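To make the compile-time vs. runtime distinction concrete, here is a small fragment using only the classes from Damian's example; the single-argument store("my-store") call is a hypothetical stand-in for the cast-based alternative being discussed, not a real API:

// Cast-based alternative (hypothetical untyped lookup): mistakes only surface at runtime.
final Object untyped = kafkaStreams.store("my-store");  // hypothetical, for comparison only
final MyStoreType<String, Long> casted = (MyStoreType<String, Long>) untyped;  // ClassCastException at runtime if "my-store" is some other kind of store

// Typed lookup as in the example above: a mismatched assignment does not compile,
// and a store that MyQueryableType does not accept comes back as null rather than throwing.
final MyStoreType<String, Long> typed = kafkaStreams.store("my-store", new MyQueryableType<String, Long>());
// final MyStoreType<Long, String> wrong = kafkaStreams.store("my-store", new MyQueryableType<String, Long>());  // does not compile
if (typed == null) {
    // "my-store" exists, but it is not a MyStoreType store
}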
On Wed, Jul 13, 2016 at 10:47 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Personally I think the additional complexity of the introduced "QueryableStoreType" interface is still acceptable from a user's point of view: this is the only interface we are exposing to users, and the other wrappers are all internal classes.

Regarding "QueryableStoreTypes", maybe we can consider declaring its "QueryableStoreTypeMatcher" as private instead of public, since "QueryableStoreTypes" is just used as a convenient way to use library-provided types, like serialization/Serdes.java.

With this, the only additional interface the library is exposing is "QueryableStoreType", and users can optionally use "QueryableStoreTypes" to conveniently create library-provided store types.

Guozhang

On Wed, Jul 13, 2016 at 7:58 AM, Neha Narkhede <n...@confluent.io> wrote:

Damian -- appreciate the example code, and you convinced me. Agree that the class approach is better, and renaming to KafkaStreamsMetadata along with renaming the API methods will address the issues I was referring to.

One other thing I wanted to get people's thoughts on was the way we are proposing to handle different store types. I am sure you have thought about the tradeoffs of using the store wrappers and matchers (QueryableStoreType) vs. just making users cast the returned store to the type they expect to use. That is simple, but the obvious downside is that it is likely to result in exceptions for users that don't know what they are doing.

In my experience of dealing with apps that would use queryable state, it appears to me that a majority would just use the key-value store. Partly because that will suffice, and partly because people might just follow the simpler examples we provide that use the key-value store. Advanced users will be aware of the reason they want to use the windowed store and will know how to cast it. The advantage of the current approach is that it is likely more robust and general, but it introduces more interfaces and wrapper code.

I tend to prefer simplicity to optimize for the general case, but I'm curious to get people's thoughts on this as well.

On Wed, Jul 13, 2016 at 8:13 AM, Jim Jagielski <j...@jagunet.com> wrote:

IMO, that makes the most sense.

On Jul 12, 2016, at 5:11 PM, Ismael Juma <ism...@juma.me.uk> wrote:

Hi Damian,

How about StreamsMetadata instead? The general naming pattern seems to avoid the `Kafka` prefix for everything outside of `KafkaStreams` itself.
Ismael

On Tue, Jul 12, 2016 at 7:14 PM, Damian Guy <damian....@gmail.com> wrote:

Hi,

I agree with point 1. application.server is a better name for the config (we'll change this). However, on point 2 I think we should stick mostly with what we already have. I've tried both ways of doing this when working on the JIRA and building examples, and I find the current approach more intuitive and easier to use than the Map-based approach.

However, there is probably a naming issue. We should rename KafkaStreamsInstance to KafkaStreamsMetadata. This class is very simple, but it provides all the information a developer needs to find the instance(s) of a Streams application that a particular store is running on, i.e.:

public class KafkaStreamsMetadata {
    private final HostInfo hostInfo;
    private final Set<String> stateStoreNames;
    private final Set<TopicPartition> topicPartitions;

So using the API to route to a new host is fairly simple, particularly in the case when you want to find the host for a particular key, i.e.:

final KafkaStreams kafkaStreams = createKafkaStreams();
final KafkaStreamsMetadata streamsMetadata = kafkaStreams.instanceWithKey("word-count", "hello", Serdes.String().serializer());
http.get("http://" + streamsMetadata.host() + ":" + streamsMetadata.port() + "/get/word-count/hello");

And if you want to do a scatter-gather approach:

final KafkaStreams kafkaStreams = createKafkaStreams();
final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas = kafkaStreams.allInstancesWithStore("word-count");
for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
    http.get("http://" + streamsMetadata.host() + ":" + streamsMetadata.port() + "/get/word-count/hello");
    ...
}

And if you iterated over all instances:

final KafkaStreams kafkaStreams = createKafkaStreams();
final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas = kafkaStreams.allInstances();
for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
    if (streamsMetadata.stateStoreNames().contains("word-count")) {
        http.get("http://" + streamsMetadata.host() + ":" + streamsMetadata.port() + "/get/word-count/hello");
        ...
    }
}

If we were to change this to use Map<HostInfo, Set<TaskMetadata>>, for the most part users would need to iterate over the entry or key set.
Examples:

Finding an instance by key is a little odd:

final KafkaStreams kafkaStreams = createKafkaStreams();
final Map<HostInfo, Set<TaskMetadata>> streamsMetadata = kafkaStreams.instanceWithKey("word-count", "hello", Serdes.String().serializer());
// this is a bit odd as I only expect one:
for (HostInfo hostInfo : streamsMetadata.keySet()) {
    http.get("http://" + hostInfo.host() + ":" + hostInfo.port() + "/get/word-count/hello");
}

The scatter/gather by store is fairly similar to the previous example:

final KafkaStreams kafkaStreams = createKafkaStreams();
final Map<HostInfo, Set<TaskMetadata>> streamsMetadata = kafkaStreams.allInstancesWithStore("word-count");
for (HostInfo hostInfo : streamsMetadata.keySet()) {
    http.get("http://" + hostInfo.host() + ":" + hostInfo.port() + "/get/word-count/hello");
    ...
}

And iterating over all instances:

final Map<HostInfo, Set<TaskMetadata>> streamsMetadata = kafkaStreams.allInstances();
for (Map.Entry<HostInfo, Set<TaskMetadata>> entry : streamsMetadata.entrySet()) {
    for (TaskMetadata taskMetadata : entry.getValue()) {
        if (taskMetadata.stateStoreNames().contains("word-count")) {
            http.get("http://" + entry.getKey().host() + ":" + entry.getKey().port() + "/get/word-count/hello");
            ...
        }
    }
}

IMO - having a class we return is the better approach, as it nicely wraps the related things, i.e., host:port, store names, and topic partitions, into an object that is easy to use. Further, we could add some behaviour to this class if we felt it necessary, i.e., hasStore(storeName) etc.

Anyway, I'm interested in your thoughts.

Thanks,
Damian
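The KafkaStreamsMetadata snippet earlier in the message lists only the fields, while the routing examples call streamsMetadata.host(), streamsMetadata.port() and streamsMetadata.stateStoreNames(). A sketch of how the class could be filled out to support those calls, assuming HostInfo exposes host() and port() (the accessor names are inferred from the examples, not taken from the KIP):

public class KafkaStreamsMetadata {
    private final HostInfo hostInfo;
    private final Set<String> stateStoreNames;
    private final Set<TopicPartition> topicPartitions;

    public KafkaStreamsMetadata(final HostInfo hostInfo,
                                final Set<String> stateStoreNames,
                                final Set<TopicPartition> topicPartitions) {
        this.hostInfo = hostInfo;
        this.stateStoreNames = stateStoreNames;
        this.topicPartitions = topicPartitions;
    }

    // convenience accessors used in the routing examples above
    public String host() { return hostInfo.host(); }
    public int port() { return hostInfo.port(); }
    public Set<String> stateStoreNames() { return stateStoreNames; }
    public Set<TopicPartition> topicPartitions() { return topicPartitions; }
}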
On Mon, 11 Jul 2016 at 13:47 Guozhang Wang <wangg...@gmail.com> wrote:

1. Re StreamsConfig.USER_ENDPOINT_CONFIG:

I agree with Neha that Kafka Streams can provide the bare minimum APIs just for host/port, and the user's implementation layer can provide the URL / proxy address they want to build on top of it.

2. Re improving the KafkaStreamsInstance interface:

Users are indeed aware of the "TaskId" class, which is not part of the internal packages and is exposed in the PartitionGrouper interface that can be instantiated by users, and which is assigned the input topic partitions. So we can probably change the APIs as:

Map<HostState, Set<TaskMetadata>> KafkaStreams.getAllTasks(), where TaskMetadata has fields such as taskId, the list of assigned partitions, and the list of state store names; and HostState can include hostname / port. The port is the listening port of a user-defined listener that users provide to listen for queries (e.g., using REST APIs).

Map<HostState, Set<TaskMetadata>> KafkaStreams.getTasksWithStore(String /* storeName */) would return only the hosts and their assigned tasks if at least one of the tasks includes the given store name.

Map<HostState, Set<TaskMetadata>> KafkaStreams.getTaskWithStoreAndKey(Key k, String /* storeName */, StreamPartitioner partitioner) would return only the host and its assigned task if the store with the given name has a particular key, according to the partitioner behaviour.

Guozhang
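Written out as Java for readability, the shapes Guozhang describes might look roughly like the sketch below. The class and field names simply follow the prose above; none of this is a finalized API, and the containing interface name is invented here purely to group the three proposed lookups:

import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;

// Illustrative only: shapes following the prose above, not a finalized API.
class HostState {
    final String host;
    final int port;  // listening port of the user-defined query listener (e.g. a REST endpoint)
    HostState(final String host, final int port) { this.host = host; this.port = port; }
}

class TaskMetadata {
    final TaskId taskId;
    final Set<TopicPartition> assignedPartitions;
    final Set<String> stateStoreNames;
    TaskMetadata(final TaskId taskId, final Set<TopicPartition> assignedPartitions, final Set<String> stateStoreNames) {
        this.taskId = taskId;
        this.assignedPartitions = assignedPartitions;
        this.stateStoreNames = stateStoreNames;
    }
}

// Invented grouping name, showing how the lookups proposed above might hang off KafkaStreams.
interface TaskDiscovery<K> {
    Map<HostState, Set<TaskMetadata>> getAllTasks();
    Map<HostState, Set<TaskMetadata>> getTasksWithStore(String storeName);
    Map<HostState, Set<TaskMetadata>> getTaskWithStoreAndKey(K key, String storeName, StreamPartitioner<K, ?> partitioner);
}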
On Sun, Jul 10, 2016 at 11:21 AM, Neha Narkhede <n...@confluent.io> wrote:

A few thoughts that became apparent after observing example code of what an application architecture and code might look like with these changes. Apologies for the late realization.

1. "user.endpoint" will be defined very differently for respective applications. I don't think Kafka Streams should generalize to accept any connection URL, as we expect to only expose metadata expressed as HostInfo (which is defined by host & port) and hence need to interpret the "user.endpoint" as host & port. Applications will have their own endpoint configs that will take many forms, and they will be responsible for parsing out host and port and configuring Kafka Streams accordingly.

If we are in fact limiting to host and port, I wonder if we should change the name of "user.endpoint" into something more specific. We have clients expose host/port pairs as "bootstrap.servers". Should this be "application.server"?

2. I don't think we should expose another abstraction called KafkaStreamsInstance to the user. This is related to the discussion of the right abstraction that we want to expose to an application. The abstraction discussion itself should probably be part of the KIP, so let me give a quick summary of my thoughts here:

1. The person implementing an application using Queryable State has likely already made some choices for the service layer: a REST framework, Thrift, or whatever. We don't really want to add another RPC framework to this mix, nor do we want to try to make Kafka's RPC mechanism general purpose.
2. Likewise, it should be clear that the API you want to expose to the front-end/client service is not necessarily the API you'd need internally, as there may be additional filtering/processing in the router.

Given these constraints, what we prefer to add is a fairly low-level "toolbox" that would let you do anything you want, but requires you to route and perform any aggregation or processing yourself. This pattern is not recommended for all kinds of services/apps, but there is definitely a category of things where it is a big win, and other advanced applications are out of scope.

The APIs we expose should take the following things into consideration:
1. Make it clear to the user that they will do the routing, aggregation, and processing themselves. So the bare minimum that we want to expose is store and partition metadata per application server, identified by host and port.
2. Ensure that the API exposes abstractions that are known to the user or are intuitive to the user.
3. Avoid exposing internal objects or implementation details to the user.

So, tying all this into answering the question of what we should expose through the APIs:

In Kafka Streams, the user is aware of the concept of tasks and partitions, since the application scales with the number of partitions and tasks are the construct for logical parallelism. The user is also aware of the concept of state stores, though until now they were not user-accessible. With Queryable State, the bare minimum abstractions that we need to expose are state stores and the location of state store partitions.

For exposing the state stores, the getStore() APIs look good, but for locating the state store partitions I think we should go back to the original proposal of simply exposing some sort of getPartitionMetadata() that returns a PartitionMetadata or TaskMetadata object keyed by HostInfo.
The application will convert the HostInfo (host and port) into some connection URL to talk to the other app instances via its own RPC mechanism, depending on whether it needs to scatter-gather or just query. The application will know how a key maps to a partition, and through PartitionMetadata it will know how to locate the server that hosts the store with the partition hosting that key.
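To make that routing concrete, here is a short, purely illustrative fragment of the application-side lookup against a Map<HostState, Set<TaskMetadata>> of the shape sketched earlier in the thread. The metadata map, numPartitions value and http helper are assumed to exist in the application, and the key-to-partition mapping shown is a placeholder that in practice must match how the data was actually partitioned:

final String storeName = "word-count";
final String key = "hello";
// Placeholder mapping; a real application must use the same partitioning scheme as the writes.
final int partition = Math.floorMod(key.hashCode(), numPartitions);

for (final Map.Entry<HostState, Set<TaskMetadata>> entry : metadata.entrySet()) {
    for (final TaskMetadata task : entry.getValue()) {
        final boolean hasStore = task.stateStoreNames.contains(storeName);
        final boolean hasPartition = task.assignedPartitions.stream()
                .anyMatch(tp -> tp.partition() == partition);
        if (hasStore && hasPartition) {
            // Route the query to the instance hosting this key's partition of the store.
            http.get("http://" + entry.getKey().host + ":" + entry.getKey().port
                    + "/get/" + storeName + "/" + key);
        }
    }
}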
So for any given instance of a >> > streams >> > > > > > >>>> application >> > > > > > >>>>>>> there will never be both a v1 and v2 alive at the same >> time >> > > > > > >> (unless >> > > > > > >>> of >> > > > > > >>>>>>> course the process didn't shutdown properly, but then >> you >> > > have >> > > > > > >>> another >> > > > > > >>>>>>> problem...). >> > > > > > >>>>>>> >> > > > > > >>>>>>> On Fri, 8 Jul 2016 at 15:26 Michael Noll < >> > > mich...@confluent.io >> > > > > >> > > > > > >>>> wrote: >> > > > > > >>>>>>> >> > > > > > >>>>>>>> I have one further comment about >> > > > > > >>>> `StreamsConfig.USER_ENDPOINT_CONFIG`. >> > > > > > >>>>>>>> >> > > > > > >>>>>>>> I think we should consider to not restricting the >> value of >> > > > this >> > > > > > >>>>> setting >> > > > > > >>>>>>> to >> > > > > > >>>>>>>> only `host:port` pairs. By design, this setting is >> > > capturing >> > > > > > >>>>>>> user-driven >> > > > > > >>>>>>>> metadata to define an endpoint, so why restrict the >> > > creativity >> > > > > > >> or >> > > > > > >>>>>>>> flexibility of our users? I can imagine, for example, >> > that >> > > > > > >> users >> > > > > > >>>>> would >> > > > > > >>>>>>>> like to set values such as `https://host:port >> > /api/rest/v1/` >> > > > in >> > > > > > >>> this >> > > > > > >>>>>>> field >> > > > > > >>>>>>>> (e.g. being able to distinguish between `.../v1/` and >> > > > `.../v2/` >> > > > > > >>> may >> > > > > > >>>>>>> help in >> > > > > > >>>>>>>> scenarios such as rolling upgrades, where, during the >> > > upgrade, >> > > > > > >>> older >> > > > > > >>>>>>>> instances may need to coexist with newer instances). >> > > > > > >>>>>>>> >> > > > > > >>>>>>>> That said, I don't have a strong opinion here. >> > > > > > >>>>>>>> >> > > > > > >>>>>>>> -Michael >> > > > > > >>>>>>>> >> > > > > > >>>>>>>> >> > > > > > >>>>>>>> >> > > > > > >>>>>>>> On Fri, Jul 8, 2016 at 2:55 PM, Matthias J. Sax < >> > > > > > >>>>> matth...@confluent.io> >> > > > > > >>>>>>>> wrote: >> > > > > > >>>>>>>> >> > > > > > >>>>>>>>> +1 >> > > > > > >>>>>>>>> >> > > > > > >>>>>>>>> On 07/08/2016 11:03 AM, Eno Thereska wrote: >> > > > > > >>>>>>>>>> +1 (non-binding) >> > > > > > >>>>>>>>>> >> > > > > > >>>>>>>>>>> On 7 Jul 2016, at 18:31, Sriram Subramanian < >> > > > > > >>> r...@confluent.io> >> > > > > > >>>>>>> wrote: >> > > > > > >>>>>>>>>>> >> > > > > > >>>>>>>>>>> +1 >> > > > > > >>>>>>>>>>> >> > > > > > >>>>>>>>>>> On Thu, Jul 7, 2016 at 9:53 AM, Henry Cai >> > > > > > >>>>>>> <h...@pinterest.com.invalid >> > > > > > >>>>>>>>> >> > > > > > >>>>>>>>>>> wrote: >> > > > > > >>>>>>>>>>> >> > > > > > >>>>>>>>>>>> +1 >> > > > > > >>>>>>>>>>>> >> > > > > > >>>>>>>>>>>> On Thu, Jul 7, 2016 at 6:48 AM, Michael Noll < >> > > > > > >>>>>>> mich...@confluent.io> >> > > > > > >>>>>>>>> wrote: >> > > > > > >>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>> +1 (non-binding) >> > > > > > >>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>> On Thu, Jul 7, 2016 at 10:24 AM, Damian Guy < >> > > > > > >>>>>>> damian....@gmail.com> >> > > > > > >>>>>>>>>>>> wrote: >> > > > > > >>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>> Thanks Henry - we've updated the KIP with an >> example >> > > and >> > > > > > >>> the >> > > > > > >>>>> new >> > > > > > >>>>>>>>> config >> > > > > > >>>>>>>>>>>>>> parameter required. FWIW the user doesn't >> register a >> > > > > > >>>> listener, >> > > > > > >>>>>>> they >> > > > > > >>>>>>>>>>>>> provide >> > > > > > >>>>>>>>>>>>>> a host:port in config. 
On Thu, 7 Jul 2016 at 06:06 Henry Cai <h...@pinterest.com.invalid> wrote:

It wasn't quite clear to me how the user program interacts with the discovery API, especially on the user-supplied listener part: how does the user program supply that listener to KafkaStreams, and how does KafkaStreams know which port the user listener is running on? Maybe add a more complete end-to-end example, including the steps for registering the user listener and whether the user listener needs to be involved with task reassignment.

On Wed, Jul 6, 2016 at 9:13 PM, Guozhang Wang <wangg...@gmail.com> wrote:

+1

On Wed, Jul 6, 2016 at 12:44 PM, Damian Guy <damian....@gmail.com> wrote:

Hi all,

I'd like to initiate the voting process for KIP-67
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams>

KAFKA-3909 <https://issues.apache.org/jira/browse/KAFKA-3909> is the top-level JIRA for this effort.
Initial PRs for Step 1 of the process are:
Expose State Store Names <https://github.com/apache/kafka/pull/1526> and
Query Local State Stores <https://github.com/apache/kafka/pull/1565>

Thanks,
Damian