Yes, you get compile time errors
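
To make the compile-time point concrete, here is a minimal, self-contained sketch. It does not use the real Kafka Streams classes: `StoreType`, `StoreTypes`, `Registry`, `ReadOnlyKv` and `ReadOnlyWindow` are hypothetical stand-ins for QueryableStoreType, the library-provided types, KafkaStreams.store(...) and the read-only store interfaces. The return type of the lookup follows the type argument, so user code contains no cast, and asking for the wrong kind of store yields "not found" (null in this sketch) rather than a ClassCastException.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; none of these are Kafka Streams classes.
interface ReadOnlyKv { String get(String key); }
interface ReadOnlyWindow { String fetch(String key, long time); }

// Plays the role of QueryableStoreType<T>: decides whether a store matches T.
interface StoreType<T> {
    boolean accepts(Object store);
    T cast(Object store);
}

final class StoreTypes {
    static StoreType<ReadOnlyKv> keyValue() {
        return new StoreType<ReadOnlyKv>() {
            public boolean accepts(Object store) { return store instanceof ReadOnlyKv; }
            public ReadOnlyKv cast(Object store) { return (ReadOnlyKv) store; }
        };
    }

    static StoreType<ReadOnlyWindow> window() {
        return new StoreType<ReadOnlyWindow>() {
            public boolean accepts(Object store) { return store instanceof ReadOnlyWindow; }
            public ReadOnlyWindow cast(Object store) { return (ReadOnlyWindow) store; }
        };
    }
}

final class Registry {
    private final Map<String, Object> stores = new HashMap<>();

    void register(String name, Object store) { stores.put(name, store); }

    // The return type is inferred from the StoreType argument.
    <T> T store(String name, StoreType<T> type) {
        Object store = stores.get(name);
        return (store != null && type.accepts(store)) ? type.cast(store) : null;
    }
}

final class TypedLookupDemo {
    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.register("my-store", (ReadOnlyKv) key -> "value-for-" + key);

        // No cast needed: the compiler knows this is a ReadOnlyKv.
        ReadOnlyKv kv = registry.store("my-store", StoreTypes.keyValue());
        System.out.println(kv.get("hello"));

        // Wrong kind of store: nothing matches, so the lookup returns null.
        ReadOnlyWindow window = registry.store("my-store", StoreTypes.window());
        System.out.println(window == null);

        // This line would be rejected by the compiler:
        // ReadOnlyWindow bad = registry.store("my-store", StoreTypes.keyValue());
    }
}
```

The only cast lives inside the type object, next to the instanceof check that guards it, which is the trade being made against the extra interface.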

On Wed, 13 Jul 2016 at 16:22 Damian Guy <damian....@gmail.com> wrote:

> You won't get a runtime error as you wouldn't find a store of that type.
> The API would return null
>
> On Wed, 13 Jul 2016 at 16:22 Jay Kreps <j...@confluent.io> wrote:
>
>> But if "my-store" is not of type MyStoreType don't you still get a runtime
>> error that in effect is the same as the class cast would be? Basically the
>> question I'm asking is whether this added complexity is actually moving
>> runtime errors to compile time errors.
>>
>> -Jay
>>
>> On Wed, Jul 13, 2016 at 4:16 PM, Damian Guy <damian....@gmail.com> wrote:
>>
>> > You create your custom store, i.e.:
>> >
>> > /**
>> >  * An interface your custom store provides
>> >  * @param <K>
>> >  * @param <V>
>> >  */
>> > interface MyStoreType<K,V> {
>> >     V get(K key);
>> >     void put(K key, V value);
>> > }
>> >
>> > /**
>> >  * Implement your store
>> >  * @param <K>
>> >  * @param <V>
>> >  */
>> > public class MyStoreImpl<K,V> implements StateStore, MyStoreType<K,V> {
>> >     // implementation of the store goes here
>> > }
>> >
>> >
>> > Provide an implementation of QueryableStoreType to find stores that match
>> > your custom store:
>> >
>> > /**
>> >  * Implement QueryableStoreType to find stores that match your custom store
>> >  * @param <K>
>> >  * @param <V>
>> >  */
>> >
>> > public class MyQueryableType<K, V> implements
>> > QueryableStoreType<MyStoreType<K, V>>{
>> >     @Override
>> >     public boolean accepts(final StateStore stateStore) {
>> >         return stateStore instanceof MyStoreType;
>> >     }
>> >
>> >     @Override
>> >     public MyStoreType<K,V> create(final StateStoreProvider
>> > storeProvider, final String storeName) {
>> >         return new MyCompositeStore<>(storeName, storeProvider);
>> >     }
>> > }
>> >
>> >
>> > Create a composite type to wrap the potentially many underlying instances
>> > of the store, i.e., there will be one per partition:
>> >
>> > /**
>> >  * Provide a wrapper over the underlying store instances.
>> >  */
>> > public class MyCompositeStore<K,V> implements MyStoreType<K,V> {
>> >     private final String storeName;
>> >     private final StateStoreProvider provider;
>> >
>> >     public MyCompositeStore(final String storeName, final
>> > StateStoreProvider provider) {
>> >         this.storeName = storeName;
>> >         this.provider = provider;
>> >     }
>> >
>> >     @Override
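>> >     public V get(final K key) {
>> >         final List<MyStoreType<K, V>> stores =
>> >                 provider.getStores(storeName, new MyQueryableType<K, V>());
>> >         // iterate over the stores and return the first value found for the key
>> >         for (final MyStoreType<K, V> store : stores) {
>> >             final V value = store.get(key);
>> >             if (value != null) {
>> >                 return value;
>> >             }
>> >         }
>> >         return null;
>> >     }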
>> >
>> >     @Override
>> >     public void put(final K key, V value) {
>> >
>> >     }
>> > }
>> >
>> >
>> > Lookup your new store from KafkaStreams:
>> >
>> > final MyStoreType store = kafkaStreams.store("my-store", new
>> > MyQueryableType<>());
>> >
>> >
>> > So we get type safety and we can constrain the interfaces returned to
>> > read-only versions (which is what we are doing for KeyValue and Window
>> > stores).
>> >
>> > HTH,
>> > Damian
>> >
>> > On Wed, 13 Jul 2016 at 15:30 Jay Kreps <j...@confluent.io> wrote:
>> >
>> > > But to avoid the cast you introduce a bunch of magic that doesn't
>> really
>> > > bring type safety, right? Or possibly I'm misunderstanding, how do I
>> plug
>> > > in a new store type and get access to it? Can you give the steps for
>> > that?
>> > >
>> > > -Jay
>> > >
>> > > On Wed, Jul 13, 2016 at 10:47 AM, Guozhang Wang <wangg...@gmail.com>
>> > > wrote:
>> > >
>> > > > Personally I think the additional complexity of the introduced "
>> > > > QueryableStoreType" interface is still acceptable from a user's
>> point
>> > of
>> > > > view: this is the only interface we are exposing to users, and other
>> > > > wrappers are all internal classes.
>> > > >
>> > > > Regarding "QueryableStoreTypes", maybe we can consider declaring its
>> > > > "QueryableStoreTypeMatcher" as private instead of public, since
>> > > > "QueryableStoreTypes" is just used as a convenient manner for using
>> > > > library-provided types, like serialization/Serdes.java.
>> > > >
>> > > > With this the only additional interface the library is exposing is "
>> > > > QueryableStoreType", and users optionally can just use
>> > > > "QueryableStoreTypes"
>> > > > to conveniently create library-provided store types.
>> > > >
>> > > >
>> > > > Guozhang
>> > > >
>> > > >
>> > > > On Wed, Jul 13, 2016 at 7:58 AM, Neha Narkhede <n...@confluent.io>
>> > > wrote:
>> > > >
>> > > > > Damian -- appreciate the example code and you convinced me. Agree
>> > that
>> > > > the
>> > > > > class approach is better and renaming to KafkaStreamsMetadata
>> along
>> > > with
>> > > > > renaming the API methods will address the issues I was referring
>> to.
>> > > > >
>> > > > > One other thing I wanted to get people's thoughts on was the way
>> we
>> > are
>> > > > > proposing to handle different store types. I am sure you guys have
>> > > > thought
>> > > > > about the tradeoffs of using the store wrappers and matchers (
>> > > > > QueryableStoreType) vs just making users cast the returned store
>> to
>> > the
>> > > > > type they would expect to use. That is simple but the obvious
>> > downside
>> > > is
>> > > > > that it is likely to result in exceptions for users that don't
>> know
>> > > what
>> > > > > they are doing.
>> > > > >
>> > > > > In my experience of dealing with apps that would use queryable
>> state,
>> > > it
>> > > > > appears to me that a majority would just use the key value store.
>> > > Partly
>> > > > > because that will suffice and partly because people might just
>> follow
>> > > the
>> > > > > simpler examples we provide that use key-value store. For advanced
>> > > users,
>> > > > > they will be aware of the reason they want to use the windowed
>> store
>> > > and
>> > > > > will know how to cast it. The advantage of the current approach is
>> > that
>> > > > it
>> > > > > is likely more robust and general but introduces more
>> > > interfaces
>> > > > > and wrapper code.
>> > > > >
>> > > > > I tend to prefer simplicity to optimize for the general case, but
>> > > curious
>> > > > > to get people's thoughts on this as well.
>> > > > >
>> > > > > On Wed, Jul 13, 2016 at 8:13 AM, Jim Jagielski <j...@jagunet.com>
>> > > wrote:
>> > > > >
>> > > > > > IMO, that makes the most sense.
>> > > > > >
>> > > > > > > On Jul 12, 2016, at 5:11 PM, Ismael Juma <ism...@juma.me.uk>
>> > > wrote:
>> > > > > > >
>> > > > > > > Hi Damian,
>> > > > > > >
>> > > > > > > How about StreamsMetadata instead? The general naming pattern
>> > seems
>> > > > to
>> > > > > > > avoid the `Kafka` prefix for everything outside of
>> `KafkaStreams`
>> > > > > itself.
>> > > > > > >
>> > > > > > > Ismael
>> > > > > > >
>> > > > > > > On Tue, Jul 12, 2016 at 7:14 PM, Damian Guy <
>> > damian....@gmail.com>
>> > > > > > wrote:
>> > > > > > >
>> > > > > > >> Hi,
>> > > > > > >>
>> > > > > > >> I agree with point 1. application.server is a better name for
>> > the
>> > > > > config
>> > > > > > >> (we'll change this). However, on point 2 I think we should
>> stick
>> > > > > mostly
>> > > > > > >> with what we already have. I've tried both ways of doing this
>> > when
>> > > > > > working
>> > > > > > >> on the JIRA and building examples and I find the current
>> > approach
>> > > > more
>> > > > > > >> intuitive and easier to use than the Map based approach.
>> > > > > > >> However, there is probably a naming issue. We should rename
>> > > > > > >> KafkaStreamsInstance to KafkaStreamsMetadata. This Class is
>> very
>> > > > > simple,
>> > > > > > >> but provides all the information a developer needs to be
>> able to
>> > > > find
>> > > > > > the
>> > > > > > >> instance(s) of a Streams application that a particular store
>> is
>> > > > > running
>> > > > > > on,
>> > > > > > >> i.e.,
>> > > > > > >>
>> > > > > > >> public class KafkaStreamsMetadata {
>> > > > > > >>    private final HostInfo hostInfo;
>> > > > > > >>    private final Set<String> stateStoreNames;
>> > > > > > >>    private final Set<TopicPartition> topicPartitions;
>> > > > > > >>    // constructor and accessors omitted
>> > > > > > >> }
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> So using the API to route to a new host is fairly simple,
>> > > > particularly
>> > > > > > in
>> > > > > > >> the case when you want to find the host for a particular key,
>> > > i.e.,
>> > > > > > >>
>> > > > > > >> final KafkaStreams kafkaStreams = createKafkaStreams();
>> > > > > > >> final KafkaStreamsMetadata streamsMetadata =
>> > > > > > >> kafkaStreams.instanceWithKey("word-count", "hello",
>> > > > > > >> Serdes.String().serializer());
>> > > > > > >> http.get("http://" + streamsMetadata.host() + ":" +
>> > > > > > >> streamsMetadata.port() + "/get/word-count/hello");
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> And if you want to do a scatter gather approach:
>> > > > > > >>
>> > > > > > >> final KafkaStreams kafkaStreams = createKafkaStreams();
>> > > > > > >> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
>> > > > > > >>     kafkaStreams.allInstancesWithStore("word-count");
>> > > > > > >> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
>> > > > > > >>    http.get("http://" + streamsMetadata.host() + ":" +
>> > > > > > >>        streamsMetadata.port() + "/get/word-count/hello");
>> > > > > > >>    ...
>> > > > > > >> }
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> And if you iterated over all instances:
>> > > > > > >>
>> > > > > > >> final KafkaStreams kafkaStreams = createKafkaStreams();
>> > > > > > >> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
>> > > > > > >>     kafkaStreams.allInstances();
>> > > > > > >> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
>> > > > > > >>    if (streamsMetadata.stateStoreNames().contains("word-count")) {
>> > > > > > >>        http.get("http://" + streamsMetadata.host() + ":" +
>> > > > > > >>            streamsMetadata.port() + "/get/word-count/hello");
>> > > > > > >>        ...
>> > > > > > >>    }
>> > > > > > >> }
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> If we were to change this to use Map<HostInfo,
>> > Set<TaskMetadata>>
>> > > > for
>> > > > > > the
>> > > > > > >> most part users would need to iterate over the entry or key
>> set.
>> > > > > > Examples:
>> > > > > > >>
>> > > > > > >> Finding an instance by key is a little odd:
>> > > > > > >>
>> > > > > > >> final KafkaStreams kafkaStreams = createKafkaStreams();
>> > > > > > >> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>> > > > > > >> kafkaStreams.instanceWithKey("word-count","hello",
>> > > > > > >> Serdes.String().serializer());
>> > > > > > >> // this is a bit odd as I only expect one:
>> > > > > > >> for (HostInfo hostInfo : streamsMetadata.keySet()) {
>> > > > > > >>    http.get("http://" + hostInfo.host() + ":" +
>> > > > > > >>        hostInfo.port() + "/get/word-count/hello");
>> > > > > > >> }
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> The scatter/gather by store is fairly similar to the previous
>> > > > example:
>> > > > > > >>
>> > > > > > >> final KafkaStreams kafkaStreams = createKafkaStreams();
>> > > > > > >> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>> > > > > > >> kafkaStreams.allInstancesWithStore("word-count");
>> > > > > > >> for (HostInfo hostInfo : streamsMetadata.keySet()) {
>> > > > > > >>    http.get("http://" + hostInfo.host() + ":" + hostInfo.port() +
>> > > > > > >>        "/get/word-count/hello");
>> > > > > > >>    ...
>> > > > > > >> }
>> > > > > > >>
>> > > > > > >> And iterating over all instances:
>> > > > > > >>
>> > > > > > >> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>> > > > > > >> kafkaStreams.allInstances();
>> > > > > > >> for (Map.Entry<HostInfo, Set<TaskMetadata>> entry :
>> > > > > > >> streamsMetadata.entrySet()) {
>> > > > > > >>    for (TaskMetadata taskMetadata : entry.getValue()) {
>> > > > > > >>        if (taskMetadata.stateStoreNames().contains("word-count")) {
>> > > > > > >>            http.get("http://" + entry.getKey().host() + ":" +
>> > > > > > >>                entry.getKey().port() + "/get/word-count/hello");
>> > > > > > >>            ...
>> > > > > > >>        }
>> > > > > > >>    }
>> > > > > > >> }
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> IMO - having a class we return is the better approach as it
>> > nicely
>> > > > > wraps
>> > > > > > >> the related things, i.e, host:port, store names, topic
>> > partitions
>> > > > into
>> > > > > > an
>> > > > > > >> Object that is easy to use. Further we could add some
>> behaviour
>> > to
>> > > > > this
>> > > > > > >> class if we felt it necessary, i.e, hasStore(storeName) etc.
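
For illustration, a convenience method like hasStore(storeName) could look roughly like the sketch below. `InstanceMetadata` is a hypothetical name, the host and port are kept as plain fields instead of a HostInfo, and the topic partitions are left out. Only the store-name set and the hasStore idea come from the message above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; not the class proposed in the KIP.
final class InstanceMetadata {
    private final String host;
    private final int port;
    private final Set<String> stateStoreNames;

    InstanceMetadata(String host, int port, Set<String> stateStoreNames) {
        this.host = host;
        this.port = port;
        // Defensive copy so callers cannot change the metadata afterwards.
        this.stateStoreNames = Collections.unmodifiableSet(new HashSet<>(stateStoreNames));
    }

    String host() { return host; }
    int port() { return port; }
    Set<String> stateStoreNames() { return stateStoreNames; }

    // The behaviour suggested above: ask the object instead of inspecting its set.
    boolean hasStore(String storeName) { return stateStoreNames.contains(storeName); }
}

final class HasStoreDemo {
    public static void main(String[] args) {
        List<InstanceMetadata> instances = Arrays.asList(
            new InstanceMetadata("host1", 7070, Collections.singleton("word-count")),
            new InstanceMetadata("host2", 7070, Collections.singleton("page-views")));

        // Print a query URL for each instance that hosts the store.
        for (InstanceMetadata instance : instances) {
            if (instance.hasStore("word-count")) {
                System.out.println("http://" + instance.host() + ":" + instance.port()
                    + "/get/word-count/hello");
            }
        }
    }
}
```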
>> > > > > > >>
>> > > > > > >> Anyway, i'm interested in your thoughts.
>> > > > > > >>
>> > > > > > >> Thanks,
>> > > > > > >> Damian
>> > > > > > >>
>> > > > > > >> On Mon, 11 Jul 2016 at 13:47 Guozhang Wang <
>> wangg...@gmail.com>
>> > > > > wrote:
>> > > > > > >>
>> > > > > > >>> 1. Re StreamsConfig.USER_ENDPOINT_CONFIG:
>> > > > > > >>>
>> > > > > > >>> I agree with Neha that Kafka Streams can provide the bare
>> > minimum
>> > > > > APIs
>> > > > > > >> just
>> > > > > > >>> for host/port, and user's implemented layer can provide URL
>> /
>> > > proxy
>> > > > > > >> address
>> > > > > > >>> they want to build on top of it.
>> > > > > > >>>
>> > > > > > >>>
>> > > > > > >>> 2. Re Improving KafkaStreamsInstance interface:
>> > > > > > >>>
>> > > > > > >>> Users are indeed aware of "TaskId" class which is not part
>> of
>> > > > > internal
>> > > > > > >>> packages and is exposed in PartitionGrouper interface that
>> can
>> > be
>> > > > > > >>> instantiated by the users, which is assigned with input
>> topic
>> > > > > > partitions.
>> > > > > > >>> So we can probably change the APIs as:
>> > > > > > >>>
>> > > > > > >>> Map<HostState, Set<TaskMetadata>> KafkaStreams.getAllTasks()
>> > > where
>> > > > > > >>> TaskMetadata has fields such as taskId, list of assigned
>> > > > partitions,
>> > > > > > list
>> > > > > > >>> of state store names; and HostState can include hostname /
>> > port.
>> > > > The
>> > > > > > port
>> > > > > > >>> is the listening port of a user-defined listener that users
>> > > provide
>> > > > > to
>> > > > > > >>> listen for queries (e.g., using REST APIs).
>> > > > > > >>>
>> > > > > > >>> Map<HostState, Set<TaskMetadata>>
>> > > > > KafkaStreams.getTasksWithStore(String
>> > > > > > >> /*
>> > > > > > >>> storeName */) would return only the hosts and their assigned
>> > > tasks
>> > > > if
>> > > > > > at
>> > > > > > >>> least one of the tasks include the given store name.
>> > > > > > >>>
>> > > > > > >>> Map<HostState, Set<TaskMetadata>>
>> > > > > > KafkaStreams.getTaskWithStoreAndKey(Key
>> > > > > > >>> k, String /* storeName */, StreamPartitioner partitioner)
>> would
>> > > > > return
>> > > > > > >> only
>> > > > > > >>> the host and their assigned task if the store with the store
>> > name
>> > > > > has a
>> > > > > > >>> particular key, according to the partitioner behavior.
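
A rough sketch of how the getTasksWithStore semantics described above might be computed from the full map. `HostState` and `TaskMetadata` here are hypothetical minimal classes written for this example (only the field names follow the message), and `TaskQueries.tasksWithStore` is an assumed helper, not a Kafka Streams method. A host is kept, with all of its tasks, if at least one task includes the store name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical: host name plus the port of the user-defined listener.
final class HostState {
    final String host;
    final int port;

    HostState(String host, int port) { this.host = host; this.port = port; }

    @Override public boolean equals(Object o) {
        if (!(o instanceof HostState)) return false;
        HostState other = (HostState) o;
        return port == other.port && host.equals(other.host);
    }

    @Override public int hashCode() { return Objects.hash(host, port); }
}

// Hypothetical: task id and state store names (assigned partitions omitted).
final class TaskMetadata {
    final String taskId;
    final Set<String> stateStoreNames;

    TaskMetadata(String taskId, Set<String> stateStoreNames) {
        this.taskId = taskId;
        this.stateStoreNames = stateStoreNames;
    }
}

final class TaskQueries {
    // Keep a host and its tasks when any of its tasks has the given store.
    static Map<HostState, Set<TaskMetadata>> tasksWithStore(
            Map<HostState, Set<TaskMetadata>> allTasks, String storeName) {
        Map<HostState, Set<TaskMetadata>> result = new HashMap<>();
        for (Map.Entry<HostState, Set<TaskMetadata>> entry : allTasks.entrySet()) {
            for (TaskMetadata task : entry.getValue()) {
                if (task.stateStoreNames.contains(storeName)) {
                    result.put(entry.getKey(), entry.getValue());
                    break;
                }
            }
        }
        return result;
    }
}

final class TaskQueriesDemo {
    public static void main(String[] args) {
        Map<HostState, Set<TaskMetadata>> all = new HashMap<>();
        all.put(new HostState("host1", 7070), Collections.singleton(
            new TaskMetadata("0_0", Collections.singleton("word-count"))));
        all.put(new HostState("host2", 7070), Collections.singleton(
            new TaskMetadata("0_1", Collections.singleton("page-views"))));

        for (HostState h : TaskQueries.tasksWithStore(all, "word-count").keySet()) {
            System.out.println(h.host + ":" + h.port);
        }
    }
}
```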
>> > > > > > >>>
>> > > > > > >>>
>> > > > > > >>>
>> > > > > > >>> Guozhang
>> > > > > > >>>
>> > > > > > >>>
>> > > > > > >>> On Sun, Jul 10, 2016 at 11:21 AM, Neha Narkhede <
>> > > n...@confluent.io
>> > > > >
>> > > > > > >> wrote:
>> > > > > > >>>
>> > > > > > >>>> Few thoughts that became apparent after observing example
>> code
>> > > of
>> > > > > what
>> > > > > > >> an
>> > > > > > >>>> application architecture and code might look like with
>> these
>> > > > > changes.
>> > > > > > >>>> Apologies for the late realization.
>> > > > > > >>>>
>> > > > > > >>>> 1. "user.endpoint" will be very differently defined for
>> > > respective
>> > > > > > >>>> applications. I don't think Kafka Streams should
>> generalize to
>> > > > > accept
>> > > > > > >> any
>> > > > > > >>>> connection URL as we expect to only expose metadata
>> expressed
>> > as
>> > > > > > >> HostInfo
>> > > > > > >>>> (which is defined by host & port) and hence need to
>> interpret
>> > > the
>> > > > > > >>>> "user.endpoint" as host & port. Applications will have
>> their
>> > own
>> > > > > > >> endpoint
>> > > > > > >>>> configs that will take many forms and they will be
>> responsible
>> > > for
>> > > > > > >>> parsing
>> > > > > > >>>> out host and port and configuring Kafka Streams
>> accordingly.
>> > > > > > >>>>
>> > > > > > >>>> If we are in fact limiting to host and port, I wonder if we
>> > > should
>> > > > > > >> change
>> > > > > > >>>> the name of "user.endpoint" into something more specific.
>> We
>> > > have
>> > > > > > >> clients
>> > > > > > >>>> expose host/port pairs as "bootstrap.servers". Should this
>> be
>> > > > > > >>>> "application.server"?
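
If the setting is limited to host and port, interpreting it could be as simple as the sketch below. `HostPort` and its `parse` method are hypothetical names for this example, not part of Kafka Streams. It assumes the value has the form host:port and does not handle bracketed IPv6 literals.

```java
// Hypothetical helper; shows one way to interpret a "host:port" config value.
final class HostPort {
    final String host;
    final int port;

    HostPort(String host, int port) { this.host = host; this.port = port; }

    static HostPort parse(String value) {
        // Split on the last colon so that the port is always the final component.
        int idx = value.lastIndexOf(':');
        if (idx <= 0 || idx == value.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + value);
        }
        String host = value.substring(0, idx);
        int port;
        try {
            port = Integer.parseInt(value.substring(idx + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Port is not a number in: " + value, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range in: " + value);
        }
        return new HostPort(host, port);
    }
}

final class HostPortDemo {
    public static void main(String[] args) {
        HostPort endpoint = HostPort.parse("localhost:7070");
        System.out.println(endpoint.host + " / " + endpoint.port);
    }
}
```

Anything richer than host and port (scheme, path, version) would stay in the application's own config, as described above.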
>> > > > > > >>>>
>> > > > > > >>>> 2. I don't think we should expose another abstraction
>> called
>> > > > > > >>>> KafkaStreamsInstance to the user. This is related to the
>> > > > discussion
>> > > > > of
>> > > > > > >>> the
>> > > > > > >>>> right abstraction that we want to expose to an application.
>> > The
>> > > > > > >>> abstraction
>> > > > > > >>>> discussion itself should probably be part of the KIP
>> itself,
>> > let
>> > > > me
>> > > > > > >> give
>> > > > > > >>> a
>> > > > > > >>>> quick summary of my thoughts here:
>> > > > > > >>>> 1. The person implementing an application using Queryable
>> > State
>> > > > has
>> > > > > > >>> likely
>> > > > > > >>>> already made some choices for the service layer–a REST
>> > > framework,
>> > > > > > >> Thrift,
>> > > > > > >>>> or whatever. We don't really want to add another RPC
>> framework
>> > > to
>> > > > > this
>> > > > > > >>> mix,
>> > > > > > >>>> nor do we want to try to make Kafka's RPC mechanism general
>> > > > purpose.
>> > > > > > >>>> 2. Likewise, it should be clear that the API you want to
>> > expose
>> > > to
>> > > > > the
>> > > > > > >>>> front-end/client service is not necessarily the API you'd
>> need
>> > > > > > >> internally
>> > > > > > >>>> as there may be additional filtering/processing in the
>> router.
>> > > > > > >>>>
>> > > > > > >>>> Given these constraints, what we prefer to add is a fairly
>> > > > low-level
>> > > > > > >>>> "toolbox" that would let you do anything you want, but
>> > requires
>> > > to
>> > > > > > >> route
>> > > > > > >>>> and perform any aggregation or processing yourself. This
>> > pattern
>> > > > is
>> > > > > > >>>> not recommended for all kinds of services/apps, but there
>> is
>> > > > > > >> definitely
>> > > > > > >>> a
>> > > > > > >>>> category of things where it is a big win and other advanced
>> > > > > > >> applications
>> > > > > > >>>> are out-of-scope.
>> > > > > > >>>>
>> > > > > > >>>> The APIs we expose should take the following things into
>> > > > > > consideration:
>> > > > > > >>>> 1. Make it clear to the user that they will do the routing,
>> > > > > > >> aggregation,
>> > > > > > >>>> processing themselves. So the bare minimum that we want to
>> > > expose
>> > > > is
>> > > > > > >>> store
>> > > > > > >>>> and partition metadata per application server identified by
>> > the
>> > > > host
>> > > > > > >> and
>> > > > > > >>>> port.
>> > > > > > >>>> 2. Ensure that the API exposes abstractions that are known
>> to
>> > > the
>> > > > > user
>> > > > > > >> or
>> > > > > > >>>> are intuitive to the user.
>> > > > > > >>>> 3. Avoid exposing internal objects or implementation
>> details
>> > to
>> > > > the
>> > > > > > >> user.
>> > > > > > >>>>
>> > > > > > >>>> So tying all this into answering the question of what we
>> > should
>> > > > > expose
>> > > > > > >>>> through the APIs -
>> > > > > > >>>>
>> > > > > > >>>> In Kafka Streams, the user is aware of the concept of tasks
>> > and
>> > > > > > >>> partitions
>> > > > > > >>>> since the application scales with the number of partitions
>> and
>> > > > tasks
>> > > > > > >> are
>> > > > > > >>>> the construct for logical parallelism. The user is also
>> aware
>> > of
>> > > > the
>> > > > > > >>>> concept of state stores though until now they were not user
>> > > > > > accessible.
>> > > > > > >>>> With Queryable State, the bare minimum abstractions that we
>> > need
>> > > > to
>> > > > > > >>> expose
>> > > > > > >>>> are state stores and the location of state store
>> partitions.
>> > > > > > >>>>
>> > > > > > >>>> For exposing the state stores, the getStore() APIs look
>> good
>> > > but I
>> > > > > > >> think
>> > > > > > >>>> for locating the state store partitions, we should go back
>> to
>> > > the
>> > > > > > >>> original
>> > > > > > >>>> proposal of simply exposing some sort of
>> > getPartitionMetadata()
>> > > > that
>> > > > > > >>>> returns a PartitionMetadata or TaskMetadata object keyed by
>> > > > > HostInfo.
>> > > > > > >>>>
>> > > > > > >>>> The application will convert the HostInfo (host and port)
>> into
>> > > > some
>> > > > > > >>>> connection URL to talk to the other app instances via its
>> own
>> > > RPC
>> > > > > > >>> mechanism
>> > > > > > >>>> depending on whether it needs to scatter-gather or just
>> query.
>> > > The
>> > > > > > >>>> application will know how a key maps to a partition and
>> > through
>> > > > > > >>>> PartitionMetadata it will know how to locate the server
>> that
>> > > hosts
>> > > > > the
>> > > > > > >>>> store that has the partition hosting that key.
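
The key-to-server lookup described above can be sketched in a few lines. Everything here is an assumption made for illustration: `KeyRouter` is a hypothetical class, the partition is derived from String.hashCode() modulo the partition count (which is not Kafka's default partitioner), and the partition-to-host table stands in for whatever the partition metadata would provide.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical router: key -> partition -> "host:port" of the hosting instance.
final class KeyRouter {
    private final int numPartitions;
    private final Map<Integer, String> partitionToHost;

    KeyRouter(int numPartitions, Map<Integer, String> partitionToHost) {
        this.numPartitions = numPartitions;
        this.partitionToHost = partitionToHost;
    }

    // Assumed partitioning scheme; floorMod keeps the result non-negative.
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Returns null when no instance is known for the key's partition.
    String hostFor(String key) {
        return partitionToHost.get(partitionFor(key));
    }
}

final class KeyRouterDemo {
    public static void main(String[] args) {
        Map<Integer, String> table = new HashMap<>();
        table.put(0, "host1:7070");
        table.put(1, "host2:7070");

        KeyRouter router = new KeyRouter(2, table);
        System.out.println("hello -> " + router.hostFor("hello"));
    }
}
```

The application has to use the same partitioning scheme as the producer of the data, otherwise the lookup points at the wrong instance.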
>> > > > > > >>>>
>> > > > > > >>>> On Fri, Jul 8, 2016 at 9:40 AM, Michael Noll <
>> > > > mich...@confluent.io>
>> > > > > > >>> wrote:
>> > > > > > >>>>
>> > > > > > >>>>> Addendum in case my previous email wasn't clear:
>> > > > > > >>>>>
>> > > > > > >>>>>> So for any given instance of a streams application there
>> > will
>> > > > > never
>> > > > > > >>> be
>> > > > > > >>>>> both a v1 and v2 alive at the same time
>> > > > > > >>>>>
>> > > > > > >>>>> That's right.  But the current live instance will be able
>> to
>> > > tell
>> > > > > > >> other
>> > > > > > >>>>> instances, via its endpoint setting, whether it wants to
>> be
>> > > > > contacted
>> > > > > > >>> at
>> > > > > > >>>> v1
>> > > > > > >>>>> or at v2.  The other instances can't guess that.  Think:
>> if
>> > an
>> > > > > older
>> > > > > > >>>>> instance would manually compose the "rest" of an endpoint
>> > URI,
>> > > > > having
>> > > > > > >>>> only
>> > > > > > >>>>> the host and port from the endpoint setting, it might not
>> > know
>> > > > that
>> > > > > > >> the
>> > > > > > >>>> new
>> > > > > > >>>>> instances have a different endpoint suffix, for example).
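
To illustrate the difference: if each instance advertises a full endpoint string rather than just host:port, the caller can build the request URL without guessing the version suffix. A minimal sketch using java.net.URI follows. `EndpointUrls.queryUri` and the /get/{store}/{key} path layout are assumptions carried over from the examples in this thread, and the store name and key are assumed to be URL-safe (no encoding is done).

```java
import java.net.URI;

// Hypothetical helper: builds a query URL from whatever endpoint an instance advertised.
final class EndpointUrls {
    static URI queryUri(String endpoint, String storeName, String key) {
        // A trailing slash makes resolve() append to the path instead of replacing its last segment.
        String base = endpoint.endsWith("/") ? endpoint : endpoint + "/";
        return URI.create(base).resolve("get/" + storeName + "/" + key);
    }
}

final class EndpointUrlsDemo {
    public static void main(String[] args) {
        // An older and a newer instance advertise different suffixes during a rolling upgrade.
        System.out.println(EndpointUrls.queryUri("https://host1:8080/api/rest/v1/", "word-count", "hello"));
        System.out.println(EndpointUrls.queryUri("https://host2:8080/api/rest/v2/", "word-count", "hello"));
    }
}
```

With only host:port available, the "/api/rest/v1/" part would have to be hard-coded in the caller, which is the guessing problem described above.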
>> > > > > > >>>>>
>> > > > > > >>>>>
>> > > > > > >>>>> On Fri, Jul 8, 2016 at 6:37 PM, Michael Noll <
>> > > > mich...@confluent.io
>> > > > > >
>> > > > > > >>>> wrote:
>> > > > > > >>>>>
>> > > > > > >>>>>> Damian,
>> > > > > > >>>>>>
>> > > > > > >>>>>> about the rolling upgrade comment:  An instance A will
>> > contact
>> > > > > > >>> another
>> > > > > > >>>>>> instance B by the latter's endpoint, right?  So if A has
>> no
>> > > > > further
>> > > > > > >>>>>> information available than B's host and port, then how
>> > should
>> > > > > > >>> instance
>> > > > > > >>>> A
>> > > > > > >>>>>> know whether it should call B at /v1/ or at /v2/?  I
>> agree
>> > > that
>> > > > my
>> > > > > > >>>>>> suggestion isn't foolproof, but it is afaict better than
>> the
>> > > > > > >>> host:port
>> > > > > > >>>>>> approach.
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>> On Fri, Jul 8, 2016 at 5:15 PM, Damian Guy <
>> > > > damian....@gmail.com>
>> > > > > > >>>> wrote:
>> > > > > > >>>>>>
>> > > > > > >>>>>>> Michael - i'm ok with changing it to a string. Any one
>> else
>> > > > have
>> > > > > a
>> > > > > > >>>>> strong
>> > > > > > >>>>>>> opinion on this?
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> FWIW - i don't think it will work fine as is during the
>> > > rolling
>> > > > > > >>>> upgrade
>> > > > > > >>>>>>> scenario as the service that is listening on the port
>> needs
>> > > to
>> > > > be
>> > > > > > >>>>> embedded
>> > > > > > >>>>>>> within each instance. So for any given instance of a
>> > streams
>> > > > > > >>>> application
>> > > > > > >>>>>>> there will never be both a v1 and v2 alive at the same
>> time
>> > > > > > >> (unless
>> > > > > > >>> of
>> > > > > > >>>>>>> course the process didn't shutdown properly, but then
>> you
>> > > have
>> > > > > > >>> another
>> > > > > > >>>>>>> problem...).
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> On Fri, 8 Jul 2016 at 15:26 Michael Noll <
>> > > mich...@confluent.io
>> > > > >
>> > > > > > >>>> wrote:
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>> I have one further comment about
>> > > > > > >>>> `StreamsConfig.USER_ENDPOINT_CONFIG`.
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> I think we should consider not restricting the
>> value of
>> > > > this
>> > > > > > >>>>> setting
>> > > > > > >>>>>>> to
>> > > > > > >>>>>>>> only `host:port` pairs.  By design, this setting is
>> > > capturing
>> > > > > > >>>>>>> user-driven
>> > > > > > >>>>>>>> metadata to define an endpoint, so why restrict the
>> > > creativity
>> > > > > > >> or
>> > > > > > >>>>>>>> flexibility of our users?  I can imagine, for example,
>> > that
>> > > > > > >> users
>> > > > > > >>>>> would
>> > > > > > >>>>>>>> like to set values such as `https://host:port
>> > /api/rest/v1/`
>> > > > in
>> > > > > > >>> this
>> > > > > > >>>>>>> field
>> > > > > > >>>>>>>> (e.g. being able to distinguish between `.../v1/` and
>> > > > `.../v2/`
>> > > > > > >>> may
>> > > > > > >>>>>>> help in
>> > > > > > >>>>>>>> scenarios such as rolling upgrades, where, during the
>> > > upgrade,
>> > > > > > >>> older
>> > > > > > >>>>>>>> instances may need to coexist with newer instances).
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> That said, I don't have a strong opinion here.
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> -Michael
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> On Fri, Jul 8, 2016 at 2:55 PM, Matthias J. Sax <
>> > > > > > >>>>> matth...@confluent.io>
>> > > > > > >>>>>>>> wrote:
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>>> +1
>> > > > > > >>>>>>>>>
>> > > > > > >>>>>>>>> On 07/08/2016 11:03 AM, Eno Thereska wrote:
>> > > > > > >>>>>>>>>> +1 (non-binding)
>> > > > > > >>>>>>>>>>
>> > > > > > >>>>>>>>>>> On 7 Jul 2016, at 18:31, Sriram Subramanian <
>> > > > > > >>> r...@confluent.io>
>> > > > > > >>>>>>> wrote:
>> > > > > > >>>>>>>>>>>
>> > > > > > >>>>>>>>>>> +1
>> > > > > > >>>>>>>>>>>
>> > > > > > >>>>>>>>>>> On Thu, Jul 7, 2016 at 9:53 AM, Henry Cai
>> > > > > > >>>>>>> <h...@pinterest.com.invalid
>> > > > > > >>>>>>>>>
>> > > > > > >>>>>>>>>>> wrote:
>> > > > > > >>>>>>>>>>>
>> > > > > > >>>>>>>>>>>> +1
>> > > > > > >>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>> On Thu, Jul 7, 2016 at 6:48 AM, Michael Noll <
>> > > > > > >>>>>>> mich...@confluent.io>
>> > > > > > >>>>>>>>> wrote:
>> > > > > > >>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>> +1 (non-binding)
>> > > > > > >>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>> On Thu, Jul 7, 2016 at 10:24 AM, Damian Guy <
>> > > > > > >>>>>>> damian....@gmail.com>
>> > > > > > >>>>>>>>>>>> wrote:
>> > > > > > >>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>> Thanks Henry - we've updated the KIP with an
>> example
>> > > and
>> > > > > > >>> the
>> > > > > > >>>>> new
>> > > > > > >>>>>>>>> config
>> > > > > > >>>>>>>>>>>>>> parameter required. FWIW the user doesn't
>> register a
>> > > > > > >>>> listener,
>> > > > > > >>>>>>> they
>> > > > > > >>>>>>>>>>>>> provide
>> > > > > > >>>>>>>>>>>>>> a host:port in config. It is expected they will
>> > start
>> > > a
>> > > > > > >>>>> service
>> > > > > > >>>>>>>>> running
>> > > > > > >>>>>>>>>>>>> on
>> > > > > > >>>>>>>>>>>>>> that host:port that they can use to connect to
>> the
>> > > > > > >> running
>> > > > > > >>>>>>>>> KafkaStreams
>> > > > > > >>>>>>>>>>>>>> Instance.
>> > > > > > >>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>> Thanks,
>> > > > > > >>>>>>>>>>>>>> Damian
>> > > > > > >>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>> On Thu, 7 Jul 2016 at 06:06 Henry Cai
>> > > > > > >>>>>>> <h...@pinterest.com.invalid>
>> > > > > > >>>>>>>>>>>>> wrote:
>> > > > > > >>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>> It wasn't quite clear to me how the user program
>> > > > > > >>> interacts
>> > > > > > >>>>> with
>> > > > > > >>>>>>>> the
>> > > > > > >>>>>>>>>>>>>>> discovery API, especially on the user supplied
>> > > listener
>> > > > > > >>>> part,
>> > > > > > >>>>>>> how
>> > > > > > >>>>>>>>>>>> does
>> > > > > > >>>>>>>>>>>>>> the
>> > > > > > >>>>>>>>>>>>>>> user program supply that listener to
>> KafkaStreams
>> > and
>> > > > > > >> how
>> > > > > > >>>>> does
>> > > > > > >>>>>>>>>>>>>> KafkaStreams
>> > > > > > >>>>>>>>>>>>>>> know which port the user listener is running,
>> > maybe a
>> > > > > > >>> more
>> > > > > > >>>>>>>> complete
>> > > > > > >>>>>>>>>>>>>>> end-to-end example including the steps on
>> > registering
>> > > > > > >> the
>> > > > > > >>>>> user
>> > > > > > >>>>>>>>>>>> listener
>> > > > > > >>>>>>>>>>>>>> and
>> > > > > > >>>>>>>>>>>>>>> whether the user listener needs to be involved
>> with
>> > > > > > >> task
>> > > > > > >>>>>>>>>>>> reassignment.
>> > > > > > >>>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 9:13 PM, Guozhang Wang <
>> > > > > > >>>>>>> wangg...@gmail.com
>> > > > > > >>>>>>>>>
>> > > > > > >>>>>>>>>>>>>> wrote:
>> > > > > > >>>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>>> +1
>> > > > > > >>>>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 12:44 PM, Damian Guy <
>> > > > > > >>>>>>>> damian....@gmail.com>
>> > > > > > >>>>>>>>>>>>>>> wrote:
>> > > > > > >>>>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>>>> Hi all,
>> > > > > > >>>>>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>>>> I'd like to initiate the voting process for
>> > KIP-67
>> > > > > > >>>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams>
>> > > > > > >>>>>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>>>> KAFKA-3909 <https://issues.apache.org/jira/browse/KAFKA-3909> is the
>> > > > > > >>>>>>>>>>>>>>>>> top level JIRA for this effort.
>> > > > > > >>>>>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>>>> Initial PRs for Step 1 of the process are:
>> > > > > > >>>>>>>>>>>>>>>>> Expose State Store Names <https://github.com/apache/kafka/pull/1526> and
>> > > > > > >>>>>>>>>>>>>>>>> Query Local State Stores <https://github.com/apache/kafka/pull/1565>
>> > > > > > >>>>>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>>>> Thanks,
>> > > > > > >>>>>>>>>>>>>>>>> Damian
>> > > > > > >>>>>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>>> --
>> > > > > > >>>>>>>>>>>>>>>> -- Guozhang
>> > > > > > >>>>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>> --
>> > > > > > >>>>>>>>>>>>> Best regards,
>> > > > > > >>>>>>>>>>>>> Michael Noll
>> > > > > > >>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>> *Michael G. Noll | Product Manager | Confluent |
>> +1
>> > > > > > >>>>>>>>> 650.453.5860Download
>> > > > > > >>>>>>>>>>>>> Apache Kafka and Confluent Platform:
>> > > > > > >>>> www.confluent.io/download
>> > > > > > >>>>>>>>>>>>> <http://www.confluent.io/download>*
>> > > > > > >>>>>>>>>>>>>
>> > > > > > >>>>>>>>>>>>
>> > > > > > >>>>>>>>>>
>> > > > > > >>>>>>>>>
>> > > > > > >>>>>>>>>
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> --
>> > > > > > >>>>>>>> Best regards,
>> > > > > > >>>>>>>> Michael Noll
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>> *Michael G. Noll | Product Manager | Confluent | +1
>> > > > > > >>>>> 650.453.5860Download
>> > > > > > >>>>>>>> Apache Kafka and Confluent Platform:
>> > > > www.confluent.io/download
>> > > > > > >>>>>>>> <http://www.confluent.io/download>*
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>> --
>> > > > > > >>>>>> Best regards,
>> > > > > > >>>>>> Michael Noll
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>> *Michael G. Noll | Product Manager | Confluent | +1
>> > > > 650.453.5860
>> > > > > > >>>>>> <%2B1%20650.453.5860>Download Apache Kafka and Confluent
>> > > > Platform:
>> > > > > > >>>>>> www.confluent.io/download <
>> http://www.confluent.io/download
>> > >*
>> > > > > > >>>>>>
>> > > > > > >>>>>
>> > > > > > >>>>>
>> > > > > > >>>>>
>> > > > > > >>>>> --
>> > > > > > >>>>> Best regards,
>> > > > > > >>>>> Michael Noll
>> > > > > > >>>>>
>> > > > > > >>>>>
>> > > > > > >>>>>
>> > > > > > >>>>> *Michael G. Noll | Product Manager | Confluent | +1
>> > > > > > >>> 650.453.5860Download
>> > > > > > >>>>> Apache Kafka and Confluent Platform:
>> > www.confluent.io/download
>> > > > > > >>>>> <http://www.confluent.io/download>*
>> > > > > > >>>>>
>> > > > > > >>>>
>> > > > > > >>>>
>> > > > > > >>>>
>> > > > > > >>>> --
>> > > > > > >>>> Thanks,
>> > > > > > >>>> Neha
>> > > > > > >>>>
>> > > > > > >>>
>> > > > > > >>>
>> > > > > > >>>
>> > > > > > >>> --
>> > > > > > >>> -- Guozhang
>> > > > > > >>>
>> > > > > > >>
>> > > > > >
>> > > > > >
>> > > > >
>> > > > >
>> > > > > --
>> > > > > Thanks,
>> > > > > Neha
>> > > > >
>> > > >
>> > > >
>> > > >
>> > > > --
>> > > > -- Guozhang
>> > > >
>> > >
>> >
>>
>
