Hi Sachin
We are using a small helper method to keep this readable:
private <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> materializedWith(
        String name, Serde<K> keySerde, Serde<V> valueSerde) {
    return Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(name)
            .withKeySerde(keySerde)
            .withValueSerde(valueSerde);
}
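For illustration, a call site could then look like this (store name and serdes
are placeholders):

KTable<String, Long> counts = stream
        .groupByKey()
        .count(materializedWith("counts-store", Serdes.String(), Serdes.Long()));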
So the M
Hi
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
might be worth a look.
best regards
Patrik
On Fri, 6 Dec 2019 at 06:44, Sachin Mittal wrote:
> I was thinking more of a builder api at DSL level.
> Something like this:
> StreamsBuilder.joinBuilder()
>
Congratulations John!
Well deserved and thanks for all your help
Best regards
Patrik
> On 13.11.2019 at 06:10, Kamal Chandraprakash wrote:
>
> Congrats John!
>
>> On Wed, Nov 13, 2019 at 7:57 AM Dong Lin wrote:
>>
>> Congratulations John!
>>
>>> On Tue, Nov 12, 2019 at 1:56 PM Guozhang
Hi
Our requirement is related: we want our streams application to process only
messages from the last x weeks.
On new deployments this requires starting the application first, stopping
the application and then resetting the offsets.
I have created https://issues.apache.org/jira/browse/KAFKA-8766 w
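As a sketch, the reset step can also be done with a plain consumer while the
application is stopped (group id, topic name and the two-week window are
placeholders):

Properties p = new Properties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
p.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app"); // the application.id
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

long ts = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(14);
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(p)) {
    // look up the offsets closest to "two weeks ago" for every partition
    Map<TopicPartition, Long> query = new HashMap<>();
    for (PartitionInfo pi : consumer.partitionsFor("input-topic")) {
        query.put(new TopicPartition(pi.topic(), pi.partition()), ts);
    }
    Map<TopicPartition, OffsetAndMetadata> newOffsets = new HashMap<>();
    consumer.offsetsForTimes(query).forEach((tp, oat) -> {
        if (oat != null) {
            newOffsets.put(tp, new OffsetAndMetadata(oat.offset()));
        }
    });
    // commit the looked-up offsets on behalf of the (stopped) streams app
    consumer.commitSync(newOffsets);
}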
Hello Ashok
Adding to what Sophie wrote: if you use a custom RocksDBConfigSetter, override
the BlockBasedTableConfig like the following and call
options.setTableFormatConfig(tableConfig)
at the end.
BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
// adjust the table config here (block cache, bloom filters, ...), then:
options.setTableFormatConfig(tableConfig);
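Put together, a minimal sketch of such a setter (class name and the 32 MB
block cache are placeholders):

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // reuse the existing table config instead of creating a fresh one
        final BlockBasedTableConfig tableConfig =
                (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCacheSize(32 * 1024 * 1024L); // placeholder: 32 MB block cache
        options.setTableFormatConfig(tableConfig);
    }
}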
Hi
Regarding the I/O: RocksDB has something called write amplification, which
means the same data is rewritten across multiple internal levels to keep reads
fast, at the cost of extra storage and I/O.
This is also the reason the stores can get larger than the topics themselves.
This can be modified by RocksDB se
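For illustration, a few of the RocksDB options involved, as they might be set
inside a RocksDBConfigSetter (values are arbitrary examples, not
recommendations):

// illustrative only: options that influence flush and compaction behaviour
options.setWriteBufferSize(16 * 1024 * 1024L);        // larger memtables -> fewer flushes
options.setMaxBytesForLevelBase(256 * 1024 * 1024L);  // when level-1 compaction kicks in
options.setLevelCompactionDynamicLevelBytes(true);    // can reduce space amplification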
just be
>>>>>> (#global_state_stores +
>>>>>> sum(#partitions_of_topic_per_local_state_store)) . The number of
>>>>>> stream threads isn't relevant here.
>>>>>>
>>>>>> You can also figure it ou
store you might not have closed.
br, Patrik
On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com <
emailtokir...@gmail.com> wrote:
>
>
> On 2019/06/27 09:02:39, Patrik Kleindl wrote:
> > Hello Kiran
> >
> > First, the value for maxOpenFiles is per RocksDB in
helps
best regards
Patrik
On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com <
emailtokir...@gmail.com> wrote:
>
>
> On 2019/06/26 21:58:02, Patrik Kleindl wrote:
> > Hi Kiran
> > You can use the RocksDBConfigSetter and pass
> >
> > options.setMaxOpen
Hi Kiran
You can use the RocksDBConfigSetter and pass
options.setMaxOpenFiles(100);
to all RocksDB instances of the Streams application, which limits how many
files are kept open at the same time.
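For completeness, wiring it up looks roughly like this (the setter class name
is hypothetical):

// inside RocksDBConfigSetter#setConfig:
options.setMaxOpenFiles(100);

// registering the setter with the Streams configuration:
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);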
best regards
Patrik
On Wed, 26 Jun 2019 at 16:14, emailtokir...@gmail.com <
emailtokir...@gmail.com> wrote:
>
Hi Claudia
Just a side note: there is a combined cleanup policy "compact,delete" which
deletes messages older than retention.ms and compacts newer ones, if I
remember correctly.
It still doesn't seem to be properly covered in the docs:
https://kafka.apache.org/documentation/#topicconfigs
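A sketch of creating a topic with this policy via the AdminClient (topic name,
partition/replica counts and the 7-day retention are placeholders):

Map<String, String> topicConfig = new HashMap<>();
topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG,
        TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE);
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(7L * 24 * 60 * 60 * 1000));

NewTopic topic = new NewTopic("some-topic", 6, (short) 3).configs(topicConfig);
try (AdminClient admin = AdminClient.create(adminProps)) {
    admin.createTopics(Collections.singleton(topic)).all().get();
}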
best regards
Patrik
On Mon
Hi Guozhang
Just a small question, why can't this be checked when trying to instantiate
KafkaStreams?
The Topology knows all its topics, and their existence could be verified with
the AdminClient.
This would allow failing fast, similar to when the state directory is not
available.
Or am
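Something along these lines is what I had in mind, as a sketch (the topic
names are of course application-specific):

try (AdminClient admin = AdminClient.create(adminProps)) {
    Set<String> existing = admin.listTopics().names().get();
    Set<String> missing = new HashSet<>(Arrays.asList("input-topic", "lookup-topic"));
    missing.removeAll(existing);
    if (!missing.isEmpty()) {
        // fail fast before any stream threads are started
        throw new IllegalStateException("Topology requires missing topics: " + missing);
    }
}
KafkaStreams streams = new KafkaStreams(topology, streamsProps);
streams.start();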
Hi Claudia
Probably https://issues.apache.org/jira/browse/KAFKA-5998, welcome to the
club ;-)
best regards
Patrik
On Wed, 20 Mar 2019 at 10:25, Claudia Wegmann wrote:
> Hi kafka users,
>
> since upgrading to kafka version 2.1.1 I get the following log message at
> every startup of streaming serv
Hi Eno
Thanks too, this is indeed helpful
Best regards
Patrik
> On 18.03.2019 at 18:16, Eno Thereska wrote:
>
> Hi folks,
>
> The team here has come up with a couple of clarifying tips for
> operationalizing Zookeeper for Kafka that we found missing from the
> official documentation, and pas
biggest
> > impact in reducing global state restoration times for the scenario where
> > the keyspace of the global state store is very large.
> >
> > Taylor
> >
> >
> > On Thu, Feb 21, 2019 at 6:31 AM Patrik Kleindl
> wrote:
> >
>
Hi Matthias
Minor issue: if the locale is not English (German in my case), then
org.apache.kafka.common.utils.UtilsTest > testFormatBytes FAILED
org.junit.ComparisonFailure: expected:<1[.]1 MB> but was:<1[,]1 MB>
at org.junit.Assert.assertEquals(Assert.java:115)
at org.junit.Assert.a
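The usual fix for such tests is to format with a fixed locale, something like:

// hedged sketch: a locale-independent variant of the formatting in question
String formatted = String.format(Locale.ROOT, "%.1f MB", 1.1); // always "1.1 MB"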
records to possibly exceed it (default is 50Mb).
>
>
> Guozhang
>
>
>
> On Fri, Feb 8, 2019 at 12:43 AM Patrik Kleindl wrote:
>
> > Hi Taylor
> > You are right, the parallel processing is not mentioned in this issue, if
> > I remember correctly it was in the
Hi
How many partitions do your topics have?
As far as I understand there is a RocksDB instance for every partition of
every KTable, and this can add up quickly (e.g. 10 KTables on topics with 20
partitions each already means up to 200 store instances).
Depending on how many instances you are running, a single one might
temporarily have to handle the complete load, which uses correspondingly more
memory.
Also, Roc
large
> amounts of data in global stores and whether there are any inherent
> limitations to the size of global stores.
>
> Our topic is already using compaction.
>
> Taylor
>
>> On Wed, Feb 6, 2019 at 2:41 AM Patrik Kleindl wrote:
>>
>> Hi Taylor
>>
Hi Taylor
We are facing the same issue, although on a smaller scale.
The main problem, as you found, is that the restoration runs sequentially;
this should be addressed in
https://issues.apache.org/jira/browse/KAFKA-7380, although there has been
no progress lately.
On the other hand you could
Hi
That is because the global tables are handled separately by the
GlobalStreamThread, as far as I understand.
You also don't see their offsets the way you do for regular consumers.
Best regards
Patrik
> On 19.01.2019 at 18:19, Dmitry Minkovsky wrote:
>
> When I add a GlobalKTable for topic
> "message-wr
Hi everyone,
we are planning to add some user activity tracking to an application and I
wanted to ask around for your general experiences and best practices.
Do you use one topic per application, or something more granular?
Do you write directly from the application to Kafka for tracking purposes?
How to be
e KTable from
>> builder.table() would not be materialized if users do not specify a
>> materialized store name, only the value-transformed KTable will be
>> materialized:
>>
>> https://github.com/apache/kafka/pull/5779
>>
>>
>> Would that work for y
provide some more elaboration on how you did the JVM analysis,
> so that I can try to re-produce the observations.
>
>
> Guozhang
>
> On Thu, Oct 25, 2018 at 2:50 AM Patrik Kleindl wrote:
>
> > Hello
> >
> > During the analysis of JVM memory two possible
uerying, the GlobalKTable will be fully populated
> from the topic. For the KTable case, you can query from the very
> beginning on, while data is put into the table.
>
> Also, for this approach, if you add other processing, this processing
> would not be parallelized but duplicat
Hi Chris
We are using them like you described.
Performance is very good compared to the database used before.
Beware that until https://issues.apache.org/jira/browse/KAFKA-7380
is resolved, startup is blocked until all global stores are restored
(sequentially).
This can take a while for larg
nt (instead of subscription) and this consumer does not commit
> any offset to Kafka.
>
> Note that global stores are bootstrapped before processing begins
> though, and are expected to be low throughput topic anyway.
>
>
> -Matthias
>
> On 11/6/18 2:03 AM, Patrik Klein
Hello
Am I doing something wrong or is it by design that global state stores and
their consumers do not show up under the consumer-groups?
With the consumer group command (and in control center as well) I don't get
any output for the group:
./kafka-consumer-groups --bootstrap-server broker:9092 --
Hi Andrew
Did you take a look at
https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
?
We are using this for a case like you described.
Growth should be limited with this approach.
Best reg
On 29.10.2018 at 18:20, Pavel Koroliov wrote:
>
> Hi
> No, my application id doesn't change
>
> Mon, 29 Oct 2018 at 19:11, Patrik Kleindl wrote:
>
>> Hi
>> Does your applicationId change?
>> Best regards
>> Patrik
>>
>>> On 29.10.2018 at
Hi
Does your applicationId change?
Best regards
Patrik
> On 29.10.2018 at 13:28, Pavel Koroliov wrote:
>
> Hi everyone! I use kafka-streams, and I have a problem when I use
> windowedBy. Everything works well until I restart the application. After
> restarting my aggregation starts from beginn
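For context: the application.id determines the names of the internal changelog
topics and the local state directory, so if it changes between restarts the
aggregation effectively starts with empty state. A sketch of pinning it (the
name is a placeholder):

Properties props = new Properties();
// must stay identical across restarts, otherwise state and internal topics are "new"
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-windowed-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");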
c. For the "reduce" version,
> it's
> > > an internal changelog topic, and for the "topic-to-table" version, the
> > > store can use the intermediate topic as its changelog.
> > >
> > > This doesn't address your ergonomic co
ur KStream to trigger reduce() to
> delete, you will need to use a surrogate value for this, ie, do a
> mapValues() before the groupByKey() call, and replace `null` values with
> the surrogate-delete-marker that you can evaluate in `Reducer#apply()`
> to return `null` for this case.
>
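A sketch of the surrogate pattern described above (the marker value and names
are made up):

final String DELETE_MARKER = "<delete>"; // assumed surrogate, must not collide with real values

KTable<String, String> table = stream
        .mapValues(v -> v == null ? DELETE_MARKER : v)
        .groupByKey()
        .reduce((oldValue, newValue) ->
                DELETE_MARKER.equals(newValue) ? null : newValue); // returning null deletes the key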
Hello
Recently we noticed a lot of warning messages in the logs which pointed to
this method (we are running 2.0):
KStreamReduce
public void process(final K key, final V value) {
    // If the key or value is null we don't need to proceed
    if (key == null || value == null) {
Hello
During the analysis of JVM memory two possible issues were shown which I
would like to bring to your attention:
1) Duplicate strings
Top findings:
string_content="stream-processor-node-metrics" count="534,277"
string_content="processor-node-id" count="148,437"
string_content="stream-rocksdb-
Hello
Can someone please verify if my assumption is correct?
In CachingKeyValueStore, if an exception happens during flush() the store
will not be closed properly.
@Override
public void flush() {
    lock.writeLock().lock();
    try {
        cache.flush(cacheName);
        underlying.flush();
    } finally {
        // presumably the method ends like this: the lock is released,
        // but nothing closes the store if flush() throws
        lock.writeLock().unlock();
    }
}
can do, because Streams hard codes to set
> the policy to "none". Thus, a manual restart (that is gladly working as
> you confirmed) is currently the way to go.
>
> Thanks for reporting this issue.
>
>
> -Matthias
>
>> On 10/4/18 3:23 AM, Patrik Kleind
you
> globalConsumer is lagging behind? Can you verify this? If yes, it seems
> to make sense to stop processing to inform the user about this issue.
> Would you rather prefer the application to just move on implying silent
> data loss??
>
>
> -Matthias
>
>
> On 10/3/18 1
d to recover from this exception?
>
>
> -Matthias
>
> On 10/2/18 4:54 AM, Patrik Kleindl wrote:
> > Hi
> >
> > We had several incidents where a streams application crashed while
> > maintaining a global state
Hi
We had several incidents where a streams application crashed while
maintaining a global state store.
Updating global state failed. You can restart KafkaStreams to recover from
this error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
Offsets out of range with no configured reset
Hello everyone,
we are currently trying to improve the management of our topic
configurations.
At the moment we are managing the configurations on the producer side and
checking/creating/changing topics on each application (instance) startup.
This has worked fine for many cases, but does not real
restored, IN ADDITION, the global stores being restored as well.
>>
>> If you like, please feel free to create a JIRA requesting this improvement
>> so someone can work on it someday.
>>
>> Guozhang
>>
>>
>>
>>
>>> On Sat, Aug 2
Hello
Did you add --execute to the command?
Which command did you use?
Best regards
Patrik
> On 01.09.2018 at 14:54, Joseph M'BIMBI-BENE wrote:
>
> Hello everyone,
>
> Hopefully this is the appropriate mailing list for my message.
> When I am trying to reset the offset of some consumer group,
Hello
We are currently using GlobalKTables for interactive queries as well as for
lookups inside stream applications but have come across some
limitations/problems.
The main problem was that our deployments (including application start) took
longer with every new global state store we added, which ca
Hello
Yesterday we had the following exception:
Exception thrown when sending a message with key='null' and payload='...'
to topic sometopic:: org.apache.kafka.common.errors.CorruptRecordException:
This message has failed its CRC checksum, exceeds the valid size, or is
otherwise corrupt.
The cau
m someone with knowledge of the code. I
> > seem to recall we tried this for a repartition topic and it didn't do quite
> > what we expected.
> >
> > On Fri, Aug 10, 2018 at 3:02 AM Patrik Kleindl
> wrote:
> >
> >> Hello
> >>
> >> In
Hello
In a discussion yesterday the question came up whether an internal changelog
topic can be configured for both compaction and deletion.
https://stackoverflow.com/questions/50622369/kafka-streams-is-it-possible-to-have-compact-delete-policy-on-state-stores
and
https://issues.apache.org/jira/browse/KAFKA-4