Thanks for the reply. It would be interesting to hear who else is using IQ with or
without GlobalKTables, and what problems and solutions they have come up with.

Best regards
Patrik

> On 18.11.2018 at 20:21, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Because each instance needs to consume all data, it's limited by what a
> single instance can consume -- a hard bound is the network. Note, the
> network is shared, so don't take the maximum network speed into account.
> Also, it's not the number of unique messages, but the number of updates
> that is important for this.
>
>
>> Just to verify, for this IQ setup (streams app which only builds a single
>> table to be queried) we have tried the alternative approach to use a normal
>> KTable in combination with a unique application ID per application instance.
>> This seemed to work quite well, including faster (parallel) startup etc.
>> Is this approach valid or would you expect some pitfalls?
>
>
> I guess, for your use case, this might be ok. There is one difference on
> startup: if there is no local state built up, in the GlobalKTable case,
> before you can start querying, the GlobalKTable will be fully populated
> from the topic. For the KTable case, you can query from the very
> beginning on, while data is put into the table.
>
> Also, for this approach, if you add other processing, this processing
> would not be parallelized but duplicated.
>
>
> -Matthias
>
>
>
>> On 11/7/18 1:32 AM, Patrik Kleindl wrote:
>> Thanks for the response.
>> How "low" is the expected low throughput? We are using GlobalKTables
>> for IQ on several topics, but with single-digit million unique messages and
>> usually fewer changes per day.
>>
>> Just to verify, for this IQ setup (streams app which only builds a single
>> table to be queried) we have tried the alternative approach to use a normal
>> KTable in combination with a unique application ID per application instance.
>> This seemed to work quite well, including faster (parallel) startup etc.
>> Is this approach valid or would you expect some pitfalls?
>>
>> We have not used this approach more because it does not work for global
>> stores inside a streams application, but it might be beneficial to split
>> that up again.
>>
>> best regards
>>
>> Patrik
>>
>>> On Tue, 6 Nov 2018 at 20:07, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>> The topics of global stores are not included by design.
>>>
>>> The "problem" is that each instance needs to consume *all*
>>> topic-partitions of these topics, and thus they cannot be included
>>> in the consumer group that would assign each partition to exactly one
>>> instance. Hence, an additional consumer is used that uses partition
>>> assignment (instead of subscription) and this consumer does not commit
>>> any offsets to Kafka.
>>>
>>> Note that global stores are bootstrapped before processing begins
>>> though, and are expected to be low-throughput topics anyway.
>>>
>>>
>>> -Matthias
>>>
>>>> On 11/6/18 2:03 AM, Patrik Kleindl wrote:
>>>> Hello
>>>>
>>>> Am I doing something wrong or is it by design that global state stores
>>>> and their consumers do not show up under the consumer-groups?
>>>> With the consumer group command (and in control center as well) I don't
>>>> get any output for the group:
>>>> ./kafka-consumer-groups --bootstrap-server broker:9092 --group somegroup --describe
>>>> Note: This will not show information about old Zookeeper-based consumers.
>>>>
>>>> If I query for the state I get a response that members are present:
>>>> ./kafka-consumer-groups --bootstrap-server broker:9092 --group somegroup --describe --state
>>>> Note: This will not show information about old Zookeeper-based consumers.
>>>>
>>>> COORDINATOR (ID)    ASSIGNMENT-STRATEGY    STATE     #MEMBERS
>>>> broker:9092 (1)     stream                 Stable    2
>>>>
>>>> This is quite irritating as we cannot see if a global state store has
>>>> caught up with a backlog of messages.
>>>>
>>>> Code to reproduce:
>>>> builder.globalTable(TOPIC_NAME, Materialized
>>>>         .<String, String, KeyValueStore<Bytes, byte[]>>as(STORENAME)
>>>>         .withKeySerde(Serdes.String())
>>>>         .withValueSerde(Serdes.String()));
>>>>
>>>> Nothing fancy.
>>>>
>>>> Logs:
>>>> 2018-11-05 21:25:56 INFO AbstractCoordinator:442 - (Re-)joining group
>>>> 2018-11-05 21:25:56 INFO StreamPartitionAssignor:481 - Assigned tasks to clients as {e0250aa5-e1c6-4d33-a746-bc9357c66965=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
>>>> 2018-11-05 21:25:56 WARN ConsumerCoordinator:376 - The following subscribed topics are not assigned to any members: [storetopic]
>>>> 2018-11-05 21:25:56 INFO AbstractCoordinator:409 - Successfully joined group with generation 3
>>>> 2018-11-05 21:25:56 INFO ConsumerCoordinator:256 - Setting newly assigned partitions []
>>>>
>>>> The store works after this, but it is not shown.
>>>>
>>>> Any input is appreciated
>>>>
>>>> best regards
>>>>
>>>> Patrik
>>>>
>>>> PS: The customer will forward this to the Confluent support too, but I'm
>>>> asking here for public visibility
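
For readers following the "normal KTable with a unique application ID per
instance" alternative discussed in this thread, here is a minimal sketch of how
such a configuration might be built. It is not code from the thread: the class
name, the method name, and the values "iq-table" and "instance-1" are
hypothetical, and plain string keys are used so the snippet compiles without
the Kafka libraries. Only the keys "application.id" and "bootstrap.servers"
are actual Kafka Streams settings.

```java
import java.util.Properties;

// Sketch only: gives every application instance its own application.id.
// Kafka Streams uses application.id as the consumer group id, so each
// instance then forms its own group and is assigned all partitions of
// the table topic.
final class PerInstanceStreamsConfig {

    // baseId, instanceId and bootstrapServers are hypothetical inputs;
    // instanceId must be unique per running instance (e.g. a host name).
    static Properties forInstance(String baseId, String instanceId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", baseId + "-" + instanceId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        Properties props = forInstance("iq-table", "instance-1", "broker:9092");
        System.out.println(props.getProperty("application.id"));
    }
}
```

The resulting Properties object would be passed to the KafkaStreams
constructor together with a topology that builds the KTable. Because each
instance is its own consumer group, the table consumers should be listed by
kafka-consumer-groups under the per-instance group name. As noted above in the
thread, any other processing added to such an application would be duplicated
on every instance rather than parallelized.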