Re: callback function

2020-01-08 Thread Jonathan Santilli
Hello, when you say "something is new in the queue", do you mean a new
message/record is available in a partition within a topic?

Cheers!

On Tue, Jan 7, 2020, 8:46 PM Tavares Forby  wrote:

> Hi,
>
> Is there a method in which a function can wake when something is new in
> the queue? This has to be non-blocking to the main thread.
>
> Thanks,
> -Tavares
>


Re: High CPU Usage on Brokers

2020-01-08 Thread Lisheng Wang
Hi Navneeth

like the bug you mentioned above: do you set sun.security.jgss.native = true?

if not, there are some items to check:

1. GC, but you said GC is not the problem.
2. if you suspect the network threads, how many threads did you configure?
3. do you have compression enabled?
4. did you change the value of batch.size on the producer side?
5. could you increase "fetch.min.bytes" on the consumer side and
"replica.fetch.min.bytes" on the broker to test whether CPU usage goes down?
6. you can check some metrics from JMX for analysis, e.g. checking
"kafka.network:type=RequestMetrics,
name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower}"; if the
value is high, that means the CPU will be busy (a sketch of reading this
metric over JMX follows below).
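For item 6, a minimal sketch of reading that meter remotely over JMX; the
broker host and port 9999 are placeholders for whatever JMX_PORT the broker
was started with:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class RequestRateCheck {
    public static void main(String[] args) throws Exception {
        // Connect to a broker's JMX endpoint.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            for (String request : new String[]{"Produce", "FetchConsumer", "FetchFollower"}) {
                // The trailing wildcard also matches extra key properties
                // (e.g. the "version" tag present on newer brokers).
                ObjectName pattern = new ObjectName(
                        "kafka.network:type=RequestMetrics,name=RequestsPerSec,request="
                                + request + ",*");
                for (ObjectName name : mbs.queryNames(pattern, null)) {
                    // RequestsPerSec is a meter; OneMinuteRate is its
                    // one-minute moving average in requests per second.
                    System.out.println(name + " OneMinuteRate = "
                            + mbs.getAttribute(name, "OneMinuteRate"));
                }
            }
        }
    }
}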

Best,
Lisheng


Navneeth Krishnan  wrote on Wed, Jan 8, 2020 at 3:39 PM:

> Hi All,
>
> Any suggestions, we are running into this issue in production and any
> help would be greatly appreciated.
>
> Thanks
>
> On Mon, Jan 6, 2020 at 9:26 PM Navneeth Krishnan  >
> wrote:
>
> > Hi,
> >
> > Thanks for the response. We were using version 0.11 previously and all
> our
> > producers/consumers have been upgraded to either 1.0 or to the latest
> 2.3.
> >
> > Is it normal for the network threads to consume this much CPU? If you look
> > at it, the network threads consume 50% of the overall CPU.
> >
> > Regards
> >
> > On Mon, Jan 6, 2020 at 7:04 PM Thunder Stumpges <
> > thunder.stump...@gmail.com> wrote:
> >
> >> Not sure what version your producers/consumers are, or if you upgraded
> >> from
> >> a previous version that used to work, or what, but maybe you're hitting
> >> this?
> >>
> >>
> >>
> https://kafka.apache.org/23/documentation.html#upgrade_10_performance_impact
> >>
> >>
> >>
> >> On Mon, Jan 6, 2020 at 12:48 PM Navneeth Krishnan <
> >> reachnavnee...@gmail.com>
> >> wrote:
> >>
> >> > Hi All,
> >> >
> >> > Any idea on what can be done? Not sure if we are running into this
> below
> >> > bug.
> >> >
> >> > https://issues.apache.org/jira/browse/KAFKA-7925
> >> >
> >> > Thanks
> >> >
> >> > On Thu, Jan 2, 2020 at 4:18 PM Navneeth Krishnan <
> >> reachnavnee...@gmail.com>
> >> > wrote:
> >> >
> >> >> Hi All,
> >> >>
> >> >> We have a kafka cluster with 12 nodes and we are pretty much seeing
> 90%
> >> >> cpu usage on all the nodes. Here is all the information. Need some
> >> help on
> >> >> figuring out what the problem is and how to overcome this issue.
> >> >>
> >> >> *Cluster:*
> >> >> Kafka version: 2.3.0
> >> >> Number of brokers in cluster: 12
> >> >> Node type: 4 vCores 32GB mem
> >> >> Network In: 10Mbps per broker
> >> >> Network Out: 16Mbps per broker
> >> >> Topics: 10 (approximately)
> >> >> Partitions: 20 (max); some topics have fewer partitions
> >> >> Replication Factor: 3
> >> >>
> >> >> *CPU Usage:*
> >> >> [image: image.png]
> >> >>
> >> >> *VMStat*
> >> >>
> >> >> [root]# vmstat 1 10
> >> >>
> >> >> procs ---memory-- ---swap-- -io -system-- --cpu-
> >> >>
> >> >>  r  b   swpd   free   buff  cache   si   so   bi   bo   in   cs us sy id wa st
> >> >>
> >> >>  8  0  0 23  19064 240469800017  202613 38 33 28  0  1
> >> >>  7  0  0 256444  19036 2402388000   768 0 64027 22708 44 40 16  0  1
> >> >>  7  0  0 245356  19052 2403456000   256   472 63509 23276 44 39 17  0  1
> >> >>  7  0  0 235096  19052 2404661600 0 0 62277 22516 46 38 15  0  1
> >> >>  8  0  0 260548  19036 2402008400   516 49888 62364 22894 43 38 18  0  1
> >> >>  5  0  0 249232  19036 2403092400   512 0 61022 24589 41 39 20  0  1
> >> >>  6  0  0 238072  19036 2404251200  1024 0 63358 23063 44 38 17  0  0
> >> >>  5  0  0 262904  19052 2401797200 0   440 63078 23499 46 37 17  0  1
> >> >>  7  0  0 250324  19052 2403000800 0 0 64615 22617 48 38 14  0  1
> >> >>  6  0  0 237920  19052 2404237200  1024 48900 63223 23029 42 40 18  0  1
> >> >>
> >> >>
> >> >> *IO Stat:*
> >> >>
> >> >> [root]# iostat -m
> >> >>
> >> >> Linux 4.14.72-73.55.amzn2.x86_64 (loc-kafka11.internal.dnaspaces.io)
> >> >> 01/02/2020 _x86_64_ (4 CPU)
> >> >>
> >> >>
> >> >>
> >> >> avg-cpu:  %user   %nice %system %iowait  %steal   %idle
> >> >>
> >> >>   38.110.00   33.090.110.61   28.08
> >> >>
> >> >>
> >> >>
> >> >> Device:            tps    MB_read/s    MB_wrtn/s    MB_read    MB_wrtn
> >> >> xvda              2.36         0.01         0.01      26760      43360
> >> >> nvme0n1           0.00         0.00         0.00          2          0
> >> >> xvdf             70.95         0.06         7.67     185908   25205338
> >> >>
> >> >> *Top Kafka broker threads:*
> >> >> [image: image.png]
> >> >>
> >> >> *Top 3:*
> >> >>
> >> >>
> >> "data-plane

Re: callback function

2020-01-08 Thread Tom Bentley
Tavares, if you're asking about the consumer then I think you might have a
misconception about how it works: The application calls poll() to fetch the
latest records from the broker(s). The broker is not pushing records into
some queue in the consumer. It might be worth reading
https://kafka.apache.org/documentation/#design_pull.
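
Along those lines, a minimal sketch of getting callback-like behavior without
blocking the main thread: the poll loop runs on a background thread and hands
each record to a handler. The topic, group id and bootstrap servers below are
placeholders, not from this thread:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CallbackStyleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "callback-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // KafkaConsumer is not thread-safe, so it is confined to this one thread.
        Thread poller = new Thread(() -> {
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("events"));
                while (!Thread.currentThread().isInterrupted()) {
                    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(200))) {
                        onRecord(record); // the "callback" for each new record
                    }
                }
            }
        }, "kafka-poller");
        poller.setDaemon(true); // do not keep the JVM alive on its own
        poller.start();
        // ... the main thread is free to do other work here ...
    }

    static void onRecord(ConsumerRecord<String, String> record) {
        System.out.printf("partition=%d offset=%d value=%s%n",
                record.partition(), record.offset(), record.value());
    }
}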

Kind regards,

Tom

On Wed, Jan 8, 2020 at 9:09 AM Jonathan Santilli 
wrote:

> Hello, when you say "something is new in the queue" you mean a new
> message/record is available in a partition within a topic?
>
> Cheers!
>
> On Tue, Jan 7, 2020, 8:46 PM Tavares Forby 
> wrote:
>
> > Hi,
> >
> > Is there a method in which a function can wake when something is new in
> > the queue? This has to be non-blocking to the main thread.
> >
> > Thanks,
> > -Tavares
> >
>


MirrorMaker 2 - Does it write anything to source cluster?

2020-01-08 Thread Péter Sinóros-Szabó
Hi,

so I am planning to use MM2 and was wondering whether it has any impact on
the source cluster when mirroring.

Obviously mirroring adds load on the source cluster, and I plan to use
quotas to address that. But other than that:

Does MM2 write anything back to the source cluster?

As I understand the documentation, it won't, but clarification on this
would be great.

Thanks for creating MM2! :)

best,
Peter


Re: callback function

2020-01-08 Thread M. Manna
Hey Tavares,


On Wed, 8 Jan 2020 at 09:38, Tom Bentley  wrote:

> Tavares, if you're asking about the consumer then I think you might have a
> misconception about how it works: The application calls poll() to fetch the
> latest records from the broker(s). The broker is not pushing records into
> some queue in the consumer. It might be worth reading
> https://kafka.apache.org/documentation/#design_pull.
>
> Kind regards,
>
> Tom
>
> On Wed, Jan 8, 2020 at 9:09 AM Jonathan Santilli <
> jonathansanti...@gmail.com>
> wrote:
>
> > Hello, when you say "something is new in the queue" you mean a new
> > message/record is available in a partition within a topic?
> >
> > Cheers!
> >
> > On Tue, Jan 7, 2020, 8:46 PM Tavares Forby 
> > wrote:
> >
> > > Hi,
> > >
> > > Is there a method in which a function can wake when something is new in
> > > the queue? This has to be non-blocking to the main thread.
> > >
> > > Thanks,
> > > -Tavares
> > >
> >


+1 to what Tom et al. mentioned.

Perhaps it would be good to know what it is you are trying to solve
exactly. We may be able to help you better if we have the full picture :)

Regards,



Sporadic kafka API timeouts during consumers lag fetching

2020-01-08 Thread Maksym Oryshchuk
Hello, All,

We are observing strange behavior on one of our kafka clusters (version
2.2.0).
Sporadically, the Kafka admin API returns "*Consumer group 'consumer_name' has
no active members.*" when we try to fetch consumer offsets
via ./kafka-consumer-groups.sh. The issue goes away on the next try, a second
later.
The API responds with this kind of error for 5-10% of requests, which creates
a significant problem for our consumer lag monitoring.
The problematic topic has 90 partitions and the consumer group has 3 consumer
instances. Increasing the number of consumers only made the situation worse.
Magically, we were able to fix the issue by running a partition reassignment
for the __consumer_offsets topic. But the problem comes back after the next
change in the cluster or in the number of consumers.
Has anybody observed this problem? Any suggestions?
Thank you!
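
For reference, the kind of invocation being described looks like this (the
group name and bootstrap server are placeholders); the error above shows up
sporadically in its output:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group consumer_name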


Re: MM2 startup delay

2020-01-08 Thread Péter Sinóros-Szabó
Hey,

I did some further debugging on this.

I saw the following happen just before the real, heavy mirroring
traffic started:

[2020-01-08 10:48:51,926] INFO Starting with 2294 previously uncommitted
partitions. (org.apache.kafka.connect.mirror.MirrorSourceTask:94)
[2020-01-08 10:48:51,928] INFO [Consumer clientId=consumer-11,
groupId=null] Seeking to offset 0 for partition ...

So I took a thread dump before that, and I see that MirrorSourceConnector is
waiting on line 101 in OffsetStorageReaderImpl. It seems that it waits for
the offset backing store (Kafka) to fill in the offsets, and it waits there
for minutes. I could not find out why; I hope someone has a clue about what
may be happening there.

We have about 2800 partitions.

Cheers,
Peter

On Mon, 9 Dec 2019 at 12:28, Péter Sinóros-Szabó <
peter.sinoros-sz...@transferwise.com> wrote:

> Hi,
>
> I am experimenting with Mirror Maker 2 in 2.4.0-rc3. It seems to start up
> fine, connects to both source and destination, creates new topics...
> But it does not start to actually mirror the messages until about 12
> minutes after MM2 was started. I would expect it to start mirroring in some
> seconds after startup.
>
> Source cluster has about 2800 partitions, destination cluster is empty.
> Both clusters are in AWS but in different regions.
>
> What may cause the 12 minutes delay?
>
> Config is:
> ---
> clusters = eucmain, euwbackup
> eucmain.bootstrap.servers =
> test-kafka-main-fra01.xx:9092,test-kafka-main-fra02.xx:9092
> euwbackup.bootstrap.servers = 172.30.197.203:9092,172.30.213.104:9092
> eucmain->euwbackup.enabled = true
> eucmain->euwbackup.topics = .*
> eucmain->euwbackup.topics.blacklist = ^(kafka|kmf|__).*
> eucmain->euwbackup.rename.topics = false
> replication.policy.separator = __
> eucmain.client.id = mm2
>
> I do not see any serious errors in the logs that I would think of a cause
> of this.
>
> Thanks,
> Peter
>
>

-- 
 - Sini


Re: MirrorMaker 2 - Does it write anything to source cluster?

2020-01-08 Thread Ryanne Dolan
Peter, MM2 writes offset syncs upstream to the source cluster, which are
then used to emit checkpoints to the target cluster. There is no particular
reason why offset syncs are stored on the source cluster instead of the
target, and it's been suggested that we swap that around.

Ryanne

On Wed, Jan 8, 2020, 3:58 AM Péter Sinóros-Szabó
 wrote:

> Hi,
>
> so I am planning to use MM2 and was thinking if it has any impact on the
> source cluster when mirroring.
>
> Obviously it impacts the performance of the source cluster, so I plan to
> use quotas to solve that, but other than that,
>
> Does MM2 write anything back to the source cluster?
>
> As I understand documentation, it won't, but a clarification on this would
> be great.
>
> Thanks for creating MM2! :)
>
> best,
> Peter
>


MirrorMaker 2 throttling

2020-01-08 Thread Péter Sinóros-Szabó
Hi,

I'd like to throttle the mirroring process when I start Mirror Maker 2 for
the first time, since it starts by pulling all the messages that exist on the
source cluster. I'd like to do this only to avoid putting too much traffic
on the source cluster, which may slow down the existing production clients on
it.

I tried several quota setups on both the source and destination clusters,
but none of them worked:
- it either did not have any effect,
- or it slowed down the mirroring but also caused issues like
ERROR WorkerSourceTask{id=MirrorHeartbeatConnector-0} Failed to flush,
timed out while waiting for producer to flush outstanding 115 messages

Is there a good practice on how to initialize/bootstrap a MirrorMaker
cluster on an existing Kafka cluster?

Cheers,
Peter


Secured communication between Kafka and Zookeeper

2020-01-08 Thread Péter Nagykátai
Kafka version: 2.3.0
Zookeeper version: 3.5.5

Hi!

I'm trying to keep all communication secure in my test cluster, but somehow
I'm unable to get the Kafka->ZooKeeper connection to use SSL. If I don't open
the "clientPort" next to "secureClientPort" I get:
Zookeeper:

 WARN
>  [epollEventLoopGroup-7-4:NettyServerCnxnFactory$CnxnChannelHandler@138]
> - Exception caught
> io.netty.handler.codec.DecoderException:
> io.netty.handler.ssl.NotSslRecordException: not an SSL/TLS record:


Kafka:

INFO Client will use DIGEST-MD5 as SASL mechanism.
> (org.apache.zookeeper.client.ZooKeeperSaslClient)
> INFO Opening socket connection to server FQDN/XX.XXX.XX.XX:2000. Will
> attempt to SASL-authenticate using Login Context section 'Client'
> (org.apache.zookeeper.ClientCnxn)
> INFO Socket connection established to FQDN/XX.XXX.XX.XX:2000, initiating
> session (org.apache.zookeeper.ClientCnxn)
> INFO Unable to read additional data from server sessionid 0x0, likely
> server has closed socket, closing socket connection and attempting
> reconnect (org.apache.zookeeper.ClientCnxn)


By inspecting network packets I can see that plain-text data is sent when I
use "clientPort".

Related server.properties settings:

inter.broker.listener.name:INTERNAL
listeners=INTERNAL://FQDN:,EXTERNAL://FQDN:5556
advertised.listeners=INTERNAL://FQDN:,EXTERNAL://FQDN:5556
listener.security.protocol.map=INTERNAL:SASL_SSL,EXTERNAL:SASL_SSL
advertised.host.name=FQDN
security.protocol=SSL
ssl.client.auth=required
ssl.truststore.location=/truststore.jks
ssl.truststore.password=PASSWORD
ssl.keystore.location=/kafka.jks
ssl.keystore.password=PASSWORD
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN

What am I doing wrong?

Thank you!
Peter


Re: MirrorMaker 2 throttling

2020-01-08 Thread Ryanne Dolan
Peter, have you tried overriding the client ID used by MM2's consumers?
Otherwise, the client IDs are dynamic, which would make it difficult to
throttle using quotas.

Ryanne
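
To illustrate the suggestion, a sketch of the two pieces; the client id and
the byte rate are placeholders, and the MM2 per-cluster override key is an
assumption to verify against your MM2 version:

# In the MM2 properties file: pin the consumer client.id on the source
# cluster (assumed override syntax)
eucmain.consumer.client.id = mm2-consumer

# On the source cluster: quota that client id to ~10 MB/s
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --add-config 'consumer_byte_rate=10485760' \
  --entity-type clients --entity-name mm2-consumer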

On Wed, Jan 8, 2020, 10:12 AM Péter Sinóros-Szabó
 wrote:

> Hi,
>
> I'd like to throttle the mirroring process when I start Mirror Maker 2 for
> the first time, since it starts by pulling all the messages that exist on
> the source cluster. I'd like to do this only to avoid putting too much
> traffic on the source cluster, which may slow down the existing production
> clients on it.
>
> I tried several quota setups on both the source and destination clusters,
> but none of them worked:
> - it either did not have any effect,
> - or it slowed down the mirroring but also caused issues like
> ERROR WorkerSourceTask{id=MirrorHeartbeatConnector-0} Failed to flush,
> timed out while waiting for producer to flush outstanding 115 messages
>
> Is there a good practice on how to initialize/bootstrap a MirrorMaker
> cluster on an existing Kafka cluster?
>
> Cheers,
> Peter
>


maximum lag between current offset, and last commit

2020-01-08 Thread Clark Sims
I liked this article:
https://blog.newrelic.com/engineering/kafka-consumer-config-auto-commit-data-loss/
I particularly like the illustration in the article:
http://newrelic-wpengine.netdna-ssl.com/wp-content/uploads/offset_explained2.jpg

What is the maximum difference between the current offset in a
partition, and the last offset committed?

When current offset = committed offset + maximum allowed, is this when
Kafka broker starts re-sending messages to the consumer?

Can I configure the maximum allowed offset to control the re-sending behavior?

Thanks,
Clark


Need Help For Kafka Android Clients

2020-01-08 Thread Israt Jahan

Dear Concerned,


Hope you are doing well. We want to create an Apache Kafka producer and consumer
client for our Android app, but we are getting an error saying that the
ManagementFactory class is not present on Android. Is there an alternative way
we can use? If so, please suggest one.

We have created a repository for our app; the link is below. Any solutions
would be really helpful for us.


Error:


java.lang.NoClassDefFoundError: Failed resolution of: 
Ljava/lang/management/ManagementFactory;
at 
org.apache.kafka.common.utils.AppInfoParser.unregisterAppInfo(AppInfoParser.java:65)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:699)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:333)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
at com.israt.jahan.kafkatestrnd.MainActivity.send(MainActivity.java:50)
at 
com.israt.jahan.kafkatestrnd.MainActivity.access$000(MainActivity.java:18)
at 
com.israt.jahan.kafkatestrnd.MainActivity$1.onClick(MainActivity.java:34)
at android.view.View.performClick(View.java:6612)
at android.view.View.performClickInternal(View.java:6589)
at android.view.View.access$3100(View.java:785)
at android.view.View$PerformClick.run(View.java:25925)
at android.os.Handler.handleCallback(Handler.java:873)
at android.os.Handler.dispatchMessage(Handler.java:99)
at android.os.Looper.loop(Looper.java:201)
at android.app.ActivityThread.main(ActivityThread.java:6823)
at java.lang.reflect.Method.invoke(Native Method)
at 
com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:547)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:873)
 Caused by: java.lang.ClassNotFoundException: Didn't find class 
"java.lang.management.ManagementFactory" on path: DexPathList[[zip file 
"/data/app/com.israt.jahan.kafkatestrnd-bleC0tZw1AXbDHd8T9OnJQ==/base.apk"],nativeLibraryDirectories=[/data/app/com.israt.jahan.kafkatestrnd-bleC0tZw1AXbDHd8T9OnJQ==/lib/arm64,
 /system/lib64]]
at 
dalvik.system.BaseDexClassLoader.findClass(BaseDexClassLoader.java:171)
at java.lang.ClassLoader.loadClass(ClassLoader.java:379)
at java.lang.ClassLoader.loadClass(ClassLoader.java:312)
at 
org.apache.kafka.common.utils.AppInfoParser.unregisterAppInfo(AppInfoParser.java:65)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:699)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:333)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
at com.israt.jahan.kafkatestrnd.MainActivity.send(MainActivity.java:50)
at 
com.israt.jahan.kafkatestrnd.MainActivity.access$000(MainActivity.java:18)
at 
com.israt.jahan.kafkatestrnd.MainActivity$1.onClick(MainActivity.java:34)
at android.view.View.performClick(View.java:6612)
at android.view.View.performClickInternal(View.java:6589)
at android.view.View.access$3100(View.java:785)
at android.view.View$PerformClick.run(View.java:25925)
at android.os.Handler.handleCallback(Handler.java:873)
at android.os.Handler.dispatchMessage(Handler.java:99)
at android.os.Looper.loop(Looper.java:201)
at android.app.ActivityThread.main(ActivityThread.java:6823)
at java.lang.reflect.Method.invoke(Native Method)
at 
com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:547)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:873)


Android Gradle Dependency:

implementation 'org.apache.kafka:kafka-clients:0.10.0.0'
implementation 'org.apache.kafka:kafka-streams:0.10.0.0'


Project Link:


https://github.com/israt-cloudwell/KafkaTestRND


Reference Links:


https://stackoverflow.com/questions/40427792/kafka-producer-on-android

https://stackoverflow.com/questions/57772783/how-can-we-create-kafka-producer-in-android-application

https://stackoverflow.com/questions/40043532/how-to-use-android-app-as-a-producing-client-for-kafka

https://stackoverflow.com/questions/19595814/android-add-java-lang-management-api


Thank You


Regards


Israt Jahan

Cloudwell Limited




Consuming from N topics "co-partitioned"

2020-01-08 Thread Francesco Guardiani
Hi everybody,
I wish to implement a KafkaConsumer that consumes messages from N
co-partitioned topics, similar to what Kafka Streams does for its join
semantics. E.g. for two consumers and two topics, each with two partitions:

C0 = {T0P0, T1P0}
C1 = {T0P1, T1P1}
...

Any ideas/tips?

I assume most of the work must be done at the PartitionAssignor level; is
there any existing implementation of it that can assign partitions like this?

Thank you so much,
FG

-- 
Francesco Guardiani
Website: https://slinkydeveloper.com/
Twitter: https://twitter.com/SlinkyGuardiani

Github: https://github.com/slinkydeveloper


RE: callback function

2020-01-08 Thread Tavares Forby
Sorry, not queue.  I guess this is different from RabbitMQ: the Kafka consumer
pulls data from the broker, so there are probably no event triggers to call a
function.
[Kafka Architecture - Kafka Zookeeper Coordination Diagram]

From: Tom Bentley 
Sent: Wednesday, January 8, 2020 1:38 AM
To: Users 
Cc: Tavares Forby 
Subject: [EXT] Re: callback function

Tavares, if you're asking about the consumer then I think you might have a 
misconception about how it works: The application calls poll() to fetch the 
latest records from the broker(s). The broker is not pushing records into some 
queue in the consumer. It might be worth reading 
https://kafka.apache.org/documentation/#design_pull.

Kind regards,

Tom

On Wed, Jan 8, 2020 at 9:09 AM Jonathan Santilli 
mailto:jonathansanti...@gmail.com>> wrote:
Hello, when you say "something is new in the queue" you mean a new
message/record is available in a partition within a topic?

Cheers!

On Tue, Jan 7, 2020, 8:46 PM Tavares Forby 
mailto:tfo...@qti.qualcomm.com>> wrote:

> Hi,
>
> Is there a method in which a function can wake when something is new in
> the queue? This has to be non-blocking to the main thread.
>
> Thanks,
> -Tavares
>


Re: maximum lag between current offset, and last commit

2020-01-08 Thread Clark Sims
Perhaps my understanding is off.
https://docs.confluent.io/current/installation/configuration/broker-configs.html
Perhaps this is the parameter I am looking for:
offsets.retention.check.interval.ms
Perhaps the re-send behavior is controlled by time, and not by the maximum
distance between the current offset and the last committed offset.
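
For what it's worth, redelivery is driven by the committed offset rather than
by a maximum lag: after a restart or rebalance, a consumer resumes from the
last committed offset, so everything past it is delivered again. A minimal
sketch of controlling that explicitly with manual commits (topic, group id
and bootstrap servers are placeholders):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Disable auto-commit so the committed offset only advances after processing.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // crash before commitSync() => these records are redelivered
                }
                consumer.commitSync(); // commit only after the whole batch is processed
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.offset() + ": " + record.value());
    }
}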
Regards,
Clark

On Wed, Jan 8, 2020 at 11:55 AM Clark Sims  wrote:
>
> I liked this article:
> https://blog.newrelic.com/engineering/kafka-consumer-config-auto-commit-data-loss/
> I particularly like the illustration in the article:
> http://newrelic-wpengine.netdna-ssl.com/wp-content/uploads/offset_explained2.jpg
>
> What is the maximum difference between the current offset in a
> partition, and the last offset committed?
>
> When current offset = committed offset + maximum allowed, is this when
> Kafka broker starts re-sending messages to the consumer?
>
> Can I configure the maximum allowed offset, to control the re-sending 
> behavior?
>
> Thanks,
> Clark


Re: High CPU Usage on Brokers

2020-01-08 Thread Ismael Juma
Has the behavior changed after an upgrade or has it been consistent since
the start?

Ismael

On Thu, Jan 2, 2020 at 4:18 PM Navneeth Krishnan 
wrote:

> Hi All,
>
> We have a kafka cluster with 12 nodes and we are pretty much seeing 90%
> cpu usage on all the nodes. Here is all the information. Need some help on
> figuring out what the problem is and how to overcome this issue.
>
> *Cluster:*
> Kafka version: 2.3.0
> Number of brokers in cluster: 12
> Node type: 4 vCores 32GB mem
> Network In: 10Mbps per broker
> Network Out: 16Mbps per broker
> Topics: 10 (approximately)
> Partitions: 20 (max); some topics have fewer partitions
> Replication Factor: 3
>
> *CPU Usage:*
> [image: image.png]
>
> *VMStat*
>
> [root]# vmstat 1 10
>
> procs ---memory-- ---swap-- -io -system-- --cpu-
>
>  r  b   swpd   free   buff  cache   si   so   bi   bo   in   cs us sy id wa st
>
>  8  0  0 23  19064 240469800017  202613 38 33 28  0  1
>  7  0  0 256444  19036 2402388000   768 0 64027 22708 44 40 16  0  1
>  7  0  0 245356  19052 2403456000   256   472 63509 23276 44 39 17  0  1
>  7  0  0 235096  19052 2404661600 0 0 62277 22516 46 38 15  0  1
>  8  0  0 260548  19036 2402008400   516 49888 62364 22894 43 38 18  0  1
>  5  0  0 249232  19036 2403092400   512 0 61022 24589 41 39 20  0  1
>  6  0  0 238072  19036 2404251200  1024 0 63358 23063 44 38 17  0  0
>  5  0  0 262904  19052 2401797200 0   440 63078 23499 46 37 17  0  1
>  7  0  0 250324  19052 2403000800 0 0 64615 22617 48 38 14  0  1
>  6  0  0 237920  19052 2404237200  1024 48900 63223 23029 42 40 18  0  1
>
>
> *IO Stat:*
>
> [root]# iostat -m
>
> Linux 4.14.72-73.55.amzn2.x86_64 (loc-kafka11.internal.dnaspaces.io)
> 01/02/2020 _x86_64_ (4 CPU)
>
>
>
> avg-cpu:  %user   %nice %system %iowait  %steal   %idle
>
>   38.110.00   33.090.110.61   28.08
>
>
>
> Device:            tps    MB_read/s    MB_wrtn/s    MB_read    MB_wrtn
>
> xvda  2.36 0.01 0.01  26760  43360
>
> nvme0n1   0.00 0.00 0.00  2  0
>
> xvdf 70.95 0.06 7.67 185908   25205338
>
> *Top Kafka broker threads:*
> [image: image.png]
>
> *Top 3:*
>
> "data-plane-kafka-network-thread-10-ListenerName(PLAINTEXT)-PLAINTEXT-0"
> #60 prio=5 os_prio=0 tid=0x7f8b1ab56000 nid=0x581f runnable
> [0x7f8a886ce000]
>
> "data-plane-kafka-network-thread-10-ListenerName(PLAINTEXT)-PLAINTEXT-2"
> #62 prio=5 os_prio=0 tid=0x7f8b1ab59000 nid=0x5821 runnable
> [0x7f8a6aefd000]
>
> "data-plane-kafka-network-thread-10-ListenerName(PLAINTEXT)-PLAINTEXT-1"
> #61 prio=5 os_prio=0 tid=0x7f8b1ab57800 nid=0x5820 runnable
> [0x7f8a885cd000]
>
> It doesn't look like GC or IO is the problem.
>
> Thanks
>


Re: complicated logic for tombstone records

2020-01-08 Thread Jan Bols
Hi Boyang, Hi Alex,

thank you for your reply. I can't use windowing, so currently I'm managing
removals by wrapping messages in a delete-aware wrapper whenever I have to
do an aggregation, but this has a big impact on all the logic.

For me the ideal situation would be to get a handle on the state stores
that are being used during aggregation and the other processors of the
Streams DSL, and to programmatically delete entries from the store whenever
needed. This way I can keep the changes to my streaming logic minimal and
still delete parts of the data whenever needed.

Is there any way to do that? I know I can get a read-only reference to the
state stores using queryable stores, but that won't do.
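
For stores you register yourself through the Processor API (which, admittedly,
does not reach the DSL's internal aggregation stores), a writable handle is
available inside a Transformer. A hedged sketch, with the store and topic
names as placeholders:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class DeletableAggregation {
    public static StreamsBuilder buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // A store we own, so we get a writable handle (unlike queryable stores).
        StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("totals"),
                        Serdes.String(), Serdes.Long());
        builder.addStateStore(storeBuilder);

        builder.stream("input", Consumed.with(Serdes.String(), Serdes.Long()))
                .transform(() -> new Transformer<String, Long, KeyValue<String, Long>>() {
                    private KeyValueStore<String, Long> store;

                    @Override
                    @SuppressWarnings("unchecked")
                    public void init(ProcessorContext context) {
                        store = (KeyValueStore<String, Long>) context.getStateStore("totals");
                    }

                    @Override
                    public KeyValue<String, Long> transform(String key, Long value) {
                        if (value == null) {          // tombstone: drop the key from the store
                            store.delete(key);
                            return KeyValue.pair(key, null); // forward the tombstone downstream
                        }
                        Long total = store.get(key);
                        long updated = (total == null ? 0L : total) + value;
                        store.put(key, updated);      // writable, unlike interactive queries
                        return KeyValue.pair(key, updated);
                    }

                    @Override
                    public void close() {}
                }, "totals")
                .to("output");
        return builder;
    }
}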

Jan

On Thu, Jan 2, 2020 at 11:17 PM Alex Brekken  wrote:

> Hi Jan, unfortunately there is no easy or automatic way to do this.
> Publishing null values directly to the changelog topics will remove them
> from the topic, but it won't remove the corresponding row from the RocksDB
> state store.  (though deleting data programmatically from a state-store
> WILL also remove it from the changelog topic)  Given that you want to
> completely remove the data for a given set of keys, your best option might
> be to modify your topology to handle null messages so that they can get
> deleted from your aggregations. (and publish those from an outside app)
> Hopefully this isn't too self-serving, but I actually wrote a blog post
> about managing state-store data not long ago:
>
> https://objectpartners.com/2019/07/31/slimming-down-your-kafka-streams-data/
> .
> Hopefully that might give you some ideas.
>
> Alex
>
> On Thu, Jan 2, 2020 at 4:11 PM Boyang Chen 
> wrote:
>
> > Hey Jan,
> >
> > although I believe your case is much more complicated, but would time
> based
> > retention work for you at all? If yes, time window store is like the best
> > option.
> >
> > If no, streams has no out-of-box solution for invalidating the
> aggregation
> > record. It seems at least we could provide an API to inject
> > tombstone records for aggregation logic
> > so that they don't get ignored eventually. This sounds like a good future
> > work.
> >
> > Boyang
> >
> > On Thu, Jan 2, 2020 at 1:47 PM Jan Bols  wrote:
> >
> > > Hi,
> > > I have a rather complicated kafka streams application involving
> multiple
> > > joins, aggregates, maps etc. At a certain point, parts of the data
> needs
> > to
> > > be removed throughout the entire streams topology, both in the topics,
> > the
> > > changelogs and the rocksdb state stores.
> > >
> > > Managing this requires a lot of effort and things get very complex.
> F.e.
> > > when a KStream has a null value and is aggregated, you first need to
> > > convert it into some optional value instead b/c aggregates ignore
> nulls.
> > >
> > > Is there a better way or a way that does not impact all the existing
> > > streaming logic?
> > >
> > > I was thinking about having an out-of-band process that sends null
> > values
> > > to all topics with the correct keys. I could then filter out all null
> > > values before doing the rest of the existing stream logic.
> > > Would that make sense?
> > >
> > > I can send null values to all my topics, but how do I get the changelog
> > > topics created by kafka-streams. And what about the state store?
> > >
> > > Best regards
> > > Jan
> > >
> >
>


Free Kafka Stream Data

2020-01-08 Thread cool girl
Hi ,

I am trying to learn Kafka. Is there any free API which I can use, like
Twitter's? I created a Twitter account, but it looks like it will take days
before I can use their streaming data.

Thanks
Priyanka


Re: Free Kafka Stream Data

2020-01-08 Thread M. Manna
Priyanka,

On Wed, 8 Jan 2020 at 20:42, cool girl  wrote:

> Hi ,
>
> I am trying to learn Kafka. Is there any free API which I can use, like
> Twitter's? I created a Twitter account, but it looks like it will take days
> before I can use their streaming data.
>

  Welcome to Kafka. If you are seeking a REST API for pub/sub or streaming,
there is none yet.
The only REST API that I am aware of is part of Kafka Connect - which you
can read about from here -
http://kafka.apache.org/documentation/#connect_rest

There are also REST APIs provided under the Confluent licence, but they are
probably more admin-oriented and not what you are looking for. Please visit
confluent.io for more details.

Regards,

>
> Thanks
> Priyanka
>


Re: Free Kafka Stream Data

2020-01-08 Thread Robin Moffatt
Hi,

This blog shows using Twitter as a source for Kafka:
https://www.confluent.io/blog/stream-processing-twitter-data-with-ksqldb/
The credentials necessary for that Twitter API are created instantly AFAIK.

If you just want messages to be generated in Kafka have a look at
https://www.confluent.io/blog/easy-ways-generate-test-data-kafka/
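
And for completeness, generating toy data yourself takes only a few lines; a
minimal sketch where the topic name and bootstrap server are placeholders:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Random;

public class TestDataGenerator {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Random random = new Random();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                String key = "user-" + random.nextInt(10);        // a few repeating keys
                String value = "{\"event\":\"click\",\"seq\":" + i + "}";
                producer.send(new ProducerRecord<>("practice-topic", key, value));
                Thread.sleep(100); // ~10 messages per second
            }
        }
    }
}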


-- 

Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff


On Wed, 8 Jan 2020 at 20:42, cool girl  wrote:

> Hi ,
>
> I am trying to learn Kafka. Is there any free API which I can use, like
> Twitter's? I created a Twitter account, but it looks like it will take days
> before I can use their streaming data.
>
> Thanks
> Priyanka
>


Re: Free Kafka Stream Data

2020-01-08 Thread cool girl
Thanks.

On Wed, Jan 8, 2020, 1:16 PM Robin Moffatt  wrote:

> Hi,
>
> This blog shows using Twitter as a source for Kafka:
> https://www.confluent.io/blog/stream-processing-twitter-data-with-ksqldb/
> The credentials necessary for that Twitter API are created instantly AFAIK.
>
> If you just want messages to be generated in Kafka have a look at
> https://www.confluent.io/blog/easy-ways-generate-test-data-kafka/
>
>
> --
>
> Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
>
>
> On Wed, 8 Jan 2020 at 20:42, cool girl  wrote:
>
> > Hi ,
> >
> > I am trying to learn Kafka. Is there any free API which I can use, like
> > Twitter's? I created a Twitter account, but it looks like it will take
> > days before I can use their streaming data.
> >
> > Thanks
> > Priyanka
> >
>


Re: Consuming from N topics "co-partitioned"

2020-01-08 Thread Matthias J. Sax
> I assume most of the work must be done at PartitionAssignor level, 

Correct.

> there is
> any implementation of it that is able to assign partitions like this?

Well, you can look into `StreamsPartitionAssignor` (but it does much
more than just co-partitioning; hence, it might be hard to extract the
part of the logic you are interested in).

But even if you implement it from scratch, it should not be too difficult.
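
For instance, a minimal sketch built on the clients' internal
AbstractPartitionAssignor helper (an internal, version-dependent API; the
sketch assumes every topic really has the same partition count and every
member subscribes to all topics):

import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CoPartitionedAssignor extends AbstractPartitionAssignor {

    @Override
    public String name() {
        return "copartitioned";
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        List<String> members = new ArrayList<>(subscriptions.keySet());
        Collections.sort(members); // deterministic order across the group

        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String member : members)
            assignment.put(member, new ArrayList<>());

        // Co-partitioning is assumed, so any topic's partition count will do.
        int numPartitions = partitionsPerTopic.values().iterator().next();

        // Partition p of EVERY topic goes to the same member, e.g. C0 = {T0P0, T1P0}.
        for (int p = 0; p < numPartitions; p++) {
            String member = members.get(p % members.size());
            for (String topic : partitionsPerTopic.keySet())
                assignment.get(member).add(new TopicPartition(topic, p));
        }
        return assignment;
    }
}

Each consumer in the group would then set partition.assignment.strategy to
this class.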

Out of curiosity: why not just use Kafka Streams?


-Matthias

On 1/8/20 9:00 AM, Francesco Guardiani wrote:
> Hi everybody,
> I wish to implement a KafkaConsumer that consumes messages from N
> co-partitioned topics, in a similar way to KafkaStreams for join semantic.
> e.g. for two consumers and two topics with two partitions:
> 
> C0 = {T0P0, T1P0}
> C1 = {T0P1, T1P1}
> ...
> 
> Any ideas/tips?
> 
> I assume most of the work must be done at PartitionAssignor level, there is
> any implementation of it that is able to assign partitions like this?
> 
> Thank you so much,
> FG
> 





Re: about default value of struct of connect api

2020-01-08 Thread Lisheng Wang
An update on my findings:

Below is the code of the 'defaultValue' method:

public SchemaBuilder defaultValue(Object value) {
checkCanSet(DEFAULT_FIELD, defaultValue, value);
checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD);
try {
ConnectSchema.validateValue(this, value);
} catch (DataException e) {
throw new SchemaBuilderException("Invalid default value", e);
}
defaultValue = value;
return this;
}

It seems it would be OK if I changed ConnectSchema.validateValue(this, value)
to ConnectSchema.validateValue(this.builder(), value); I don't know if that is
OK for other cases.
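
A workaround that avoids patching Connect may also be possible: since
SchemaBuilder itself implements Schema, building the default Struct against
the builder instance makes validateValue compare the builder with itself. A
hedged, untested sketch:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

SchemaBuilder addressBuilder = SchemaBuilder.struct().name("address")
        .field("province", Schema.STRING_SCHEMA)
        .field("city", Schema.STRING_SCHEMA);

// Key point: pass the builder itself, not addressBuilder.build(), so that
// defaultValue() validates the Struct against the very same Schema object.
Struct defaultAddress = new Struct(addressBuilder)
        .put("province", "")
        .put("city", "");

Schema personSchema = SchemaBuilder.struct().name("person")
        .field("address", addressBuilder.defaultValue(defaultAddress).build())
        .build();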

Thanks.

Best,
Lisheng


Lisheng Wang  wrote on Wed, Jan 8, 2020 at 11:44 AM:

> hello guys,
>
> update my question,
>
> I made a test, code as below; I want to set a default value of address in
> person:
>
> SchemaBuilder schemaBuilder =
> SchemaBuilder.struct().name("address")
> .field("province", SchemaBuilder.STRING_SCHEMA)
> .field("city", SchemaBuilder.STRING_SCHEMA);
> Struct defaultValue = new Struct(schemaBuilder.build())
> .put("province", "")
> .put("city", "");
> Schema dataSchema = SchemaBuilder.struct().name("person")
> .field("address",
> schemaBuilder.defaultValue(defaultValue).build()).build();
> Struct struct = new Struct(dataSchema);
> System.out.println(struct.toString());
>
> I got the exception below:
>
> Exception in thread "main"
> org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default
> value
> at
> org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
>
> Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas
> do not match.
> at
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:251)
> at
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
> at
> org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
> ... 1 more
>
> I dug into the code of ConnectSchema.validateValue and found that when the
> type is STRUCT, it checks the class of the schema, but one is a
> SchemaBuilder and the other is a ConnectSchema:
>
>  case STRUCT:
> Struct struct = (Struct) value;
> if (!struct.schema().equals(schema))
> throw new DataException("Struct schemas do not
> match.");
> struct.validate();
> break;
>
> The equals method is:
>
> @Override
> public boolean equals(Object o) {
> if (this == o) return true;
> if (o == null || getClass() != o.getClass()) return false;
> ConnectSchema schema = (ConnectSchema) o;
> return Objects.equals(optional, schema.optional) &&
> Objects.equals(version, schema.version) &&
> Objects.equals(name, schema.name) &&
> Objects.equals(doc, schema.doc) &&
> Objects.equals(type, schema.type) &&
> Objects.deepEquals(defaultValue, schema.defaultValue) &&
> Objects.equals(fields, schema.fields) &&
> Objects.equals(keySchema, schema.keySchema) &&
> Objects.equals(valueSchema, schema.valueSchema) &&
> Objects.equals(parameters, schema.parameters);
> }
>
> Can anyone explain how to set a default value for a "STRUCT" type with the
> Connect API?
>
> Thanks
>
> Best,
> Lisheng
>
>
> Lisheng Wang  wrote on Mon, Jan 6, 2020 at 3:31 PM:
>
>> hello kafka devs
>>
>> I'm facing the problem of how to set a default value for a struct.
>>
>> I'm following https://docs.confluent.io/current/connect/devguide.html
>>
>> Schema schema = SchemaBuilder.struct().name(NAME)
>> .field("name", Schema.STRING_SCHEMA)
>> .field("age", Schema.INT_SCHEMA)
>> .field("admin", new
>> SchemaBuilder.boolean().defaultValue(false).build())
>> .build();
>>
>> Struct struct = new Struct(schema)
>> .put("name", "Barbara Liskov")
>> .put("age", 75)
>> .build();
>>
>> Below is my code; I don't know how to set a default value when the schema
>> type is a struct:
>>
>> Schema schema1 = SchemaBuilder.struct().name("info")
>> .field("address", Schema.STRING_SCHEMA)
>> .field("code",
>> Schema.STRING_SCHEMA).defaultValue("").build();
>>
>> Thanks!
>>
>>
>>
>>
>>
>> Best,
>> Lisheng
>>
>


Does Kafka Support IPV6

2020-01-08 Thread 卢博武
Hello,
    I want to make sure that Kafka supports IPv6; IPv6 is likely to be used
in the near future.
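
For reference, Kafka's listener configuration does accept IPv6 addresses
using bracket notation; a hedged illustration, with a placeholder
documentation-range address and port:

listeners=PLAINTEXT://[2001:db8::1]:9092
advertised.listeners=PLAINTEXT://[2001:db8::1]:9092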


Thank you




bowu lu
vastdata.com.cn

java.net.BindException: Address already in use (Bind failed) with kafka-topics command

2020-01-08 Thread JOHN, BIBIN
Could you please let me know why I sometimes get the exception below? The
cluster is up and running. Kafka and ZK run on the same server. We also have
the JMX exporter configured for monitoring.

kafka-topics --zookeeper localhost:2181 --list
Error: Exception thrown by the agent : java.rmi.server.ExportException: Port 
already in use: 0; nested exception is:
java.net.BindException: Address already in use (Bind failed)
sun.management.AgentConfigurationError: java.rmi.server.ExportException: Port 
already in use: 0; nested exception is:
java.net.BindException: Address already in use (Bind failed)
at 
sun.management.jmxremote.ConnectorBootstrap.startLocalConnectorServer(ConnectorBootstrap.java:553)
at sun.management.Agent.startLocalManagementAgent(Agent.java:137)
at sun.management.Agent.startAgent(Agent.java:265)
at sun.management.Agent.startAgent(Agent.java:452)
Caused by: java.rmi.server.ExportException: Port already in use: 0; nested 
exception is:
java.net.BindException: Address already in use (Bind failed)
at sun.rmi.transport.tcp.TCPTransport.listen(TCPTransport.java:346)
at 
sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:254)
at sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:411)
at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
at 
sun.rmi.server.UnicastServerRef.exportObject(UnicastServerRef.java:236)
at 
sun.management.jmxremote.ConnectorBootstrap$PermanentExporter.exportObject(ConnectorBootstrap.java:199)
at 
javax.management.remote.rmi.RMIJRMPServerImpl.export(RMIJRMPServerImpl.java:146)
at 
javax.management.remote.rmi.RMIJRMPServerImpl.export(RMIJRMPServerImpl.java:122)
at 
javax.management.remote.rmi.RMIConnectorServer.start(RMIConnectorServer.java:404)
at 
sun.management.jmxremote.ConnectorBootstrap.startLocalConnectorServer(ConnectorBootstrap.java:550)
... 3 more
Caused by: java.net.BindException: Address already in use (Bind failed)
at java.net.PlainSocketImpl.socketBind(Native Method)
at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)
at java.net.ServerSocket.bind(ServerSocket.java:375)
at java.net.ServerSocket.<init>(ServerSocket.java:237)
at java.net.ServerSocket.<init>(ServerSocket.java:128)
at sun.management.jmxremote.LocalRMIServerSocketFactory$1.<init>(LocalRMIServerSocketFactory.java:49)
at sun.management.jmxremote.LocalRMIServerSocketFactory.createServerSocket(LocalRMIServerSocketFactory.java:49)
at sun.rmi.transport.tcp.TCPEndpoint.newServerSocket(TCPEndpoint.java:666)
at sun.rmi.transport.tcp.TCPTransport.listen(TCPTransport.java:335)
... 12 more



Re: java.net.BindException: Address already in use (Bind failed) with kafka-topics command

2020-01-08 Thread kalai selvan
Hello,

Looks like your server is running out of ephemeral ports; the command below
will confirm it. Restarting the process should clear the issue. Most probably
the root cause is a misconfigured MirrorMaker process, or it is a bug.

netstat -an | wc -l

Regards,
Kalai

On Thu, 9 Jan, 2020, 10:46 AM JOHN, BIBIN,  wrote:

> Could you please let me know why sometimes I am getting, below exception?
> Cluster is up and running. Kafka and ZK is running in same server. We also
> have jmx exporter configured for monitoring.
>
> kafka-topics --zookeeper localhost:2181 --list
> Error: Exception thrown by the agent : java.rmi.server.ExportException:
> Port already in use: 0; nested exception is:
> java.net.BindException: Address already in use (Bind failed)
> sun.management.AgentConfigurationError: java.rmi.server.ExportException:
> Port already in use: 0; nested exception is:
> java.net.BindException: Address already in use (Bind failed)
> at
> sun.management.jmxremote.ConnectorBootstrap.startLocalConnectorServer(ConnectorBootstrap.java:553)
> at sun.management.Agent.startLocalManagementAgent(Agent.java:137)
> at sun.management.Agent.startAgent(Agent.java:265)
> at sun.management.Agent.startAgent(Agent.java:452)
> Caused by: java.rmi.server.ExportException: Port already in use: 0; nested
> exception is:
> java.net.BindException: Address already in use (Bind failed)
> at sun.rmi.transport.tcp.TCPTransport.listen(TCPTransport.java:346)
> at
> sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:254)
> at
> sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:411)
> at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
> at
> sun.rmi.server.UnicastServerRef.exportObject(UnicastServerRef.java:236)
> at
> sun.management.jmxremote.ConnectorBootstrap$PermanentExporter.exportObject(ConnectorBootstrap.java:199)
> at
> javax.management.remote.rmi.RMIJRMPServerImpl.export(RMIJRMPServerImpl.java:146)
> at
> javax.management.remote.rmi.RMIJRMPServerImpl.export(RMIJRMPServerImpl.java:122)
> at
> javax.management.remote.rmi.RMIConnectorServer.start(RMIConnectorServer.java:404)
> at
> sun.management.jmxremote.ConnectorBootstrap.startLocalConnectorServer(ConnectorBootstrap.java:550)
> ... 3 more
> Caused by: java.net.BindException: Address already in use (Bind failed)
> at java.net.PlainSocketImpl.socketBind(Native Method)
> at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)
> at java.net.ServerSocket.bind(ServerSocket.java:375)
> at java.net.ServerSocket.<init>(ServerSocket.java:237)
> at java.net.ServerSocket.<init>(ServerSocket.java:128)
> at sun.management.jmxremote.LocalRMIServerSocketFactory$1.<init>(LocalRMIServerSocketFactory.java:49)
> at sun.management.jmxremote.LocalRMIServerSocketFactory.createServerSocket(LocalRMIServerSocketFactory.java:49)
> at sun.rmi.transport.tcp.TCPEndpoint.newServerSocket(TCPEndpoint.java:666)
> at sun.rmi.transport.tcp.TCPTransport.listen(TCPTransport.java:335)
> ... 12 more
>
>


RE: java.net.BindException: Address already in use (Bind failed) with kafka-topics command

2020-01-08 Thread JOHN, BIBIN
We don't have MirrorMaker configured in this cluster. Please find the output
of the netstat command below.

netstat -an | wc -l
570322


-Original Message-
From: kalai selvan  
Sent: Wednesday, January 8, 2020 11:41 PM
To: users@kafka.apache.org
Subject: Re: java.net.BindException: Address already in use (Bind failed) with 
kafka-topics command

Hello,

Looks like your server is running out of ephemeral ports; the command below
will confirm it. Restarting the process should clear the issue. Most probably
the root cause is a misconfigured MirrorMaker process, or it is a bug.

netstat -an | wc -l

Regards,
Kalai

On Thu, 9 Jan, 2020, 10:46 AM JOHN, BIBIN,  wrote:

> Could you please let me know why sometimes I am getting, below exception?
> Cluster is up and running. Kafka and ZK is running in same server. We 
> also have jmx exporter configured for monitoring.
>
> kafka-topics --zookeeper localhost:2181 --list
> Error: Exception thrown by the agent : java.rmi.server.ExportException:
> Port already in use: 0; nested exception is:
> java.net.BindException: Address already in use (Bind failed)
> sun.management.AgentConfigurationError: java.rmi.server.ExportException:
> Port already in use: 0; nested exception is:
> java.net.BindException: Address already in use (Bind failed)
> at
> sun.management.jmxremote.ConnectorBootstrap.startLocalConnectorServer(ConnectorBootstrap.java:553)
> at sun.management.Agent.startLocalManagementAgent(Agent.java:137)
> at sun.management.Agent.startAgent(Agent.java:265)
> at sun.management.Agent.startAgent(Agent.java:452)
> Caused by: java.rmi.server.ExportException: Port already in use: 0; 
> nested exception is:
> java.net.BindException: Address already in use (Bind failed)
> at sun.rmi.transport.tcp.TCPTransport.listen(TCPTransport.java:346)
> at
> sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:254)
> at
> sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:411)
> at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
> at
> sun.rmi.server.UnicastServerRef.exportObject(UnicastServerRef.java:236)
> at
> sun.management.jmxremote.ConnectorBootstrap$PermanentExporter.exportObject(ConnectorBootstrap.java:199)
> at
> javax.management.remote.rmi.RMIJRMPServerImpl.export(RMIJRMPServerImpl.java:146)
> at
> javax.management.remote.rmi.RMIJRMPServerImpl.export(RMIJRMPServerImpl.java:122)
> at
> javax.management.remote.rmi.RMIConnectorServer.start(RMIConnectorServer.java:404)
> at
> sun.management.jmxremote.ConnectorBootstrap.startLocalConnectorServer(ConnectorBootstrap.java:550)
> ... 3 more
> Caused by: java.net.BindException: Address already in use (Bind failed)
> at java.net.PlainSocketImpl.socketBind(Native Method)
> at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)
> at java.net.ServerSocket.bind(ServerSocket.java:375)
> at java.net.ServerSocket.<init>(ServerSocket.java:237)
> at java.net.ServerSocket.<init>(ServerSocket.java:128)
> at sun.management.jmxremote.LocalRMIServerSocketFactory$1.<init>(LocalRMIServerSocketFactory.java:49)
> at sun.management.jmxremote.LocalRMIServerSocketFactory.createServerSocket(LocalRMIServerSocketFactory.java:49)
> at sun.rmi.transport.tcp.TCPEndpoint.newServerSocket(TCPEndpoint.java:666)
> at sun.rmi.transport.tcp.TCPTransport.listen(TCPTransport.java:335)
> ... 12 more
>
>


Re: java.net.BindException: Address already in use (Bind failed) with kafka-topics command

2020-01-08 Thread kalai selvan
Hello,

Please refer to the documentation below on increasing the ephemeral port
range. It will most likely solve the issue.


https://www.google.com/amp/s/www.cyberciti.biz/tips/linux-increase-outgoing-network-sockets-range.html/amp

Regards,
Kalai

On Thu, 9 Jan, 2020, 11:14 AM JOHN, BIBIN,  wrote:

> we don’t have mirrormaker configured in this cluster. Please find output
> of netstat command.
>
> netstat -an | wc -l
> 570322
>
>
> -Original Message-
> From: kalai selvan 
> Sent: Wednesday, January 8, 2020 11:41 PM
> To: users@kafka.apache.org
> Subject: Re: java.net.BindException: Address already in use (Bind failed)
> with kafka-topics command
>
> Hello,
>
> Looks like your server is running out of ephemeral ports; the command below
> will confirm it. Restarting the process should clear the issue. Most
> probably the root cause is a misconfigured MirrorMaker process, or it is a
> bug.
>
> netstat -an | wc -l
>
> Regards,
> Kalai
>
> On Thu, 9 Jan, 2020, 10:46 AM JOHN, BIBIN,  wrote:
>
> > Could you please let me know why sometimes I am getting, below exception?
> > Cluster is up and running. Kafka and ZK is running in same server. We
> > also have jmx exporter configured for monitoring.
> >
> > kafka-topics --zookeeper localhost:2181 --list
> > Error: Exception thrown by the agent : java.rmi.server.ExportException:
> > Port already in use: 0; nested exception is:
> > java.net.BindException: Address already in use (Bind failed)
> > sun.management.AgentConfigurationError: java.rmi.server.ExportException:
> > Port already in use: 0; nested exception is:
> > java.net.BindException: Address already in use (Bind failed)
> > at
> >
> sun.management.jmxremote.ConnectorBootstrap.startLocalConnectorServer(ConnectorBootstrap.java:553)
> > at sun.management.Agent.startLocalManagementAgent(Agent.java:137)
> > at sun.management.Agent.startAgent(Agent.java:265)
> > at sun.management.Agent.startAgent(Agent.java:452)
> > Caused by: java.rmi.server.ExportException: Port already in use: 0;
> > nested exception is:
> > java.net.BindException: Address already in use (Bind failed)
> > at
> sun.rmi.transport.tcp.TCPTransport.listen(TCPTransport.java:346)
> > at
> > sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:254)
> > at
> > sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:411)
> > at sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
> > at
> > sun.rmi.server.UnicastServerRef.exportObject(UnicastServerRef.java:236)
> > at
> >
> sun.management.jmxremote.ConnectorBootstrap$PermanentExporter.exportObject(ConnectorBootstrap.java:199)
> > at
> >
> javax.management.remote.rmi.RMIJRMPServerImpl.export(RMIJRMPServerImpl.java:146)
> > at
> >
> javax.management.remote.rmi.RMIJRMPServerImpl.export(RMIJRMPServerImpl.java:122)
> > at
> >
> javax.management.remote.rmi.RMIConnectorServer.start(RMIConnectorServer.java:404)
> > at
> >
> sun.management.jmxremote.ConnectorBootstrap.startLocalConnectorServer(ConnectorBootstrap.java:550)
> > ... 3 more
> > Caused by: java.net.BindException: Address already in use (Bind failed)
> > at java.net.PlainSocketImpl.socketBind(Native Method)
> > at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)
> > at java.net.ServerSocket.bind(ServerSocket.java:375)
> > at java.net.ServerSocket.<init>(ServerSocket.java:237)
> > at java.net.ServerSocket.<init>(ServerSocket.java:128)
> > at sun.management.jmxremote.LocalRMIServerSocketFactory$1.<init>(LocalRMIServerSocketFactory.java:49)
> > at sun.management.jmxremote.LocalRMIServerSocketFactory.createServerSocket(LocalRMIServerSocketFactory.java:49)
> > at sun.rmi.transport.tcp.TCPEndpoint.newServerSocket(TCPEndpoint.java:666)
> > at sun.rmi.transport.tcp.TCPTransport.listen(TCPTransport.java:335)
> > ... 12 more
> >
> >
>