Re: Large messages max.message.bytes > message.max.bytes

2020-12-01 Thread Liam Clarke-Hutchinson
Topic limits can't override broker limits. On Tue, Dec 1, 2020 at 6:42 PM Sakke Ferraris wrote: > Hi all! > > I have questions regarding configuration for large messages. I understand > that Kafka has settings for broker and topic for message max sizes; > message.max.bytes (broker config) and max.message.bytes (topic config)
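(Not from the thread, which the archive truncates.) For anyone landing here, a minimal sketch of raising the per-topic limit with the AdminClient; the topic name and size are invented for illustration:

    import java.util.{Arrays, HashMap, Properties}
    import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
    import org.apache.kafka.common.config.ConfigResource

    object RaiseTopicMessageSize extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092") // placeholder broker address

      val admin = Admin.create(props)
      try {
        // Per-topic max.message.bytes; the broker-wide default is message.max.bytes.
        val topic = new ConfigResource(ConfigResource.Type.TOPIC, "large-events") // hypothetical topic
        val op = new AlterConfigOp(
          new ConfigEntry("max.message.bytes", "5242880"), // 5 MiB, example value only
          AlterConfigOp.OpType.SET)
        val configs = new HashMap[ConfigResource, java.util.Collection[AlterConfigOp]]()
        configs.put(topic, Arrays.asList(op))
        admin.incrementalAlterConfigs(configs).all().get()
      } finally admin.close()
    }

Whatever the limits end up being, the producer's max.request.size usually has to be raised to match, otherwise the client rejects oversized records before they ever reach the broker.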

Regarding Fairness in choosing topic and partition for an event

2020-12-01 Thread girija arumugam
Team, We have a use case where a user in an organisation can send data at different rates for the event - sending mail. These events are published to Kafka and are consumed on our application side for processing. We need to maintain the order of the events produced by a user in an org. In o
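(Not part of the original message, which the archive truncates.) Kafka only guarantees ordering within a partition, so the standard approach is to key each event by the org/user pair: the default partitioner then routes all of one user's events to the same partition, preserving their order, while different users still spread across partitions. A minimal sketch; the topic name and key scheme are invented:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object MailEventProducer extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](props)

      // Records with the same key hash to the same partition, so the
      // default partitioner preserves per-user ordering.
      def send(orgId: String, userId: String, payload: String): Unit =
        producer.send(new ProducerRecord[String, String]("mail-events", s"$orgId/$userId", payload))

      send("org-1", "alice", "mail-sent")
      producer.close()
    }

Strict ordering across producer retries additionally needs enable.idempotence=true (or max.in.flight.requests.per.connection=1).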

Re: Regarding framing producer rate in-terms of software as well as hardware configurations

2020-12-01 Thread girija arumugam
Adding a few application-related configurations which can affect producer rate: linger.ms, batch.size, buffer.memory, acks, compression, num.io.threads, num.network.threads. On Mon, Nov 30, 2020 at 3:07 PM girija arumugam wrote: > Team, > Use-case: IMAP. I
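As a sketch only (the values are illustrative, not recommendations from the thread), the producer-side settings above translate to something like:

    import java.util.Properties
    import org.apache.kafka.clients.producer.ProducerConfig

    object ProducerTuning {
      // Illustrative throughput-oriented values; measure against your own
      // workload rather than copying these numbers.
      def props(): Properties = {
        val p = new Properties()
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        p.put(ProducerConfig.LINGER_MS_CONFIG, "20")           // wait up to 20 ms to fill a batch
        p.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536")       // 64 KiB batches
        p.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "67108864") // 64 MiB send buffer
        p.put(ProducerConfig.ACKS_CONFIG, "1")                 // weaker durability, lower latency
        p.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")
        p
      }
    }

Note that num.io.threads and num.network.threads from the list are broker settings (server.properties), not producer configs.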

Kafka Streams: Read from one Bootstrap server & write to other

2020-12-01 Thread Eric Beabes
I need to read from a topic in one bootstrap server & write it to another topic in another bootstrap server. Since there's only one StreamsConfig.BOOTSTRAP_SERVERS_CONFIG property, I am wondering how to accomplish this. Do I need to create 2 different KafkaStreams objects? One for reading & the other for writing?

Re: Kafka Streams: Read from one Bootstrap server & write to other

2020-12-01 Thread Matthias J. Sax
KafkaStreams can only connect to a single cluster. If you really need to read from one cluster and write to another, you have 3 main options:
- use KafkaStreams on the source cluster and mirror the output topic from the source to the target cluster
- mirror the input topic from the source clus
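(Not from Matthias's reply, which the archive truncates.) Where mirroring is overkill, the same effect can be had with a plain consumer/producer bridge outside of Kafka Streams; a minimal at-least-once sketch, with cluster addresses and topic names as placeholders:

    import java.time.Duration
    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import scala.jdk.CollectionConverters._ // Scala 2.13 converters

    object ClusterBridge extends App {
      val cProps = new Properties()
      cProps.put("bootstrap.servers", "source-cluster:9092") // placeholder
      cProps.put("group.id", "bridge")
      cProps.put("enable.auto.commit", "false")
      cProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
      cProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

      val pProps = new Properties()
      pProps.put("bootstrap.servers", "target-cluster:9092") // placeholder
      pProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
      pProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

      val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](cProps)
      val producer = new KafkaProducer[Array[Byte], Array[Byte]](pProps)
      consumer.subscribe(Collections.singletonList("input-topic"))

      while (true) {
        // At-least-once copy: records may be duplicated after a failure.
        for (rec <- consumer.poll(Duration.ofMillis(500)).asScala)
          producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("output-topic", rec.key, rec.value))
        producer.flush()
        consumer.commitSync()
      }
    }

This forgoes Kafka Streams' processing guarantees, which is why the mirroring options above are usually preferred; Malcolm's MirrorMaker suggestion further down the thread is the packaged form of the same idea.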

Re: Kafka Streams: Read from one Bootstrap server & write to other

2020-12-01 Thread Eric Beabes
Hmm.. Ok. Maybe this could be a feature request for a future release. This can be accomplished easily in Spark Structured Streaming... just saying :) In Spark Structured Streaming we can have separate configurations for 'readStream' & 'writeStream'. I am a bit surprised this is not available in Kafka Streams.

Kafka Streams: Creating Serde in Scala for application specific object

2020-12-01 Thread Eric Beabes
I am grouping messages as follows: .groupBy((_, myAppObject) => myAppObject.getId)(Grouped.`with`[String, MyAppObject]) I am getting a message saying: "No implicits found for parameter valueSerde: Serde[MyAppObject]". My understanding is I need to add an implicit like this... implicit val my
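One general way to supply that implicit, for anyone searching the archive (Eric's own fix follows later in the thread): the kafka-streams-scala wrapper can build a Serde from two functions with Serdes.fromFn. The case class and the trivial string codec below are stand-ins for the real MyAppObject and its serialization (JSON, Avro, ...):

    import java.nio.charset.StandardCharsets
    import org.apache.kafka.common.serialization.Serde
    import org.apache.kafka.streams.scala.Serdes

    // Stand-in for the poster's class; only the id matters here.
    case class MyAppObject(id: String) { def getId: String = id }

    implicit val myAppObjectSerde: Serde[MyAppObject] =
      Serdes.fromFn[MyAppObject](
        (obj: MyAppObject) => obj.id.getBytes(StandardCharsets.UTF_8),
        (bytes: Array[Byte]) => Option(MyAppObject(new String(bytes, StandardCharsets.UTF_8)))
      )

With that implicit in scope, Grouped.`with`[String, MyAppObject] resolves without the missing-Serde error.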

Re: Kafka Streams: Read from one Bootstrap server & write to other

2020-12-01 Thread Malcolm McFarland
How about a lightweight MirrorMaker instance for just this topic? Malcolm McFarland Cavulus

Re: GlobalKTable restoration - Unexplained performance penalty

2020-12-01 Thread Guozhang Wang
Hello Nitay, Sorry for the late reply. Would you be able to share the actual log entries from which you infer the elapsed time and the total number of records restored? Guozhang On Tue, Nov 24, 2020 at 3:44 AM Nitay Kufert wrote: > Hey, > I get the log *after* the restart was triggered for

Re: Kafka Streams: Creating Serde in Scala for application specific object

2020-12-01 Thread Eric Beabes
Don't worry about this one. Figured it out. Created a Serde for MyAppObject & implemented my own Serializer & Deserializer classes.

    object MyAppObjectSerde extends Serde[MyAppObject] {
      override def serializer(): Serializer[MyAppObject] = new MyAppObjectSerializer
      override def deserializer():

Kafka Streams: SessionWindows times out immediately and writes out a Null record

2020-12-01 Thread Eric Beabes
I have the following code in my Kafka Streams application:

    .groupBy((_, myobj) => myobj.getId)(Grouped.`with`[String, Myobj])
    .windowedBy(SessionWindows.`with`(Duration.ofMillis(10 * 60 * 1000)))
    .count()
    .toStream
    .map((k, v) => (k.key(), v))
    .to("kafka-streams-test")

Expectation: If
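The archive cuts the expectation off, but the null records from the subject line are the classic symptom of session windows emitting intermediate results: when a session is extended or merged, Streams retracts the previous aggregate, which surfaces downstream as a tombstone. A sketch of the usual fix (not necessarily the poster's eventual resolution), assuming stream, Myobj, and an implicit Serde[Myobj] from the poster's application:

    import java.time.Duration
    import org.apache.kafka.streams.kstream.{SessionWindows, Suppressed}
    import org.apache.kafka.streams.kstream.Suppressed.BufferConfig

    // Two changes to the poster's chain: grace() bounds how long late
    // events may still reopen a session, and suppress() holds each
    // session's count until the window closes, so downstream sees one
    // final record per session instead of updates and retractions.
    stream
      .groupBy((_, myobj) => myobj.getId)(Grouped.`with`[String, Myobj])
      .windowedBy(SessionWindows.`with`(Duration.ofMinutes(10)).grace(Duration.ofMinutes(1)))
      .count()
      .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
      .toStream
      .map((k, v) => (k.key(), v))
      .to("kafka-streams-test")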