I'm in agreement with Gouzhang. Further, from a "Developer Experience" PoV IMO we should avoid forcing developers to cast - it doesn't make for nice and easy to use API, introduces possible runtime errors due to invalid casts, developers need to know what they are casting to.
On Wed, 13 Jul 2016 at 10:47 Guozhang Wang <wangg...@gmail.com> wrote: > Personally I think the additional complexity of the introduced " > QueryableStoreType" interface is still acceptable from a user's point of > view: this is the only interface we are exposing to users, and other > wrappers are all internal classes. > > Regarding "QueryableStoreTypes", maybe we can consider declaring its > "QueryableStoreTypeMatcher" as private instead of public, since > "QueryableStoreTypes" is just used as a convenient manner for using > library-provided types, like serialization/Serdes.java. > > With this the only additional interface the library is exposing is " > QueryableStoreType", and users optionally can just use > "QueryableStoreTypes" > to conveniently create library-provided store types. > > > Guozhang > > > On Wed, Jul 13, 2016 at 7:58 AM, Neha Narkhede <n...@confluent.io> wrote: > > > Damian -- appreciate the example code and you convinced me. Agree that > the > > class approach is better and renaming to KafkaStreamsMetadata along with > > renaming the API methods will address the issues I was referring to. > > > > One other thing I wanted to get people's thoughts on was the way we are > > proposing to handle different store types. I am sure you guys have > thought > > about the tradeoffs of using the store wrappers and matchers ( > > QueryableStoreType) vs just making users cast the returned store to the > > type they would expect to use. That is simple but the obvious downside is > > that it is likely to result in exceptions for users that don't know what > > they are doing. > > > > In my experience of dealing with apps that would use queriable state, it > > appears to me that a majority would just use the key value store. Partly > > because that will suffice and partly because people might just follow the > > simpler examples we provide that use key-value store. For advanced users, > > they will be aware of the reason they want to use the windowed store and > > will know how to cast it. The advantage of the current approach is that > it > > is likely more robust and general but involves introduces more interfaces > > and wrapper code. > > > > I tend to prefer simplicity to optimize for the general case, but curious > > to get people's thoughts on this as well. > > > > On Wed, Jul 13, 2016 at 8:13 AM, Jim Jagielski <j...@jagunet.com> wrote: > > > > > IMO, that makes the most sense. > > > > > > > On Jul 12, 2016, at 5:11 PM, Ismael Juma <ism...@juma.me.uk> wrote: > > > > > > > > Hi Damian, > > > > > > > > How about StreamsMetadata instead? The general naming pattern seems > to > > > > avoid the `Kafka` prefix for everything outside of `KafkaStreams` > > itself. > > > > > > > > Ismael > > > > > > > > On Tue, Jul 12, 2016 at 7:14 PM, Damian Guy <damian....@gmail.com> > > > wrote: > > > > > > > >> Hi, > > > >> > > > >> I agree with point 1. application.server is a better name for the > > config > > > >> (we'll change this). However, on point 2 I think we should stick > > mostly > > > >> with what we already have. I've tried both ways of doing this when > > > working > > > >> on the JIRA and building examples and I find the current approach > more > > > >> intuitive and easier to use than the Map based approach. > > > >> However, there is probably a naming issue. We should rename > > > >> KafkaStreamsInstance to KafkaStreamsMetadata. This Class is very > > simple, > > > >> but provides all the information a developer needs to be able to > find > > > the > > > >> instance(s) of a Streams application that a particular store is > > running > > > on, > > > >> i.e., > > > >> > > > >> public class KafkStreamsMetadata { > > > >> private final HostInfo hostInfo; > > > >> private final Set<String> stateStoreNames; > > > >> private final Set<TopicPartition> topicPartitions; > > > >> > > > >> > > > >> So using the API to route to a new host is fairly simple, > particularly > > > in > > > >> the case when you want to find the host for a particular key, i.e., > > > >> > > > >> final KafkaStreams kafkaStreams = createKafkaStreams(); > > > >> final KafkaStreamsMetadata streamsMetadata = > > > >> kafkaStreams.instanceWithKey("word-count", "hello", > > > >> Serdes.String().serializer()); > > > >> http.get("http://"; + streamsMetadata.host() + ":" + > > > >> streamsMetadata.port() + "/get/word-count/hello"); > > > >> > > > >> > > > >> And if you want to do a scatter gather approach: > > > >> > > > >> final KafkaStreams kafkaStreams = createKafkaStreams(); > > > >> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas = > > > >> kafkaStreams.allInstancesWithStore("word-count"); > > > >> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) { > > > >> http.get("http://"; + streamsMetadata.host() + ":" + > > > >> streamsMetadata.port() + "/get/word-count/hello"); > > > >> ... > > > >> } > > > >> > > > >> > > > >> And if you iterated over all instances: > > > >> > > > >> final KafkaStreams kafkaStreams = createKafkaStreams(); > > > >> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas = > > > >> kafkaStreams.allInstances(); > > > >> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) { > > > >> if (streamsMetadata.stateStoreNames().contains("word-count")) { > > > >> http.get("http://"; + streamsMetadata.host() + ":" + > > > >> streamsMetadata.port() + "/get/word-count/hello"); > > > >> ... > > > >> } > > > >> } > > > >> > > > >> > > > >> If we were to change this to use Map<HostInfo, Set<TaskMetadata>> > for > > > the > > > >> most part users would need to iterate over the entry or key set. > > > Examples: > > > >> > > > >> The finding an instance by key is a little odd: > > > >> > > > >> final KafkaStreams kafkaStreams = createKafkaStreams(); > > > >> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata = > > > >> kafkaStreams.instanceWithKey("word-count","hello", > > > >> Serdes.String().serializer()); > > > >> // this is a bit odd as i only expect one: > > > >> for (HostInfo hostInfo : streamsMetadata.keySet()) { > > > >> http.get("http://"; + streamsMetadata.host() + ":" + > > > >> streamsMetadata.port() + "/get/word-count/hello"); > > > >> } > > > >> > > > >> > > > >> The scatter/gather by store is fairly similar to the previous > example: > > > >> > > > >> final KafkaStreams kafkaStreams = createKafkaStreams(); > > > >> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata = > > > >> kafkaStreams.allInstancesWithStore("word-count"); > > > >> for(HostInfo hostInfo : streamsMetadata.keySet()) { > > > >> http.get("http://"; + hostInfo.host() + ":" + hostInfo.port() + > > > >> "/get/word-count/hello"); > > > >> ... > > > >> } > > > >> > > > >> And iterating over all instances: > > > >> > > > >> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata = > > > >> kafkaStreams.allInstances(); > > > >> for (Map.Entry<HostInfo, Set<TaskMetadata>> entry : > > > >> streamsMetadata.entrySet()) { > > > >> for (TaskMetadata taskMetadata : entry.getValue()) { > > > >> if (taskMetadata.stateStoreNames().contains("word-count")) { > > > >> http.get("http://"; + streamsMetadata.host() + ":" + > > > >> streamsMetadata.port() + "/get/word-count/hello"); > > > >> ... > > > >> } > > > >> } > > > >> } > > > >> > > > >> > > > >> IMO - having a class we return is the better approach as it nicely > > wraps > > > >> the related things, i.e, host:port, store names, topic partitions > into > > > an > > > >> Object that is easy to use. Further we could add some behaviour to > > this > > > >> class if we felt it necessary, i.e, hasStore(storeName) etc. > > > >> > > > >> Anyway, i'm interested in your thoughts. > > > >> > > > >> Thanks, > > > >> Damian > > > >> > > > >> On Mon, 11 Jul 2016 at 13:47 Guozhang Wang <wangg...@gmail.com> > > wrote: > > > >> > > > >>> 1. Re StreamsConfig.USER_ENDPOINT_CONFIG: > > > >>> > > > >>> I agree with Neha that Kafka Streams can provide the bare minimum > > APIs > > > >> just > > > >>> for host/port, and user's implemented layer can provide URL / proxy > > > >> address > > > >>> they want to build on top of it. > > > >>> > > > >>> > > > >>> 2. Re Improving KafkaStreamsInstance interface: > > > >>> > > > >>> Users are indeed aware of "TaskId" class which is not part of > > internal > > > >>> packages and is exposed in PartitionGrouper interface that can be > > > >>> instantiated by the users, which is assigned with input topic > > > partitions. > > > >>> So we can probably change the APIs as: > > > >>> > > > >>> Map<HostState, Set<TaskMetadata>> KafkaStreams.getAllTasks() where > > > >>> TaskMetadata has fields such as taskId, list of assigned > partitions, > > > list > > > >>> of state store names; and HostState can include hostname / port. > The > > > port > > > >>> is the listening port of a user-defined listener that users provide > > to > > > >>> listen for queries (e.g., using REST APIs). > > > >>> > > > >>> Map<HostState, Set<TaskMetadata>> > > KafkaStreams.getTasksWithStore(String > > > >> /* > > > >>> storeName */) would return only the hosts and their assigned tasks > if > > > at > > > >>> least one of the tasks include the given store name. > > > >>> > > > >>> Map<HostState, Set<TaskMetadata>> > > > KafkaStreams.getTaskWithStoreAndKey(Key > > > >>> k, String /* storeName */, StreamPartitioner partitioner) would > > return > > > >> only > > > >>> the host and their assigned task if the store with the store name > > has a > > > >>> particular key, according to the partitioner behavior. > > > >>> > > > >>> > > > >>> > > > >>> Guozhang > > > >>> > > > >>> > > > >>> On Sun, Jul 10, 2016 at 11:21 AM, Neha Narkhede <n...@confluent.io > > > > > >> wrote: > > > >>> > > > >>>> Few thoughts that became apparent after observing example code of > > what > > > >> an > > > >>>> application architecture and code might look like with these > > changes. > > > >>>> Apologize for the late realization hence. > > > >>>> > > > >>>> 1. "user.endpoint" will be very differently defined for respective > > > >>>> applications. I don't think Kafka Streams should generalize to > > accept > > > >> any > > > >>>> connection URL as we expect to only expose metadata expressed as > > > >> HostInfo > > > >>>> (which is defined by host & port) and hence need to interpret the > > > >>>> "user.endpoint" as host & port. Applications will have their own > > > >> endpoint > > > >>>> configs that will take many forms and they will be responsible for > > > >>> parsing > > > >>>> out host and port and configuring Kafka Streams accordingly. > > > >>>> > > > >>>> If we are in fact limiting to host and port, I wonder if we should > > > >> change > > > >>>> the name of "user.endpoint" into something more specific. We have > > > >> clients > > > >>>> expose host/port pairs as "bootstrap.servers". Should this be > > > >>>> "application.server"? > > > >>>> > > > >>>> 2. I don't think we should expose another abstraction called > > > >>>> KafkaStreamsInstance to the user. This is related to the > discussion > > of > > > >>> the > > > >>>> right abstraction that we want to expose to an application. The > > > >>> abstraction > > > >>>> discussion itself should probably be part of the KIP itself, let > me > > > >> give > > > >>> a > > > >>>> quick summary of my thoughts here: > > > >>>> 1. The person implementing an application using Queryable State > has > > > >>> likely > > > >>>> already made some choices for the service layer–a REST framework, > > > >> Thrift, > > > >>>> or whatever. We don't really want to add another RPC framework to > > this > > > >>> mix, > > > >>>> nor do we want to try to make Kafka's RPC mechanism general > purpose. > > > >>>> 2. Likewise, it should be clear that the API you want to expose to > > the > > > >>>> front-end/client service is not necessarily the API you'd need > > > >> internally > > > >>>> as there may be additional filtering/processing in the router. > > > >>>> > > > >>>> Given these constraints, what we prefer to add is a fairly > low-level > > > >>>> "toolbox" that would let you do anything you want, but requires to > > > >> route > > > >>>> and perform any aggregation or processing yourself. This pattern > is > > > >>>> not recommended for all kinds of services/apps, but there are > > > >> definitely > > > >>> a > > > >>>> category of things where it is a big win and other advanced > > > >> applications > > > >>>> are out-of-scope. > > > >>>> > > > >>>> The APIs we expose should take the following things into > > > consideration: > > > >>>> 1. Make it clear to the user that they will do the routing, > > > >> aggregation, > > > >>>> processing themselves. So the bare minimum that we want to expose > is > > > >>> store > > > >>>> and partition metadata per application server identified by the > host > > > >> and > > > >>>> port. > > > >>>> 2. Ensure that the API exposes abstractions that are known to the > > user > > > >> or > > > >>>> are intuitive to the user. > > > >>>> 3. Avoid exposing internal objects or implementation details to > the > > > >> user. > > > >>>> > > > >>>> So tying all this into answering the question of what we should > > expose > > > >>>> through the APIs - > > > >>>> > > > >>>> In Kafka Streams, the user is aware of the concept of tasks and > > > >>> partitions > > > >>>> since the application scales with the number of partitions and > tasks > > > >> are > > > >>>> the construct for logical parallelism. The user is also aware of > the > > > >>>> concept of state stores though until now they were not user > > > accessible. > > > >>>> With Queryable State, the bare minimum abstractions that we need > to > > > >>> expose > > > >>>> are state stores and the location of state store partitions. > > > >>>> > > > >>>> For exposing the state stores, the getStore() APIs look good but I > > > >> think > > > >>>> for locating the state store partitions, we should go back to the > > > >>> original > > > >>>> proposal of simply exposing some sort of getPartitionMetadata() > that > > > >>>> returns a PartitionMetadata or TaskMetadata object keyed by > > HostInfo. > > > >>>> > > > >>>> The application will convert the HostInfo (host and port) into > some > > > >>>> connection URL to talk to the other app instances via its own RPC > > > >>> mechanism > > > >>>> depending on whether it needs to scatter-gather or just query. The > > > >>>> application will know how a key maps to a partition and through > > > >>>> PartitionMetadata it will know how to locate the server that hosts > > the > > > >>>> store that has the partition hosting that key. > > > >>>> > > > >>>> On Fri, Jul 8, 2016 at 9:40 AM, Michael Noll < > mich...@confluent.io> > > > >>> wrote: > > > >>>> > > > >>>>> Addendum in case my previous email wasn't clear: > > > >>>>> > > > >>>>>> So for any given instance of a streams application there will > > never > > > >>> be > > > >>>>> both a v1 and v2 alive at the same time > > > >>>>> > > > >>>>> That's right. But the current live instance will be able to tell > > > >> other > > > >>>>> instances, via its endpoint setting, whether it wants to be > > contacted > > > >>> at > > > >>>> v1 > > > >>>>> or at v2. The other instances can't guess that. Think: if an > > older > > > >>>>> instance would manually compose the "rest" of an endpoint URI, > > having > > > >>>> only > > > >>>>> the host and port from the endpoint setting, it might not know > that > > > >> the > > > >>>> new > > > >>>>> instances have a different endpoint suffix, for example). > > > >>>>> > > > >>>>> > > > >>>>> On Fri, Jul 8, 2016 at 6:37 PM, Michael Noll < > mich...@confluent.io > > > > > > >>>> wrote: > > > >>>>> > > > >>>>>> Damian, > > > >>>>>> > > > >>>>>> about the rolling upgrade comment: An instance A will contact > > > >>> another > > > >>>>>> instance B by the latter's endpoint, right? So if A has no > > further > > > >>>>>> information available than B's host and port, then how should > > > >>> instance > > > >>>> A > > > >>>>>> know whether it should call B at /v1/ or at /v2/? I agree that > my > > > >>>>>> suggestion isn't foolproof, but it is afaict better than the > > > >>> host:port > > > >>>>>> approach. > > > >>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> On Fri, Jul 8, 2016 at 5:15 PM, Damian Guy < > damian....@gmail.com> > > > >>>> wrote: > > > >>>>>> > > > >>>>>>> Michael - i'm ok with changing it to a string. Any one else > have > > a > > > >>>>> strong > > > >>>>>>> opinion on this? > > > >>>>>>> > > > >>>>>>> FWIW - i don't think it will work fine as is during the rolling > > > >>>> upgrade > > > >>>>>>> scenario as the service that is listening on the port needs to > be > > > >>>>> embedded > > > >>>>>>> within each instance. So for any given instance of a streams > > > >>>> application > > > >>>>>>> there will never be both a v1 and v2 alive at the same time > > > >> (unless > > > >>> of > > > >>>>>>> course the process didn't shutdown properly, but then you have > > > >>> another > > > >>>>>>> problem...). > > > >>>>>>> > > > >>>>>>> On Fri, 8 Jul 2016 at 15:26 Michael Noll <mich...@confluent.io > > > > > >>>> wrote: > > > >>>>>>> > > > >>>>>>>> I have one further comment about > > > >>>> `StreamsConfig.USER_ENDPOINT_CONFIG`. > > > >>>>>>>> > > > >>>>>>>> I think we should consider to not restricting the value of > this > > > >>>>> setting > > > >>>>>>> to > > > >>>>>>>> only `host:port` pairs. By design, this setting is capturing > > > >>>>>>> user-driven > > > >>>>>>>> metadata to define an endpoint, so why restrict the creativity > > > >> or > > > >>>>>>>> flexibility of our users? I can imagine, for example, that > > > >> users > > > >>>>> would > > > >>>>>>>> like to set values such as `https://host:port/api/rest/v1/` > in > > > >>> this > > > >>>>>>> field > > > >>>>>>>> (e.g. being able to distinguish between `.../v1/` and > `.../v2/` > > > >>> may > > > >>>>>>> help in > > > >>>>>>>> scenarios such as rolling upgrades, where, during the upgrade, > > > >>> older > > > >>>>>>>> instances may need to coexist with newer instances). > > > >>>>>>>> > > > >>>>>>>> That said, I don't have a strong opinion here. > > > >>>>>>>> > > > >>>>>>>> -Michael > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>> On Fri, Jul 8, 2016 at 2:55 PM, Matthias J. Sax < > > > >>>>> matth...@confluent.io> > > > >>>>>>>> wrote: > > > >>>>>>>> > > > >>>>>>>>> +1 > > > >>>>>>>>> > > > >>>>>>>>> On 07/08/2016 11:03 AM, Eno Thereska wrote: > > > >>>>>>>>>> +1 (non-binding) > > > >>>>>>>>>> > > > >>>>>>>>>>> On 7 Jul 2016, at 18:31, Sriram Subramanian < > > > >>> r...@confluent.io> > > > >>>>>>> wrote: > > > >>>>>>>>>>> > > > >>>>>>>>>>> +1 > > > >>>>>>>>>>> > > > >>>>>>>>>>> On Thu, Jul 7, 2016 at 9:53 AM, Henry Cai > > > >>>>>>> <h...@pinterest.com.invalid > > > >>>>>>>>> > > > >>>>>>>>>>> wrote: > > > >>>>>>>>>>> > > > >>>>>>>>>>>> +1 > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> On Thu, Jul 7, 2016 at 6:48 AM, Michael Noll < > > > >>>>>>> mich...@confluent.io> > > > >>>>>>>>> wrote: > > > >>>>>>>>>>>> > > > >>>>>>>>>>>>> +1 (non-binding) > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> On Thu, Jul 7, 2016 at 10:24 AM, Damian Guy < > > > >>>>>>> damian....@gmail.com> > > > >>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>>> Thanks Henry - we've updated the KIP with an example and > > > >>> the > > > >>>>> new > > > >>>>>>>>> config > > > >>>>>>>>>>>>>> parameter required. FWIW the user doesn't register a > > > >>>> listener, > > > >>>>>>> they > > > >>>>>>>>>>>>> provide > > > >>>>>>>>>>>>>> a host:port in config. It is expected they will start a > > > >>>>> service > > > >>>>>>>>> running > > > >>>>>>>>>>>>> on > > > >>>>>>>>>>>>>> that host:port that they can use to connect to the > > > >> running > > > >>>>>>>>> KafkaStreams > > > >>>>>>>>>>>>>> Instance. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> Thanks, > > > >>>>>>>>>>>>>> Damian > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> On Thu, 7 Jul 2016 at 06:06 Henry Cai > > > >>>>>>> <h...@pinterest.com.invalid> > > > >>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> It wasn't quite clear to me how the user program > > > >>> interacts > > > >>>>> with > > > >>>>>>>> the > > > >>>>>>>>>>>>>>> discovery API, especially on the user supplied listener > > > >>>> part, > > > >>>>>>> how > > > >>>>>>>>>>>> does > > > >>>>>>>>>>>>>> the > > > >>>>>>>>>>>>>>> user program supply that listener to KafkaStreams and > > > >> how > > > >>>>> does > > > >>>>>>>>>>>>>> KafkaStreams > > > >>>>>>>>>>>>>>> know which port the user listener is running, maybe a > > > >>> more > > > >>>>>>>> complete > > > >>>>>>>>>>>>>>> end-to-end example including the steps on registering > > > >> the > > > >>>>> user > > > >>>>>>>>>>>> listener > > > >>>>>>>>>>>>>> and > > > >>>>>>>>>>>>>>> whether the user listener needs to be involved with > > > >> task > > > >>>>>>>>>>>> reassignment. > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 9:13 PM, Guozhang Wang < > > > >>>>>>> wangg...@gmail.com > > > >>>>>>>>> > > > >>>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> +1 > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 12:44 PM, Damian Guy < > > > >>>>>>>> damian....@gmail.com> > > > >>>>>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> Hi all, > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> I'd like to initiate the voting process for KIP-67 > > > >>>>>>>>>>>>>>>>> < > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>> > > > >>>>>>> > > > >>>>> > > > >>>> > > > >>> > > > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams > > > >>>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> KAFKA-3909 < > > > >>>>> https://issues.apache.org/jira/browse/KAFKA-3909 > > > >>>>>>>> > > > >>>>>>>> is > > > >>>>>>>>>>>>> the > > > >>>>>>>>>>>>>>> top > > > >>>>>>>>>>>>>>>>> level JIRA for this effort. > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> Initial PRs for Step 1 of the process are: > > > >>>>>>>>>>>>>>>>> Expose State Store Names < > > > >>>>>>>>>>>>> https://github.com/apache/kafka/pull/1526> > > > >>>>>>>>>>>>>>> and > > > >>>>>>>>>>>>>>>>> Query Local State Stores < > > > >>>>>>>>>>>>> https://github.com/apache/kafka/pull/1565> > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>>> Thanks, > > > >>>>>>>>>>>>>>>>> Damian > > > >>>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>>> -- > > > >>>>>>>>>>>>>>>> -- Guozhang > > > >>>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> -- > > > >>>>>>>>>>>>> Best regards, > > > >>>>>>>>>>>>> Michael Noll > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> *Michael G. Noll | Product Manager | Confluent | +1 > > > >>>>>>>>> 650.453.5860Download > > > >>>>>>>>>>>>> Apache Kafka and Confluent Platform: > > > >>>> www.confluent.io/download > > > >>>>>>>>>>>>> <http://www.confluent.io/download>* > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>> -- > > > >>>>>>>> Best regards, > > > >>>>>>>> Michael Noll > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>> *Michael G. Noll | Product Manager | Confluent | +1 > > > >>>>> 650.453.5860Download > > > >>>>>>>> Apache Kafka and Confluent Platform: > www.confluent.io/download > > > >>>>>>>> <http://www.confluent.io/download>* > > > >>>>>>>> > > > >>>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> -- > > > >>>>>> Best regards, > > > >>>>>> Michael Noll > > > >>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860 > > > >>>>>> <%2B1%20650.453.5860>Download Apache Kafka and Confluent > Platform: > > > >>>>>> www.confluent.io/download <http://www.confluent.io/download>* > > > >>>>>> > > > >>>>> > > > >>>>> > > > >>>>> > > > >>>>> -- > > > >>>>> Best regards, > > > >>>>> Michael Noll > > > >>>>> > > > >>>>> > > > >>>>> > > > >>>>> *Michael G. Noll | Product Manager | Confluent | +1 > > > >>> 650.453.5860Download > > > >>>>> Apache Kafka and Confluent Platform: www.confluent.io/download > > > >>>>> <http://www.confluent.io/download>* > > > >>>>> > > > >>>> > > > >>>> > > > >>>> > > > >>>> -- > > > >>>> Thanks, > > > >>>> Neha > > > >>>> > > > >>> > > > >>> > > > >>> > > > >>> -- > > > >>> -- Guozhang > > > >>> > > > >> > > > > > > > > > > > > -- > > Thanks, > > Neha > > > > > > -- > -- Guozhang >