Topic limits can't override broker limits.
On Tue, Dec 1, 2020 at 6:42 PM Sakke Ferraris wrote:
> Hi all!
>
> I have a question about configuring Kafka for large messages. I understand
> that Kafka has broker- and topic-level settings for the maximum message size:
> message.max.bytes (broker config) and ma
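For context, a minimal sketch of where the two related settings live (the values here are illustrative, not recommendations): the broker-wide default goes in server.properties, while a per-topic override can be applied with kafka-configs.sh.

```properties
# server.properties (broker-wide default cap, ~1 MiB here)
message.max.bytes=1048588

# per-topic override, applied e.g. via:
#   kafka-configs.sh --alter --entity-type topics --entity-name my-topic \
#     --add-config max.message.bytes=2097152
max.message.bytes=2097152
```

Note that consumers and replication have their own fetch-size settings (e.g. fetch.max.bytes, replica.fetch.max.bytes) that also need to accommodate the largest message.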
Team,
We have a use case where a user in an organisation can send data at
different rates for the event of sending mail. These events are published to
Kafka and consumed on our application side for processing. We need to
maintain the order of the events produced by a user in an org.
In o
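The usual way to get per-user ordering is to key every record by the user (or user+org) identifier: Kafka's default partitioner hashes the key, so all records with the same key land on the same partition, and order is preserved within a partition. A toy stand-in for that idea (Kafka actually hashes key bytes with murmur2, not `hashCode`; the key format here is hypothetical):

```java
public class KeyedPartitioningSketch {
    // Toy stand-in for the producer's default partitioner:
    // same key -> same partition, so per-key order is preserved.
    // (Kafka really uses murmur2 over the serialized key bytes.)
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int p1 = partitionFor("org42:alice", 12);
        int p2 = partitionFor("org42:alice", 12);
        // Every event from the same user maps to the same partition.
        System.out.println(p1 == p2); // prints true
    }
}
```

The trade-off is that one very active user cannot be spread across partitions, so per-key ordering caps that user's parallelism at one consumer.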
Adding a few application-related configurations which can affect producer
rate:
- linger.ms
- batch.size
- buffer.memory
- acks
- compression
- num.io.threads
- num.network.threads
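Pulling the client-side knobs from that list together, a sketch of how they might be set (values are illustrative only, not tuning advice; note that num.io.threads and num.network.threads are broker settings configured in server.properties, not producer properties):

```java
import java.util.Properties;

public class ProducerTuningSketch {
    // Illustrative values only; tune against your own workload.
    static Properties buildProps() {
        Properties props = new Properties();
        props.put("linger.ms", "10");           // wait up to 10 ms to fill a batch
        props.put("batch.size", "65536");       // 64 KiB batches
        props.put("buffer.memory", "33554432"); // 32 MiB client-side send buffer
        props.put("acks", "all");               // durability over latency
        props.put("compression.type", "lz4");   // fewer bytes on the wire
        return props;
    }

    public static void main(String[] args) {
        // In a real application these would be passed to new KafkaProducer<>(buildProps()).
        System.out.println(buildProps().getProperty("acks")); // prints all
    }
}
```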
On Mon, Nov 30, 2020 at 3:07 PM girija arumugam
wrote:
> Team,
> *Use-case:*
> *IMAP*. I
I need to read from a topic in one cluster & write to a topic in another
cluster. Since there's only one
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG property, I am wondering how to
accomplish this.
Do I need to create 2 different KafkaStreams objects? One for reading & the
ot
KafkaStreams can only connect to a single cluster.
If you really need to read from one cluster and write to another, you
have 3 main options:
- use KafkaStreams on the source cluster and mirror the output topic
from the source to the target cluster
- mirror the input topic from the source clus
Hmm.. Ok. Maybe this could be a feature request for a future release. This
can be accomplished easily in Spark Structured Streaming... just saying :)
In Spark Structured Streaming we can have separate configurations for
'readStream' & 'writeStream'. I am a bit surprised this is not available in
Ka
I am grouping messages as follows:
.groupBy((_, myAppObject) => myAppObject.getId)(Grouped.`with`[String,
MyAppObject])
I am getting a message saying: "No implicits found for parameter
valueSerde: Serde[MyAppObject]"
My understanding is I need to add an implicit like this...
implicit val *my
How about a lightweight MirrorMaker instance for just this topic?
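Along those lines, a minimal MirrorMaker 2 sketch that replicates just one topic (cluster aliases, addresses, and the topic name are all placeholders):

```properties
# mm2.properties (illustrative; aliases and addresses are placeholders)
clusters = source, target
source.bootstrap.servers = source-broker:9092
target.bootstrap.servers = target-broker:9092

# replicate only the one output topic
source->target.enabled = true
source->target.topics = my-output-topic
```

One caveat worth knowing: by default MM2 prefixes replicated topics with the source cluster alias (here the topic would arrive as `source.my-output-topic` on the target), which can be changed via the replication policy.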
Malcolm McFarland
Cavulus
Hello Nitay,
Sorry for the late reply.
Would you be able to share the actual log entries from which you inferred
the elapsed time and the total number of records restored?
Guozhang
On Tue, Nov 24, 2020 at 3:44 AM Nitay Kufert wrote:
> Hey,
> I get the log *after* the restart was triggered for
Don't worry about this one. Figured it out. Created a Serde for MyAppObject
& implemented my own Serializer & Deserializer classes:
object MyAppObjectSerde extends Serde[MyAppObject] {
  override def serializer(): Serializer[MyAppObject] = new MyAppObjectSerializer
  override def deserializer(): Deserializer[MyAppObject] = new MyAppObjectDeserializer
}
I have the following code in my Kafka Streams application:
.groupBy((_, myobj) => myobj.getId)(Grouped.`with`[String, Myobj])
.windowedBy(SessionWindows.`with`(Duration.ofMillis(10 * 60 * 1000)))
.count()
.toStream
.map((k, v) => (k.key(), v))
.to("kafka-streams-test")
Expectation: If
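For intuition on the snippet above: a session window with a 10-minute gap merges events into one session as long as consecutive events arrive within the gap, and a new session starts after a quiet period. A toy stand-in for that semantics (not the Streams API) that counts sessions from sorted timestamps:

```java
import java.util.List;

public class SessionWindowSketch {
    // A new session starts whenever the gap between consecutive
    // (sorted) timestamps exceeds gapMs, mirroring the inactivity-gap
    // semantics of SessionWindows.with(gap).
    static int countSessions(List<Long> sortedTimestamps, long gapMs) {
        if (sortedTimestamps.isEmpty()) return 0;
        int sessions = 1;
        for (int i = 1; i < sortedTimestamps.size(); i++) {
            if (sortedTimestamps.get(i) - sortedTimestamps.get(i - 1) > gapMs) {
                sessions++;
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        long gap = 10 * 60 * 1000L; // 10 minutes, as in the snippet above
        // Three events within a minute of each other, then one ~31 min later.
        List<Long> ts = List.of(0L, 60_000L, 120_000L, 2_000_000L);
        System.out.println(countSessions(ts, gap)); // prints 2
    }
}
```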