You may check the Kafka.log to see what's inside Yan Fang
> On Jul 26, 2015, at 2:01 AM, Job-Selina Wu <swucaree...@gmail.com> wrote: > > The exception is below: > > kafka.common.FailedToSendMessageException: Failed to send messages after 3 > tries. > kafka.common.FailedToSendMessageException: Failed to send messages after 3 > tries. > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) > at kafka.producer.Producer.send(Producer.scala:77) > at kafka.javaapi.producer.Producer.send(Producer.scala:33) > at http.server.HttpDemoHandler.doDemo(HttpDemoHandler.java:71) > at http.server.HttpDemoHandler.handle(HttpDemoHandler.java:32) > at > org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) > at org.eclipse.jetty.server.Server.handle(Server.java:498) > at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:265) > at > org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:243) > at > org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540) > at > org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:610) > at > org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:539) > at java.lang.Thread.run(Thread.java:745) > > On Sun, Jul 26, 2015 at 12:42 AM, Job-Selina Wu <swucaree...@gmail.com> > wrote: > >> Hi, Yan: >> >> My Http Server send message to Kafka. >> >> The server.log at deploy/kafka/logs/server.log shown : >> >> [2015-07-26 00:33:51,910] INFO Closing socket connection to /127.0.0.1. >> (kafka.network.Processor) >> [2015-07-26 00:33:51,984] INFO Closing socket connection to /127.0.0.1. >> (kafka.network.Processor) >> [2015-07-26 00:33:52,011] INFO Closing socket connection to /127.0.0.1. >> (kafka.network.Processor) >> >> ..... >> >> >> Your help is highly appreciated. >> >> Sincerely, >> >> Selina >> >> >>> On Sun, Jul 26, 2015 at 12:01 AM, Yan Fang <yanfang...@gmail.com> wrote: >>> >>> You are giving the Kafka code and the Samza log, which does not make sense >>> actually... >>> >>> Fang, Yan >>> yanfang...@gmail.com >>> >>> On Sat, Jul 25, 2015 at 10:31 PM, Job-Selina Wu <swucaree...@gmail.com> >>> wrote: >>> >>>> Hi, Yi, Navina and Benjamin: >>>> >>>> Thanks a lot to spending your time to help me this issue. >>>> >>>> The configuration is below. Do you think it could be the >>> configuration >>>> problem? >>>> I tried props.put("request.required.acks", "0"); and >>>> props.put("request.required.acks", "1"); both did not work. >>>> >>>> -------- >>>> Properties props = new Properties(); >>>> >>>> private final Producer<String, String> producer; >>>> >>>> public KafkaProducer() { >>>> //BOOTSTRAP.SERVERS >>>> props.put("metadata.broker.list", "localhost:9092"); >>>> props.put("bootstrap.servers", "localhost:9092 "); >>>> props.put("serializer.class", "kafka.serializer.StringEncoder"); >>>> props.put("partitioner.class", "com.kafka.SimplePartitioner"); >>>> props.put("request.required.acks", "0"); >>>> >>>> ProducerConfig config = new ProducerConfig(props); >>>> >>>> producer = new Producer<String, String>(config); >>>> } >>>> >>>> -------------- >>>> >>>> Exceptions at log are list below. >>>> >>>> Your help is highly appreciated. >>>> >>>> Sincerely, >>>> Selina Wu >>>> >>>> >>>> Exceptions at log >>> deploy/yarn/logs/userlogs/application_1437886672214_0001/container_1437886672214_0001_01_000001/samza-application-master.log >>>> >>>> 2015-07-25 22:03:52 Shell [DEBUG] Failed to detect a valid hadoop home >>>> directory >>>> *java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set*. >>>> at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:265) >>>> at org.apache.hadoop.util.Shell.<clinit>(Shell.java:290) >>>> at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76) >>>> at >>> org.apache.hadoop.yarn.conf.YarnConfiguration.<clinit>(YarnConfiguration.java:517) >>>> at >>>> org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:77) >>>> at >>> org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala) >>>> 2015-07-25 22:03:52 Shell [DEBUG] setsid is not available on this >>>> machine. So not using it. >>>> 2015-07-25 22:03:52 Shell [DEBUG] setsid exited with exit code 0 >>>> 2015-07-25 22:03:52 ClientHelper [INFO] trying to connect to RM >>>> 127.0.0.1:8032 >>>> 2015-07-25 22:03:52 AbstractService [DEBUG] Service: >>>> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl entered state >>>> INITED >>>> 2015-07-25 22:03:52 RMProxy [INFO] Connecting to ResourceManager at >>>> /127.0.0.1:8032 >>>> 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field >>>> org.apache.hadoop.metrics2.lib.MutableRate >>>> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess >>>> with annotation >>>> @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=, >>>> always=false, type=DEFAULT, value=[Rate of successful kerberos logins >>>> and latency (milliseconds)], valueName=Time) >>>> 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field >>>> org.apache.hadoop.metrics2.lib.MutableRate >>>> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure >>>> with annotation >>>> @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=, >>>> always=false, type=DEFAULT, value=[Rate of failed kerberos logins and >>>> latency (milliseconds)], valueName=Time) >>>> 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field >>>> org.apache.hadoop.metrics2.lib.MutableRate >>>> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.getGroups >>>> with annotation >>>> @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=, >>>> always=false, type=DEFAULT, value=[GetGroups], valueName=Time) >>>> 2015-07-25 22:03:52 MetricsSystemImpl [DEBUG] UgiMetrics, User and >>>> group related metrics >>>> 2015-07-25 22:03:52 KerberosName [DEBUG] Kerberos krb5 configuration >>>> not found, setting default realm to empty >>>> 2015-07-25 22:03:52 Groups [DEBUG] Creating new Groups object >>>> 2015-07-25 22:03:52 NativeCodeLoader [DEBUG] Trying to load the >>>> custom-built native-hadoop library... >>>> 2015-07-25 22:03:52 NativeCodeLoader [DEBUG] Failed to load >>>> native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in >>>> java.library.path >>>> 2015-07-25 22:03:52 NativeCodeLoader [DEBUG] >>> java.library.path=/home//Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:. >>>> 2015-07-25 22:03:52 NativeCodeLoader [WARN] Unable to load >>>> native-hadoop library for your platform... using builtin-java classes >>>> where applicable.... >>>> >>>> >>>> 2015-07-25 22:03:53 KafkaCheckpointManager [WARN] While trying to >>>> validate topic __samza_checkpoint_ver_1_for_demo-parser7_1: >>>> *kafka.common.LeaderNotAvailableException. Retrying.* >>>> 2015-07-25 22:03:53 KafkaCheckpointManager [DEBUG] Exception detail: >>>> kafka.common.LeaderNotAvailableException >>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native >>> Method) >>>> at >>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) >>>> at >>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) >>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:422) >>>> at java.lang.Class.newInstance(Class.java:442) >>>> at >>> kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:84) >>>> at >>>> org.apache.samza.util.KafkaUtil$.maybeThrowException(KafkaUtil.scala:63) >>>> at >>> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$validateTopic$2.apply(KafkaCheckpointManager.scala:389) >>>> at >>> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$validateTopic$2.apply(KafkaCheckpointManager.scala:386) >>>> at >>> org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82) >>>> at >>> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.validateTopic(KafkaCheckpointManager.scala:385) >>>> at >>> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.start(KafkaCheckpointManager.scala:336) >>>> at >>> org.apache.samza.coordinator.JobCoordinator$.buildJobModel(JobCoordinator.scala:127) >>>> at >>> org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:55) >>>> at >>> org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:72) >>>> at >>>> org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:93) >>>> at >>> org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala) >>>> 2015-07-25 22:03:53 VerifiableProperties [INFO] Verifying properties >>>> 2015-07-25 22:03:53 VerifiableProperties [INFO] Property client.id is >>>> overridden to samza_checkpoint_manager-demo_parser7-1-1437887032962-0 >>>> 2015-07-25 22:03:53 VerifiableProperties [INFO] Property >>>> metadata.broker.list is overridden to localhost:9092 >>>> 2015-07-25 22:03:53 VerifiableProperties [INFO] Property >>>> request.timeout.ms is overridden to 30000 >>>> 2015-07-25 22:03:53 ClientUtils$ [INFO] Fetching metadata from broker >>>> id:0,host:localhost,port:9092 with correlation id 1 for 1 topic(s) >>>> Set(__samza_checkpoint_ver_1_for_demo-parser7_1) >>>> 2015-07-25 22:03:53 BlockingChannel [DEBUG] Created socket with >>>> SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 408300 (requested >>>> -1), SO_SNDBUF = 114324 (requested 102400), connectTimeoutMs = 30000. >>>> 2015-07-25 22:03:53 SyncProducer [INFO] Connected to localhost:9092 >>>> for producing >>>> 2015-07-25 22:03:53 SyncProducer [INFO] Disconnecting from >>> localhost:9092 >>>> 2015-07-25 22:03:53 ClientUtils$ [DEBUG] Successfully fetched metadata >>>> for 1 topic(s) Set(__samza_checkpoint_ver_1_for_demo-parser7_1) >>>> 2015-07-25 22:03:53 KafkaCheckpointManager [INFO] Successfully >>>> validated checkpoint topic >>>> __samza_checkpoint_ver_1_for_demo-parser7_1. >>>> >>>> >>>> >>>> Exception at log >>> deploy/yarn/logs/userlogs/application_1437886672214_0001/container_1437886672214_0001_01_000002/stderr >>>> >>>> 2015-07-25 22:06:03 HttpDemoParserStreamTask [INFO] key=123: >>>> message=timestamp=06-20-2015 id=123 ip=22.231.113.69 browser=Chrome >>>> postalCode=95131 url=http://sample1.com language=ENG mobileBrand=Apple >>>> *count=3860* >>>> 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [INFO] Reconnect due to >>>> socket error: java.io.EOFException: Received -1 when reading from >>>> channel, socket has likely been closed. >>>> 2015-07-25 22:06:04 Selector [WARN] >>>> *Error in I/O with localhost/127.0.0.1 <http://127.0.0.1 >>>>> java.io.EOFException* >>>> at >>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62) >>>> at org.apache.kafka.common.network.Selector.poll(Selector.java:248) >>>> at >>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) >>>> at >>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) >>>> at >>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) >>>> at java.lang.Thread.run(Thread.java:745) >>>> 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [DEBUG] Disconnecting >>>> from selinas-mbp.attlocal.net:9092 >>>> 2015-07-25 22:06:04 Selector [WARN] Error in I/O with >>>> Selinas-MBP.attlocal.net/192.168.1.227 >>>> java.io.EOFException >>>> at >>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62) >>>> at org.apache.kafka.common.network.Selector.poll(Selector.java:248) >>>> at >>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) >>>> at >>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) >>>> at >>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) >>>> at java.lang.Thread.run(Thread.java:745) >>>> 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [DEBUG] Disconnecting >>>> from selinas-mbp.attlocal.net:9092 >>>> 2015-07-25 22:06:04 Selector [WARN] Error in I/O with >>>> Selinas-MBP.attlocal.net/192.168.1.227 >>>> java.io.EOFException >>>> at >>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62) >>>> at org.apache.kafka.common.network.Selector.poll(Selector.java:248) >>>> at >>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) >>>> at >>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) >>>> at >>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) >>>> at java.lang.Thread.run(Thread.java:745) >>>> 2015-07-25 22:06:04 Selector [WARN] Error in I/O with localhost/ >>> 127.0.0.1 >>>> java.io.EOFException >>>> at >>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62) >>>> at org.apache.kafka.common.network.Selector.poll(Selector.java:248) >>>> at >>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) >>>> at >>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) >>>> at >>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) >>>> at java.lang.Thread.run(Thread.java:745) >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Node -1 disconnected. >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Node 0 disconnected. >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Node 0 disconnected. >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Node -1 disconnected. >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata >>>> request to node 0 >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata >>>> request to node 0 >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Init connection to node 0 >>>> for sending metadata request in the next iteration >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Init connection to node 0 >>>> for sending metadata request in the next iteration >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Initiating connection to >>>> node 0 at selinas-mbp.attlocal.net:9092. >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Initiating connection to >>>> node 0 at selinas-mbp.attlocal.net:9092. >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata >>>> request to node 0 >>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata >>>> request to node 0 >>>> 2015-07-25 22:06:04 Selector [WARN] Error in I/O with >>>> selinas-mbp.attlocal.net/192.168.1.227 >>>> java.net.ConnectException: Connection refused >>>> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) >>>> at >>>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) >>>> at org.apache.kafka.common.network.Selector.poll(Selector.java:238) >>>> at >>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) >>>> at >>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) >>>> at >>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) >>>> at java.lang.Thread.run(Thread.java:745) >>>> 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [DEBUG] Disconnecting >>>> from selinas-mbp.attlocal.net:9092 >>>> 2015-07-25 22:06:04 Selector [WARN] Error in I/O >>>> >>>> >>>>> On Fri, Jul 24, 2015 at 5:03 PM, Benjamin Black <b...@b3k.us> wrote: >>>>> >>>>> what are the log messages from the kafka brokers? these look like >>> client >>>>> messages indicating a broker problem. >>>>> >>>>> On Fri, Jul 24, 2015 at 1:18 PM, Job-Selina Wu <swucaree...@gmail.com >>>> >>>>> wrote: >>>>> >>>>>> Hi, Yi: >>>>>> >>>>>> I am wondering if the problem can be fixed by the parameter " >>>>>> max.message.size" at kafka.producer.ProducerConfig for the topic >>> size? >>>>>> >>>>>> My Http Server send message to Kafka. The last message shown >>> on >>>>>> console is >>>>>> "message=timestamp=06-20-2015 id=678 ip=22.231.113.68 browser=Safari >>>>>> postalCode=95066 url=http://sample2.com language=ENG >>> mobileBrand=Apple >>>>>> count=4269" >>>>>> >>>>>> However the Kafka got Exception from message 4244th >>>>>> The error is below and Kafka do not accept any new message after >>> this. >>>>>> >>>>>> "[2015-07-24 12:46:11,078] WARN >>> [console-consumer-61156_Selinas-MacBook-Pro.local-1437766693294-a68fc532-leader-finder-thread], >>>>>> Failed to find leader for Set([http-demo,0]) >>>>>> (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) >>>>>> kafka.common.KafkaException: fetching topic metadata for topics >>>>>> [Set(http-demo)] from broker >>>>> [ArrayBuffer(id:0,host:10.1.10.173,port:9092)] >>>>>> failed >>>>>> at >>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) >>>>>> at >>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) >>>>>> at >>> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) >>>>>> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) >>>>>> Caused by: java.nio.channels.ClosedChannelException >>>>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>> at >>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) >>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>> at >>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>> ... 3 more >>>>>> [2015-07-24 12:46:11,287] WARN Fetching topic metadata with >>> correlation >>>>> id >>>>>> 21 for topics [Set(http-demo)] from broker >>>>>> [id:0,host:10.1.10.173,port:9092] failed (kafka.client.ClientUtils$) >>>>>> java.nio.channels.ClosedChannelException >>>>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>> at >>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) >>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>> at >>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>> at >>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) >>>>>> at >>> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) >>>>>> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)" >>>>>> >>>>>> >>>>>> After the Error: >>>>>> I show the topic, it is right, but can not show the content by >>> command >>>>> line >>>>>> >>>>>> Selinas-MacBook-Pro:samza-Demo selina$ >>> deploy/kafka/bin/kafka-topics.sh >>>>>> --list --zookeeper localhost:2181 >>>>>> http-demo >>>>>> Selinas-MacBook-Pro:samza-Demo selina$ >>>>> deploy/kafka/bin/kafka-console-consumer.sh >>>>>> --zookeeper localhost:2181 --from-beginning --topic http-demo >>>>>> [2015-07-24 12:47:38,730] WARN >>> [console-consumer-10297_Selinas-MacBook-Pro.local-1437767258570-1a809d87], >>>>>> no brokers found when trying to rebalance. >>>>>> (kafka.consumer.ZookeeperConsumerConnector) >>>>>> >>>>>> Attached is my Kafka properties for server and producer. >>>>>> >>>>>> Your help is highly appreciated >>>>>> >>>>>> >>>>>> Sincerely, >>>>>> Selina >>>>>> >>>>>> >>>>>> >>>>>> On Thu, Jul 23, 2015 at 11:16 PM, Yi Pan <nickpa...@gmail.com> >>> wrote: >>>>>> >>>>>>> Hi, Selina, >>>>>>> >>>>>>> Your question is not clear. >>>>>>> {quote} >>>>>>> When the messages was send to Kafka by KafkaProducer, It always >>> failed >>>>>>> when the message more than 3000 - 4000 messages. >>>>>>> {quote} >>>>>>> >>>>>>> What's failing? The error stack shows errors on the consumer side >>> and >>>>> you >>>>>>> were referring to failures to produce to Kafka. Could you be more >>>>> specific >>>>>>> regarding to what's your failure scenario? >>>>>>> >>>>>>> -Yi >>>>>>> >>>>>>> On Thu, Jul 23, 2015 at 5:46 PM, Job-Selina Wu < >>> swucaree...@gmail.com >>>>> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi, >>>>>>>> >>>>>>>> When the messages was send to Kafka by KafkaProducer, It >>> always >>>>>>> failed >>>>>>>> when the message more than 3000 - 4000 messages. The error is >>> shown >>>>>>> below. >>>>>>>> I am wondering if any topic size I need to set at Samza >>>> configuration? >>>>>>>> >>>>>>>> >>>>>>>> [2015-07-23 17:30:03,792] WARN >>> [console-consumer-84579_Selinas-MacBook-Pro.local-1437697324624-eecb4f40-leader-finder-thread], >>>>>>>> Failed to find leader for Set([http-demo,0]) >>>>>>>> (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) >>>>>>>> kafka.common.KafkaException: fetching topic metadata for topics >>>>>>>> [Set(http-demo)] from broker [ArrayBuffer()] failed >>>>>>>> at >>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) >>>>>>>> at >>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93) >>>>>>>> at >>> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) >>>>>>>> at >>>>>>> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) >>>>>>>> ^CConsumed 4327 messages >>>>>>>> >>>>>>>> Your reply and comment will be highly appreciated. >>>>>>>> >>>>>>>> >>>>>>>> Sincerely, >>>>>>>> Selina >> >>