Usage of cleanup.policy=compact,delete

2018-08-10 Thread Patrik Kleindl
Hello In a discussion yesterday the question came up if an internal changelog topic can be enabled for compaction and deletion. and

Re: Usage of cleanup.policy=compact,delete

2018-08-13 Thread Patrik Kleindl
m someone with knowledge of the code. I > > seem to recall we tried this for a repartition topic and it didn't do > quite > > what we expected. > > > > On Fri, Aug 10, 2018 at 3:02 AM Patrik Kleindl > wrote: > > > >> Hello > >> > >> In

Improve error message when trying to produce message without key for compacted topic

2018-08-21 Thread Patrik Kleindl
Hello Yesterday we had the following exception: Exception thrown when sending a message with key='null' and payload='...' to topic sometopic:: org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt. The cau

GlobalKTable/KTable initialization differences

2018-08-25 Thread Patrik Kleindl
Hello We are currently using GlobalKTables for interactive queries as well as for lookups inside stream applications but have come across some limitations/problems. The main problem was that our deployments including application start took longer with every new global state store we added which ca

Re: resetting consumer group offset to earliest and to-latest not working

2018-09-01 Thread Patrik Kleindl
Hello Did you add --execute to the command? Which command did you use? Best regards Patrik > On 01.09.2018 at 14:54, Joseph M'BIMBI-BENE wrote: > > Hello everyone, > > Hopefully this is the appropriate mailing list for my message. > When I am trying to reset the offset of some consumer group,

Re: GlobalKTable/KTable initialization differences

2018-09-06 Thread Patrik Kleindl
restored, IN ADDITION, the global stores being restored as well. >> >> If you like, please feel free to create a JIRA requesting this improvement >> so someone can work on it someday. >> >> Guozhang >> >> >> >> >>> On Sat, Aug 2

Managing/Versioning Topic Configurations for CI/CD

2018-09-27 Thread Patrik Kleindl
Hello everyone, we are currently trying to improve the management of our topic configurations. At the moment we are managing the configurations on the producer side and checking/creating/changing topics on each application (instance) startup. This has worked fine for many cases, but does not real

Global/Restore consumer use auto.offset.reset = none vs. OffsetOutOfRangeException

2018-10-02 Thread Patrik Kleindl
Hi We had several incidents where a streams application crashed while maintaining a global state store. Updating global state failed. You can restart KafkaStreams to recover from this error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset

Re: Global/Restore consumer use auto.offset.reset = none vs. OffsetOutOfRangeException

2018-10-03 Thread Patrik Kleindl
d to recover from this exception? > > > -Matthias > > On 10/2/18 4:54 AM, Patrik Kleindl wrote: > > Hi > > > > We had several incidents where a streams application crashed while > > maintaining a global state

Re: Global/Restore consumer use auto.offset.reset = none vs. OffsetOutOfRangeException

2018-10-04 Thread Patrik Kleindl
you > globalConsumer is lagging behind? Can you verify this? If yes, it seems > to make sense to stop processing to inform the user about this issue. > Would you rather prefer the application to just move on implying silent > data loss?? > > > -Matthias > > > On 10/3/18 1

Re: Global/Restore consumer use auto.offset.reset = none vs. OffsetOutOfRangeException

2018-10-04 Thread Patrik Kleindl
can do, because Streams hard codes to set > the policy to "none". Thus, a manual restart (that is gladly working as > you confirmed) is currently the way to go. > > Thanks for reporting this issue. > > > -Matthias > >> On 10/4/18 3:23 AM, Patrik Kleind

RocksDB not closed on error during CachingKeyValueStore.flush?

2018-10-23 Thread Patrik Kleindl
Hello Can someone please verify if my assumption is correct? In CachingKeyValueStore, if an exception happens during flush() the store will not be closed properly. @Override public void flush() { lock.writeLock().lock(); try { cache.flush(cacheName); underlying.flush();

Stream Metrics - Memory Analysis

2018-10-25 Thread Patrik Kleindl
Hello During the analysis of JVM memory two possible issues were shown which I would like to bring to your attention: 1) Duplicate strings Top findings: string_content="stream-processor-node-metrics" count="534,277" string_content="processor-node-id" count="148,437" string_content="stream-rocksdb-

Converting a Stream to a Table - groupBy/reduce vs.

2018-10-25 Thread Patrik Kleindl
Hello Recently we noticed a lot of warning messages in the logs which pointed to this method (we are running 2.0): KStreamReduce public void process(final K key, final V value) { // If the key or value is null we don't need to proceed if (key == null || value == null) {

Re: Converting a Stream to a Table - groupBy/reduce vs.

2018-10-26 Thread Patrik Kleindl
ur KStream to trigger reduce() to > delete, you will need to use a surrogate value for this, i.e., do a > mapValues() before the groupByKey() call, and replace `null` values with > the surrogate-delete-marker that you can evaluate in `Reducer#apply()` > to return `null` for this case. >
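The surrogate-marker idea quoted above can be sketched in plain Java. This is only an in-memory model of the two steps (the `mapValues()` substitution and the `Reducer#apply()` check), not Kafka Streams code. The class name, the marker value `__DELETE__`, and the `HashMap` standing in for the table are all hypothetical, and the model assumes that the first value for a key is stored without calling the reducer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// In-memory model of the surrogate-delete-marker pattern; not Kafka Streams API.
final class SurrogateDeleteSketch {
    // Hypothetical marker: any value that can never occur as real data would do.
    static final String DELETE_MARKER = "__DELETE__";

    // Step 1 (stand-in for mapValues()): replace null values with the marker.
    static String toSurrogate(String value) {
        return value == null ? DELETE_MARKER : value;
    }

    // Step 2 (stand-in for Reducer#apply()): return null when the marker arrives,
    // otherwise keep the newest value.
    static final BinaryOperator<String> REDUCER =
            (oldValue, newValue) -> DELETE_MARKER.equals(newValue) ? null : newValue;

    // Applies one record to the model table; a null reducer result removes the key.
    static void apply(Map<String, String> table, String key, String rawValue) {
        String value = toSurrogate(rawValue);
        String old = table.get(key);
        // Assumption: the first value for a key is stored without calling the reducer.
        String result = (old == null) ? value : REDUCER.apply(old, value);
        if (result == null) {
            table.remove(key);
        } else {
            table.put(key, result);
        }
    }
}
```

Calling `SurrogateDeleteSketch.apply(table, "k", null)` after a regular value has been stored for `"k"` removes the key from the model table, which is the effect the quoted advice is after.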

Re: Converting a Stream to a Table - groupBy/reduce vs.

2018-10-29 Thread Patrik Kleindl
c. For the "reduce" version, > it's > > > an internal changelog topic, and for the "topic-to-table" version, the > > > store can use the intermediate topic as its changelog. > > > > > > This doesn't address your ergonomic co

Re: Problem with kafka-streams aggregate windowedBy

2018-10-29 Thread Patrik Kleindl
Hi Does your applicationId change? Best regards Patrik > On 29.10.2018 at 13:28, Pavel Koroliov wrote: > > Hi everyone! I use kafka-streams, and I have a problem when I use > windowedBy. Everything works well until I restart the application. After > restarting my aggregation starts from beginn

Re: Problem with kafka-streams aggregate windowedBy

2018-10-29 Thread Patrik Kleindl
On 29.10.2018 at 18:20, Pavel Koroliov wrote: > > Hi > No, my application id doesn't change > > On Mon, 29 Oct 2018 at 19:11, Patrik Kleindl wrote: > >> Hi >> Does your applicationId change? >> Best regards >> Patrik >> >>> On 29.10.2018 at

Re: Deduplicating a topic in the face of producer crashes over a time window?

2018-11-01 Thread Patrik Kleindl
Hi Andrew Did you take a look at ? We are using this for a case like you described. Growth should be limited with this approach. Best reg

Offsets/Lags for global state stores not shown

2018-11-06 Thread Patrik Kleindl
Hello Am I doing something wrong or is it by design that global state stores and their consumers do not show up under the consumer-groups? With the consumer group command (and in control center as well) I don't get any output for the group: ./kafka-consumer-groups --bootstrap-server broker:9092 --

Re: Offsets/Lags for global state stores not shown

2018-11-07 Thread Patrik Kleindl
nt (instead of subscription) and this consumer does not commit > any offset to Kafka. > > Note that global stores are bootstrapped before processing begins > though, and are expected to be low throughput topic anyway. > > > -Matthias > > On 11/6/18 2:03 AM, Patrik Klein

Re: Using GlobalKTable/KeyValueStore for topic cache

2018-11-13 Thread Patrik Kleindl
Hi Chris We are using them like you described. Performance is very good compared to the database used before. Beware that until is done the startup will be blocked until all global stores are restored (sequentially). This can take a little for larg

Re: Offsets/Lags for global state stores not shown

2018-11-18 Thread Patrik Kleindl
uerying, the GlobalKTable will be fully populated > from the topic. For the KTable case, you can query from the very > beginning on, while data is put into the table. > > Also, for this approach, if you add other processing, this processing > would not be parallelized but duplicat

Re: Stream Metrics - Memory Analysis

2018-11-20 Thread Patrik Kleindl
provide some more elaborations on what you did the JVM analysis, > so that I can try to re-produce the observations. > > > Guozhang > > On Thu, Oct 25, 2018 at 2:50 AM Patrik Kleindl wrote: > > > Hello > > > > During the analysis of JVM memory two possible

Re: Converting a Stream to a Table - groupBy/reduce vs.

2018-11-20 Thread Patrik Kleindl
e KTable from >> builder.table() would not be materialized if users do not specify a >> materialized store name, only the value-transformed KTable will be >> materialized: >> >> >> >> >> Would that work for y

User Activity Tracking

2019-01-10 Thread Patrik Kleindl
Hi everyone, we are planning to add some user activity tracking to an application and I wanted to ask around for your general experiences and best practices. Do you use one topic per application or more granular? Do you write directly from the application to Kafka for tracking purposes? How to be

Re: Warning when adding GlobalKTable to toplogy

2019-01-19 Thread Patrik Kleindl
Hi That is because the global tables are handled separately by the GlobalStreamThread as far as I understand. You also don't see their offsets like for regular consumers. Best regards Patrik > On 19.01.2019 at 18:19, Dmitry Minkovsky wrote: > > When I add a GlobalKTable for topic > "message-wr

Re: Minimizing global store restoration time

2019-02-06 Thread Patrik Kleindl
Hi Taylor We are facing the same issue, although on a smaller scale. The main problem as you found is that the restoration is running sequentially, this should be addressed in, although there has been no progress lately. On the other hand you could

Re: Minimizing global store restoration time

2019-02-08 Thread Patrik Kleindl
large > amounts of data in global stores and whether there are any inherent > limitations to the size of global stores. > > Our topic is already using compaction. > > Taylor > >> On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl wrote: >> >> Hi Taylor >>

Re: Accessing Kafka stream's KTable underlying RocksDB memory usage

2019-02-17 Thread Patrik Kleindl
Hi How many partitions do your topics have? As far as I understand there is a RocksDB for every partition of every KTable and this can add up quickly. Depending on how many instances you are using one of them might have to handle the complete load temporarily which will use more memory. Also, Roc

Re: Minimizing global store restoration time

2019-02-21 Thread Patrik Kleindl
records to possibly exceed it (default is 50Mb). > > > Guozhang > > > > On Fri, Feb 8, 2019 at 12:43 AM Patrik Kleindl wrote: > > > Hi Taylor > > You are right, the parallel processing is not mentioned in this issue, if > > I remember correctly it was in the

Re: [VOTE] 2.2.0 RC0

2019-02-25 Thread Patrik Kleindl
Hi Matthias Minor issue, if locale is not English (German in my case) then org.apache.kafka.common.utils.UtilsTest > testFormatBytes FAILED org.junit.ComparisonFailure: expected:<1[.]1 MB> but was:<1[,]1 MB> at org.junit.Assert.assertEquals( at org.junit.Assert.a
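The `1[.]1 MB` versus `1[,]1 MB` mismatch in this report is the usual symptom of locale-dependent number formatting. A minimal sketch of the effect follows. It uses `String.format` with an explicit `Locale` purely for illustration and is not the implementation behind `testFormatBytes`; the class and method names are hypothetical.

```java
import java.util.Locale;

// Illustrates how the decimal separator depends on the locale used for formatting.
final class LocaleFormatSketch {
    // Formats a size in megabytes with one decimal place in the given locale.
    static String formatMegabytes(Locale locale, double megabytes) {
        return String.format(locale, "%.1f MB", megabytes);
    }
}
```

With `Locale.GERMANY` the decimal separator is a comma, while `Locale.ROOT` yields a dot. Tests that compare formatted numbers against a fixed string are therefore stable only if the code under test or the test itself pins the locale.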

Re: Minimizing global store restoration time

2019-03-01 Thread Patrik Kleindl
biggest > > impact in reducing global state restoration times for the scenario where > > the keyspace of the global state store is very large. > > > > Taylor > > > > > > On Thu, Feb 21, 2019 at 6:31 AM Patrik Kleindl > wrote: > > >

Re: Operationalizing Zookeeper and common gotchas

2019-03-18 Thread Patrik Kleindl
Hi Eno Thanks too, this is indeed helpful Best regards Patrik > On 18.03.2019 at 18:16, Eno Thereska wrote: > > Hi folks, > > The team here has come up with a couple of clarifying tips for > operationalizing Zookeeper for Kafka that we found missing from the > official documentation, and pas

Re: No checkpoint found

2019-03-20 Thread Patrik Kleindl
Hi Claudia Probably, welcome to the club ;-) best regards Patrik On Wed, 20 Mar 2019 at 10:25, Claudia Wegmann wrote: > Hi kafka users, > > since upgrading to kafka 2.1.1 version I get the following log message at > every startup of streaming serv

Re: KafkaStreams backoff for non-existing topic

2019-03-25 Thread Patrik Kleindl
Hi Guozhang Just a small question, why can't this be checked when trying to instantiate KafkaStreams? The Topology should know all topics and the existence of the topics could be verified with the AdminClient. This would allow to fail fast similar to when the state directory is not available. Or am

Re: Offsets of deleted consumer groups do not get deleted correctly

2019-04-01 Thread Patrik Kleindl
Hi Claudia Just a sidenote, there is a combined policy for "compact, delete" which deletes messages older than and compacts newer ones if I remember correctly. It's still not really in the docs as it seems best regards Patrik On Mon
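As a rough illustration of the combined policy mentioned here, the sketch below builds a topic configuration map with `cleanup.policy` set to `compact,delete` plus a `retention.ms` value. The class name, the method name, and the retention passed in by the caller are hypothetical. The map is plain Java and is not tied to any particular client call.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

// Builds topic-level settings for a topic that is both compacted and time-limited.
final class CompactDeleteConfigSketch {
    // Returns the config entries; the retention is chosen by the caller.
    static Map<String, String> topicConfig(Duration retention) {
        Map<String, String> config = new LinkedHashMap<>();
        // Both policies, comma-separated, as in the thread subject.
        config.put("cleanup.policy", "compact,delete");
        // Time-based retention that the delete policy applies, in milliseconds.
        config.put("retention.ms", Long.toString(retention.toMillis()));
        return config;
    }
}
```

A map like `CompactDeleteConfigSketch.topicConfig(Duration.ofDays(7))` could then be handed to whatever mechanism creates or alters the topic, for example the config map of a `NewTopic` when using the AdminClient.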

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-06-26 Thread Patrik Kleindl
Hi Kiran You can use the RocksDBConfigSetter and pass options.setMaxOpenFiles(100); to all RocksDBs for the Streams application which limits how many are kept open at the same time. best regards Patrik On Wed, 26 Jun 2019 at 16:14, <> wrote: >

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-06-27 Thread Patrik Kleindl
helps best regards Patrik On Thu, 27 Jun 2019 at 10:46, <> wrote: > > > On 2019/06/26 21:58:02, Patrik Kleindl wrote: > > Hi Kiran > > You can use the RocksDBConfigSetter and pass > > > > options.setMaxOpen

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-06-27 Thread Patrik Kleindl
store you might not have closed. br, Patrik On Thu, 27 Jun 2019 at 13:11, <> wrote: > > > On 2019/06/27 09:02:39, Patrik Kleindl wrote: > > Hello Kiran > > > > First, the value for maxOpenFiles is per RocksDB in

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-03 Thread Patrik Kleindl
just be >>>>>> (#global_state_stores + >>>>>> sum(#partitions_of_topic_per_local_state_store)) . The number of >>>>>> stream threads isn't relevant here. >>>>>> >>>>>> You can also figure it ou
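The quoted expression can be turned into a small back-of-the-envelope calculation. The sketch below assumes that the expression counts RocksDB instances and that `maxOpenFiles` applies per instance, as the earlier reply in this thread appears to say. Both assumptions are read from truncated snippets, and the class and method names are hypothetical.

```java
import java.util.List;

// Rough estimate of store instances and the resulting open-file ceiling.
final class OpenFilesEstimateSketch {
    // #global_state_stores + sum(#partitions_of_topic_per_local_state_store)
    static int storeInstances(int globalStateStores, List<Integer> partitionsPerLocalStore) {
        int total = globalStateStores;
        for (int partitions : partitionsPerLocalStore) {
            total += partitions;
        }
        return total;
    }

    // Upper bound on open files if every instance reaches its own maxOpenFiles limit.
    static long openFilesUpperBound(int storeInstances, int maxOpenFilesPerInstance) {
        return (long) storeInstances * maxOpenFilesPerInstance;
    }
}
```

For example, 2 global stores plus local stores over topics with 12 and 6 partitions give 20 instances. With `setMaxOpenFiles(100)` the bound is 2000 open files if all of those instances live on a single application instance.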

Re: Streams reprocessing whole topic when deployed but not locally

2019-07-10 Thread Patrik Kleindl
Hi Regarding the I/O, RocksDB has something called write amplification which writes the data to multiple levels internally to enable better optimization at the cost of storage and I/O. This is also the reason the stores can get larger than the topics themselves. This can be modified by RocksDB se

Re: Kafka Streams - unbounded memory growth - stateful processing (rocksdb)

2019-07-16 Thread Patrik Kleindl
Hello Ashok Adding to what Sophie wrote, if you use a custom RocksDBConfigSetter then override the BlockBasedTableConfig like following and call options.setTableFormatConfig(tableConfig) at the end. BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig(); tableConf

Re: How to start a stream from only new records?

2019-08-13 Thread Patrik Kleindl
Hi Our requirement is related, we want our streams application to only process messages from the last x weeks. On new deployments this requires starting the application first, stopping the application and then resetting the offsets. I have created w

Re: [ANNOUNCE] New committer: John Roesler

2019-11-12 Thread Patrik Kleindl
Congratulations John! Well deserved and thanks for all your help Best regards Patrik > On 13.11.2019 at 06:10, Kamal Chandraprakash > wrote: > > Congrats John! > >> On Wed, Nov 13, 2019 at 7:57 AM Dong Lin wrote: >> >> Congratulations John! >> >>> On Tue, Nov 12, 2019 at 1:56 PM Guozhang

Re: Case of joining multiple streams/tables

2019-12-06 Thread Patrik Kleindl
Hi might be worth a look. best regards Patrik On Fri, 6 Dec 2019 at 06:44, Sachin Mittal wrote: > I was thinking more of a builder api at DSL level. > Something like this: > StreamsBuilder.joineBuilder() >

Re: How to set concrete names for state stores and internal topics backed by these

2019-12-06 Thread Patrik Kleindl
Hi Sachin We are using a small helper method to keep this readable: private Materialized materializedWith(String name, Serde keySerde, Serde valueSerde) { Materialized materialized =; return materialized.withKeySerde(keySerde).withValueSerde(valueSerde); } So the M