When I read from stream, I create AKKA actors for processing events. I am not processing them in the stream, as the result I do not have KStream to write into it. So I use KafkaConsumer instead.
On Sat, Jun 11, 2016 at 7:13 AM, Matthias J. Sax <matth...@confluent.io> wrote: > Do you instantiate KafkaProduer in your user code? > > Why no use KStream.to("topic-name") ? > > > -Matthias > > On 06/10/2016 12:28 AM, Saeed Ansari wrote: > > Thank you Eno, > > Adding more threads extremely increased the throughput of stream. As I > said > > after processing I send the event to another topic. For that I was > opening > > a connection via KafkaProducer to the cluster and I think that was the > > issue. Now there is just one producer for sending events to output topic. > > > > Do you have any recommendation how that part can get better? > > > > Thank you so much, > > Saeed > > > > On Thu, Jun 9, 2016 at 3:33 PM, Eno Thereska <eno.there...@gmail.com> > wrote: > > > >> Hi Saeed, > >> > >> There could be several reasons why things appear slow and it is > difficult > >> to say without knowing the exact details of the setup and the results > you > >> are observing. > >> One thing to check is the number of threads you have assigned to the > Kafka > >> Stream application. By default just one thread is used. Perhaps you want > >> more (depending on number of cores you have). An example way to change > that > >> in your app: > >> > >> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4 /* four threads > */); > >> > >> Thanks > >> Eno > >> > >>> On 9 Jun 2016, at 18:08, Saeed Ansari <saeed.ans...@gmail.com> wrote: > >>> > >>> Hi Eno, > >>> Thank you for the response. Actually I did not know it automatically > >>> assigns partitions to consumers. Now I have one Kafkastream reading > from > >>> 12 partitions, like below: > >>> Controller is an actor that I am sending the message to and then it > >> creates > >>> child actors to send messages out. > >>> > >>> builder.stream("test").foreach((x, y) -> { > >>> controller.tell(y, controller.noSender()); > >>> }); > >>> > >>> > >>> The msg/sec rate I get from receiving messages to sending them out is > >>> really slow! > >>> > >>> Do you think it is about how consume messages? > >>> > >>> Thank you, > >>> Saeed > >>> > >>> > >>> > >>> On Wed, Jun 8, 2016 at 3:08 AM, Eno Thereska <eno.there...@gmail.com> > >> wrote: > >>> > >>>> Hi Saeed, > >>>> > >>>> Kafka Streams takes care of assigning partitions to consumers > >>>> automatically for you. You don't have to write anything explicit to do > >>>> that. See WordCountDemo.java as an example. Was there another reason > you > >>>> wanted control over partition assignment? > >>>> > >>>> Thanks > >>>> Eno > >>>> > >>>>> On 7 Jun 2016, at 20:02, Saeed Ansari <saeed.ans...@gmail.com> > wrote: > >>>>> > >>>>> Hi, > >>>>> I have started a small Kafka streaming application. I need to assign > >>>>> partitions to consumers in a consumer group. I am not sure where to > >> start > >>>>> and how to structure consumer groups in KafkaStreams. > >>>>> > >>>>> I found that there is a StreamPartitionAssignor that can be added to > >>>>> config, but still I do not know how to configure it and what > parameters > >>>> to > >>>>> config. > >>>>> > >>>>> Any help is really appreciated. > >>>>> > >>>>> Thank you, > >>>>> Saeed > >>>> > >>>> > >> > >> > > > >