Guozhang. I was just looking at the source for this, and it looks like the
RecordCollector.Supplier is part of the internal ProcessorContextImpl
class.  I don't think that's exposed to me, is it?

If I create a processor class that extends AbstractProcess, it only has
access to the ProcessorContext interface, which doesn't expose the
Supplier.

On 5 October 2016 at 09:42, Gary Ogden <gog...@gmail.com> wrote:

> What if we were to use kafka connect instead of streams? Does it have the
> ability to specify partitions, rf, segment size etc?
>
> On 5 October 2016 at 09:42, Gary Ogden <gog...@gmail.com> wrote:
>
>> Thanks Guozhang.
>>
>> So there's no way we could also use InternalTopicManager to specify the
>> number of partitions and RF?
>>
>> https://github.com/apache/kafka/blob/0.10.1/streams/src/main
>> /java/org/apache/kafka/streams/processor/internals/InternalT
>> opicManager.java
>>
>> On 4 October 2016 at 19:34, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hello Gary,
>>>
>>> This is also doable in the Processor API, you can use the record
>>> collector
>>> from ProcessorContext to send data to arbitrary topics, i.e.:
>>>
>>> RecordCollector collector = ((RecordCollector.Supplier)
>>> context).recordCollector();
>>> collector.send(new ProducerRecord<>(topic, *...*), keySerializer,
>>> valSerializer, partitioner);
>>>
>>>
>>> But note that if the new topic, e.g. "123456_lv2" is not created, then
>>> the send call will thrown an exception unless the borker-side config
>>> "auto.topic.create.enabled" is set to true; and even in this case, the
>>> topic will be auto-created with the pre-defined number of partitions,
>>> i.e. you cannot control how the topics can be created with what
>>> configs such as compaction policy, num.partitions, segment sizes, etc.
>>> If that works for you then I think it should be fine.
>>>
>>>
>>> Guozhang
>>>
>>>
>>>
>>> On Tue, Oct 4, 2016 at 12:51 PM, Gary Ogden <gog...@gmail.com> wrote:
>>>
>>> > Is it possible, in a kafka streaming job, to write to another topic
>>> based
>>> > on the key in the messages?
>>> >
>>> > For example, say the message is:
>>> >
>>> > 123456#{"id":56789,"type":1}
>>> >
>>> > where the key is 123456, # is the delimeter, and the {} is the json
>>> data.
>>> >
>>> > And I want to push the json data to another topic that will have the
>>> name
>>> > 123456_lv2.
>>> >
>>> > Is this possible with kafka streaming?
>>> >
>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
>

Reply via email to