> I wanted to set *retention bytes* and change *cleanup policy* to *delete*
> to prevent the storage being full.  I set following configs in kafka
> streams code:

Are you sure you are using Kafka Streams correctly? Switching a changelog
topic from compaction to a deletion policy looks like a misconfiguration,
since the changelog is what Streams uses to restore the state store. Also
note that if you have an unbounded number of unique keys, your local store
will still grow without bound. There is no TTL for RocksDB within Kafka
Streams.

I would recommend revisiting your design. You might want to use a
windowed KTable to expire old data.
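
To illustrate why windowing bounds the store size, here is a minimal
plain-Java sketch of the idea. It is a conceptual model only and does not
use the Kafka Streams API: the class name, the in-memory map, and the
explicit expire() call are all assumptions made for illustration. Values
are reduced per key and per time window, and whole windows are dropped
once they are older than the retention period.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Conceptual model only: NOT the Kafka Streams API. All names are hypothetical.
public class WindowedReduceSketch {
    private final long windowSizeMs;
    private final long retentionMs;
    // key -> (window start timestamp -> reduced value for that window)
    private final Map<String, Map<Long, Long>> store = new HashMap<>();

    public WindowedReduceSketch(long windowSizeMs, long retentionMs) {
        this.windowSizeMs = windowSizeMs;
        this.retentionMs = retentionMs;
    }

    // Reduce (here: sum) the value into the tumbling window containing the timestamp.
    public void add(String key, long timestampMs, long value) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        store.computeIfAbsent(key, k -> new HashMap<>())
             .merge(windowStart, value, Long::sum);
    }

    // Drop every window whose end plus retention is at or before nowMs,
    // then drop keys that have no windows left.
    public void expire(long nowMs) {
        Iterator<Map<Long, Long>> it = store.values().iterator();
        while (it.hasNext()) {
            Map<Long, Long> windows = it.next();
            windows.keySet().removeIf(start -> start + windowSizeMs + retentionMs <= nowMs);
            if (windows.isEmpty()) {
                it.remove();
            }
        }
    }

    // Returns the reduced value for a key and window, or null if absent or expired.
    public Long get(String key, long windowStart) {
        Map<Long, Long> windows = store.get(key);
        return windows == null ? null : windows.get(windowStart);
    }

    public int keyCount() {
        return store.size();
    }
}
```

In Kafka Streams itself the corresponding shape would be roughly
groupByKey().windowedBy(...).reduce(...), with the window retention
controlling when old windows are dropped. Please check the documentation
for your version for the exact calls.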


-Matthias


On 9/2/18 2:31 AM, Amir masud zarebidaki wrote:
> Thanks Guozhang 🙏
> upgrading kafka stream version to 1.1.0 fixed the issue 👍
> 
> On Sat, Sep 1, 2018 at 10:39 PM Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> Hello Amir,
>>
>> The issue you described seems like an old bug that is resolved since 1.1.0
>> (as part of the fix in https://jira.apache.org/jira/browse/KAFKA-6150).
>> Could you try out that version?
>>
>> You do not need to upgrade broker in order to use newer Streams library
>> versions.
>>
>> Guozhang
>>
>> On Sat, Sep 1, 2018 at 1:14 AM, Amir masud zarebidaki <
>> zare.ma...@gmail.com>
>> wrote:
>>
>>> Hi Guys!
>>>
>>> I use kafka streams reduce function and it creates some state store
>>> change log kafka internal topic ( like
>>> app-KSTREAM-REDUCE-STATE-STORE-0000000002-changelog ).
>>>
>>> I wanted to set *retention bytes* and change *cleanup policy* to *delete*
>>> to prevent the storage being full.  I set following configs in kafka
>>> streams code:
>>>
>>> Properties props = new Properties();
>>>
>>> props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.RETENTION_BYTES_CONFIG,
>>>     Constants.INTERNAL_TOPICS_RETENTION_BYTES);
>>> props.put(StreamsConfig.TOPIC_PREFIX + TopicConfig.CLEANUP_POLICY_CONFIG,
>>>     TopicConfig.CLEANUP_POLICY_DELETE);
>>>
>>> KafkaStreams streams = new KafkaStreams(builder.build(), props);
>>>
>>> However, when a new topic is generated only the *retention* config is
>>> applied to the newly generated internal topic and *cleanup policy*
>>> remains *compact*.
>>>
>>> Is there any missing step to do so ?  ( or Isn't it possible to set
>>> internal topics cleanup policy to delete ?)
>>>
>>> I use kafka version 1.0.0 and kafka-streams version 1.0.0
>>>
>>> Thanks in advance 🙏
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
> 
