Re: Kafka streams consumer/producer throttling
Hi Guozhang,

I set the client id like this:

    propsWithAppId.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
    propsWithAppId.put(StreamsConfig.CLIENT_ID_CONFIG, appId);
    propsWithAppId.put(ProducerConfig.CLIENT_ID_CONFIG, appId);
    propsWithAppId.put(ConsumerConfig.CLIENT_ID_CONFIG, appId);
    propsWithAppId.put(StreamsConfig.producerPrefix(ProducerConfig.CLIENT_ID_CONFIG), appId);
    propsWithAppId.put(StreamsConfig.consumerPrefix(ConsumerConfig.CLIENT_ID_CONFIG), appId);

After that I tried to set the quota in two different ways; neither of them worked:

    AdminUtils.changeClientIdConfig(zkUtils, clientId, clientProps); // I know it is deprecated

    bin/kafka-configs.sh --zookeeper zk-ips --alter \
      --add-config 'producer_byte_rate=1048576,consumer_byte_rate=1048576' \
      --entity-type clients --entity-name appId

I checked the Kafka Streams source code. The client id is actually changed by Streams, which is why setting it has no effect. It forms the client id as:

    clientId + thread name + (consumer | producer | restore-consumer)

I have not found in the docs whether it is possible to use a wildcard when setting the client id bandwidth threshold.

On Sat, 17 Nov 2018 at 01:23, Guozhang Wang wrote:
>
> Hello Andrey,
>
> Could you provide a bit more information on how you set the bandwidth based
> on client id? Sharing a code snippet would make it even easier for me to
> understand the issue you encountered.
>
> Guozhang
>
> On Fri, Nov 9, 2018 at 7:11 AM Andrey Dyachkov wrote:
>
> > Hello,
> >
> > Could you tell me if there is a way to throttle Kafka Streams by client id?
> > I have tried to set a bandwidth limit for the client id, which is specified
> > in the Kafka Streams config, but it does not work.
> > --
> > With great enthusiasm,
> > Andrey

--
Thanks,
Andrey
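[Editor's note: since Streams derives the effective client ids itself, quotas have to target those derived names. A minimal sketch of the naming pattern described in this thread (clientId + thread name + suffix); the exact "StreamThread-<n>" thread-name format is an assumption that should be verified against your Kafka Streams version:]

```java
// Sketch: build the client ids Kafka Streams actually registers, so they can
// be passed to kafka-configs.sh as --entity-name values.
// ASSUMPTION: thread names follow "<clientId>-StreamThread-<n>" (verify per version).
public class DerivedClientIds {

    static String threadName(String clientId, int threadIndex) {
        return clientId + "-StreamThread-" + threadIndex;
    }

    static String consumerClientId(String clientId, int threadIndex) {
        return threadName(clientId, threadIndex) + "-consumer";
    }

    static String producerClientId(String clientId, int threadIndex) {
        return threadName(clientId, threadIndex) + "-producer";
    }

    static String restoreConsumerClientId(String clientId, int threadIndex) {
        return threadName(clientId, threadIndex) + "-restore-consumer";
    }

    public static void main(String[] args) {
        // "my-streams-app" is a hypothetical application/client id.
        System.out.println(consumerClientId("my-streams-app", 1));
        // prints: my-streams-app-StreamThread-1-consumer
    }
}
```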
Re: Stream Metrics - Memory Analysis
Done: https://issues.apache.org/jira/browse/KAFKA-7660

br,
Patrik

On Mon, 19 Nov 2018 at 02:03, Guozhang Wang wrote:
> Hello Patrik,
>
> Could you file a JIRA for your findings? Also, which Kafka versions are you
> using (could you add that to the ticket as well)?
>
> Could you elaborate a bit more on how you did the JVM analysis, so that I
> can try to reproduce the observations.
>
> Guozhang
>
> On Thu, Oct 25, 2018 at 2:50 AM Patrik Kleindl wrote:
>
> > Hello,
> >
> > During an analysis of JVM memory, two possible issues showed up which I
> > would like to bring to your attention:
> >
> > 1) Duplicate strings
> > Top findings:
> >   string_content="stream-processor-node-metrics" count="534,277"
> >   string_content="processor-node-id" count="148,437"
> >   string_content="stream-rocksdb-state-metrics" count="41,832"
> >   string_content="punctuate-latency-avg" count="29,681"
> >
> > "stream-processor-node-metrics" seems to be used in Sensors.java as a
> > literal and not interned.
> >
> > 2) The HashMap parentSensors from
> > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
> > was reported multiple times as suspicious for potentially keeping a lot
> > of objects alive. In our case the reported size was 40-50 MB each.
> > I haven't looked too deeply into the code, but I noticed that the class
> > Sensor.java, which is used as a key in the HashMap, does not implement
> > equals or hashCode. Not sure this is a problem though.
> >
> > Maybe someone can shed some light on this.
> >
> > best regards
> >
> > Patrik
>
> --
> -- Guozhang
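[Editor's note: the "duplicate strings" finding above hinges on string interning. A minimal sketch of the difference, using one of the metric names from the heap dump as an example string (not Kafka code):]

```java
// Sketch: two strings built at runtime are distinct heap objects,
// but intern() maps both to the single pooled instance. A literal
// that is never interned can therefore accumulate many heap copies.
public class InternDemo {

    // Returns true when both runtime-built copies intern to the same instance.
    static boolean internedIdentical(String s) {
        String a = new String(s); // forces a fresh heap copy
        String b = new String(s); // another distinct copy (a != b)
        return a.intern() == b.intern();
    }

    public static void main(String[] args) {
        System.out.println(internedIdentical("stream-processor-node-metrics"));
        // prints: true
    }
}
```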
Re: Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table
Hi again, Patrik,

You'll probably be interested in this recent Jira:
https://issues.apache.org/jira/browse/KAFKA-7658

You have a good point about the overhead of going through an intermediate
topic... I can see how explicit topic management is an operational burden,
and you're also right that the changelog topic only gets read on state
restoration. That was an oversight on my part.

I think that with KAFKA-7658 and https://github.com/apache/kafka/pull/5779,
you'll have two good options in the future.

To solve your problem *right now*, you can circumvent the null filtering by
wrapping the values of your stream. For example, immediately before the
reduce, you could mapValues and wrap the values with Optional. Then, your
reduce function can unwrap the Optional and return null if it's empty. Does
that make sense?

This comes with an important caveat, though, which is part of the
motivation for this roadblock to begin with: if your incoming data gets
repartitioned in your topology, then the order of records for a key is not
deterministic. This would break the semantics of your reduce-to-latest
function and, indeed, of any non-commutative reduce function.

For example, if you have a topic like:
  dummykey1: {realkey: A, value: 4}
  dummykey2: {realkey: A, value: 5}
and you do a groupBy( select realkey ) and then reduce( keep latest value ),
then, if dummykey1 and dummykey2 are in different partitions, the result
would be either A:4 or A:5, depending on which input partition is processed
first.

We have discussed solutions to this issue several times, but it's quite
complex in the details.

Nevertheless, if you're careful to ensure that you don't have multiple
threads producing the same key into the input topic, and that you don't
have a repartition in the middle, then this should work for you.

Hope this helps!
-john

On Sun, Nov 18, 2018 at 7:04 PM Guozhang Wang wrote:
> Hi Patrik,
>
> Thanks for explaining your use case to us.
> While we can still discuss how KStream should interpret null-values in
> aggregations, one workaround atm: if your deduplication logic can be
> written as a transformValues operation, you can do the following:
>
>     builder.table("source-topic").transformValues(...
>         Materialized.as("store-name"))
>
> Note that in a recent PR that we are merging, the source KTable from
> builder.table() will not be materialized if the user does not specify a
> materialized store name; only the value-transformed KTable will be
> materialized:
>
> https://github.com/apache/kafka/pull/5779
>
> Would that work for you?
>
> Guozhang
>
> On Mon, Oct 29, 2018 at 2:08 AM Patrik Kleindl wrote:
>
> > Hi John and Matthias,
> > thanks for the questions; maybe explaining our use case helps a bit:
> > We are receiving CDC records (row-level insert/update/delete) in one
> > topic per table. The key is derived from the DB records; the value is
> > null in case of deletes. Those would be the immutable facts, I guess.
> > These topics are first streamed through a deduplication Transformer to
> > drop changes on irrelevant fields.
> > The results are translated to KTables and joined to each other to
> > represent the same result as the SQLs on the database, but faster. At
> > this stage the delete/null records matter, because if a record gets
> > deleted we want it to drop out of the join too. -> Our reduce-approach
> > produced unexpected results here.
> > We took the deduplication step separately because in some cases we only
> > need the KStream for processing.
> > If you see a simpler/cleaner approach here, I'm open to suggestions, of
> > course.
> >
> > Regarding the overhead:
> > 1) Named topics create management/maintenance overhead because they have
> > to be created/treated separately (auto-create is not an option) and be
> > considered in future changes, topology changes/resets, and so on. The
> > internal topic removes most of those issues.
> > 2) One of our developers asked whether the traffic to/from the broker is
> > actually the same in both scenarios. We expect that the same data is
> > written to the broker for the named topic as in the reduce case, but if
> > the KTable is maintained inside a Streams topology, does it have to read
> > back everything it sends to the broker, or can it keep the table
> > internally? I hope it is understandable what I mean; otherwise I can try
> > to explain it more clearly.
> >
> > best regards
> >
> > Patrik
> >
> > On Sat, 27 Oct 2018 at 23:50, John Roesler wrote:
> >
> > > Hi again Patrik,
> > >
> > > Actually, this is a good question... Can you share some context about
> > > why you need to convert a stream to a table (including nulls as
> > > retractions)?
> > >
> > > Thanks,
> > > -John
> > >
> > > On Fri, Oct 26, 2018 at 5:36 PM Matthias J. Sax wrote:
> > >
> > > > I don't know your overall application setup. However, a KStream
> > > > semantically models immutable facts and there is no update semantics.
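[Editor's note: the core of John's "wrap in Optional" workaround above is the pair of wrap/unwrap steps around the reduce. A minimal sketch of just that logic in plain Java, outside any Streams topology; the class and method names are mine, not Kafka Streams API:]

```java
import java.util.Optional;
import java.util.function.BinaryOperator;

// Sketch: values are wrapped in Optional before the groupBy/reduce so that
// nulls (deletes) survive Streams' null filtering, then unwrapped afterwards.
public class LatestWinsReduce {

    // mapValues step: wrap a possibly-null value; null becomes Optional.empty().
    static <V> Optional<V> wrap(V value) {
        return Optional.ofNullable(value);
    }

    // reduce step: reduce-to-latest, i.e. the newest wrapped value always wins.
    // Note this is non-commutative, hence John's repartitioning caveat.
    static <V> BinaryOperator<Optional<V>> keepLatest() {
        return (older, newer) -> newer;
    }

    // mapValues step after the reduce: empty Optional becomes null again,
    // which downstream KTable logic interprets as a delete/retraction.
    static <V> V unwrap(Optional<V> value) {
        return value.orElse(null);
    }

    public static void main(String[] args) {
        // A delete (null) arriving last wins, instead of being filtered out.
        Optional<String> latest =
                LatestWinsReduce.<String>keepLatest().apply(wrap("old"), wrap(null));
        System.out.println(unwrap(latest)); // prints: null
    }
}
```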
[RESULTS] [VOTE] Release Kafka version 2.1.0
This vote passes with 10 +1 votes (3 binding) and no 0 or -1 votes.

+1 votes

PMC Members:
* Guozhang Wang
* Jason Gustafson
* Dong Lin

Committers:
* Vahid Hashemian
* Manikumar Reddy

Community:
* Jonathan Santilli
* Eno Thereska
* Andras Beni
* Jakub Scholz
* Satish Duggana

0 votes
* No votes

-1 votes
* No votes

Vote thread: https://markmail.org/message/qzg3xhduj3otodkr

I will continue with the release process and send the announcement email.

Cheers,
Dong
Re: Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table
Hi John and Guozhang,

Thanks to both of you. I will check with our developers whether they want to
adopt your suggestions. Using the same ValueTransformer for deduplication on
both streams and tables might simplify things. We have eased the operational
burden a bit by improving our topic provisioning, so we can also hold out a
bit.
KAFKA-7658 sounds great and made me chuckle, because I just asked for this
and now I see that there were some discussions/emotions regarding it a lot
earlier ;-)

Best regards
Patrik

> Am 20.11.2018 um 18:19 schrieb John Roesler:
>
> Hi again, Patrik,
>
> You'll probably be interested in this recent Jira:
> https://issues.apache.org/jira/browse/KAFKA-7658
>
> [snip]