You may check the Kafka log to see what's inside.
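A quick way to do that, assuming the Hello Samza layout used later in this thread (the `deploy/kafka/logs/server.log` path comes from Selina's message below; adjust it for your deployment):

```shell
# Count ERROR/FATAL lines in the broker log; an empty or missing log
# usually means the broker never started or the path is different.
LOG=deploy/kafka/logs/server.log
if [ -f "$LOG" ]; then
  grep -cE 'ERROR|FATAL' "$LOG"
else
  echo "no broker log found at $LOG"
fi
```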

Yan Fang

> On Jul 26, 2015, at 2:01 AM, Job-Selina Wu <swucaree...@gmail.com> wrote:
> 
> The exception is below:
> 
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> at kafka.producer.Producer.send(Producer.scala:77)
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> at http.server.HttpDemoHandler.doDemo(HttpDemoHandler.java:71)
> at http.server.HttpDemoHandler.handle(HttpDemoHandler.java:32)
> at
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)
> at org.eclipse.jetty.server.Server.handle(Server.java:498)
> at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:265)
> at
> org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:243)
> at
> org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:540)
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:610)
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:539)
> at java.lang.Thread.run(Thread.java:745)
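For what it's worth, the "after 3 tries" in this exception matches the old (0.8.x) producer's `message.send.max.retries` setting, whose default is 3. Raising the retries and backoff can paper over a transient broker hiccup, though it won't help if the broker is actually down; a hedged sketch of the relevant producer properties (values are illustrative):

```
# old-producer config sketch -- defaults are 3 retries, 100 ms backoff
message.send.max.retries=5
retry.backoff.ms=500
request.required.acks=1
```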
> 
> On Sun, Jul 26, 2015 at 12:42 AM, Job-Selina Wu <swucaree...@gmail.com>
> wrote:
> 
>> Hi, Yan:
>> 
>>      My HTTP server sends messages to Kafka.
>> 
>> The server.log at deploy/kafka/logs/server.log shows:
>> 
>> [2015-07-26 00:33:51,910] INFO Closing socket connection to /127.0.0.1. 
>> (kafka.network.Processor)
>> [2015-07-26 00:33:51,984] INFO Closing socket connection to /127.0.0.1. 
>> (kafka.network.Processor)
>> [2015-07-26 00:33:52,011] INFO Closing socket connection to /127.0.0.1. 
>> (kafka.network.Processor)
>> 
>> .....
>> 
>> 
>> Your help is highly appreciated.
>> 
>> Sincerely,
>> 
>> Selina
>> 
>> 
>>> On Sun, Jul 26, 2015 at 12:01 AM, Yan Fang <yanfang...@gmail.com> wrote:
>>> 
>>> You are giving the Kafka code but the Samza log, which does not really
>>> make sense together...
>>> 
>>> Fang, Yan
>>> yanfang...@gmail.com
>>> 
>>> On Sat, Jul 25, 2015 at 10:31 PM, Job-Selina Wu <swucaree...@gmail.com>
>>> wrote:
>>> 
>>>> Hi, Yi, Navina and Benjamin:
>>>> 
>>>>    Thanks a lot for spending your time to help me with this issue.
>>>> 
>>>>    The configuration is below. Do you think it could be a
>>>> configuration problem?
>>>> I tried props.put("request.required.acks", "0"); and
>>>> props.put("request.required.acks", "1"); neither worked.
>>>> 
>>>> --------
>>>> Properties props = new Properties();
>>>> 
>>>>    private final Producer<String, String> producer;
>>>> 
>>>>    public KafkaProducer() {
>>>>        //BOOTSTRAP.SERVERS
>>>>        props.put("metadata.broker.list", "localhost:9092");
>>>>        props.put("bootstrap.servers", "localhost:9092 ");
>>>>        props.put("serializer.class", "kafka.serializer.StringEncoder");
>>>>        props.put("partitioner.class", "com.kafka.SimplePartitioner");
>>>>        props.put("request.required.acks", "0");
>>>> 
>>>>        ProducerConfig config = new ProducerConfig(props);
>>>> 
>>>>        producer = new Producer<String, String>(config);
>>>>    }
>>>> 
>>>> --------------
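One thing that stands out in the snippet above: it mixes old-producer keys (`metadata.broker.list`, `serializer.class`, `request.required.acks`) with the new producer's `bootstrap.servers`, and the `bootstrap.servers` value carries a trailing space. A minimal sketch of trimming and checking such properties before building a producer (the `sanitized` helper is mine, not part of Kafka):

```java
import java.util.Properties;

// Hypothetical helper: trim every property value so a typo like
// "localhost:9092 " (trailing space) cannot reach the producer.
public class ProducerConfigCheck {
    static Properties sanitized(Properties raw) {
        Properties clean = new Properties();
        for (String name : raw.stringPropertyNames()) {
            clean.setProperty(name, raw.getProperty(name).trim());
        }
        return clean;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("bootstrap.servers", "localhost:9092 "); // note the trailing space
        Properties clean = sanitized(props);
        System.out.println(clean.getProperty("bootstrap.servers"));
    }
}
```

As far as I know, the old Scala producer only reads the `metadata.broker.list` family and ignores `bootstrap.servers` (unused properties just produce warnings), so the stray space is probably harmless here, but mixing the two families makes the config hard to reason about.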
>>>> 
>>>>     Exceptions in the log are listed below.
>>>> 
>>>> Your help is highly appreciated.
>>>> 
>>>> Sincerely,
>>>> Selina Wu
>>>> 
>>>> 
>>>> Exceptions in the log
>>> deploy/yarn/logs/userlogs/application_1437886672214_0001/container_1437886672214_0001_01_000001/samza-application-master.log
>>>> 
>>>> 2015-07-25 22:03:52 Shell [DEBUG] Failed to detect a valid hadoop home
>>>> directory
>>>> *java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set*.
>>>>   at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:265)
>>>>   at org.apache.hadoop.util.Shell.<clinit>(Shell.java:290)
>>>>   at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
>>>>   at
>>> org.apache.hadoop.yarn.conf.YarnConfiguration.<clinit>(YarnConfiguration.java:517)
>>>>   at
>>>> org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:77)
>>>>   at
>>> org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>>>> 2015-07-25 22:03:52 Shell [DEBUG] setsid is not available on this
>>>> machine. So not using it.
>>>> 2015-07-25 22:03:52 Shell [DEBUG] setsid exited with exit code 0
>>>> 2015-07-25 22:03:52 ClientHelper [INFO] trying to connect to RM
>>>> 127.0.0.1:8032
>>>> 2015-07-25 22:03:52 AbstractService [DEBUG] Service:
>>>> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl entered state
>>>> INITED
>>>> 2015-07-25 22:03:52 RMProxy [INFO] Connecting to ResourceManager at
>>>> /127.0.0.1:8032
>>>> 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
>>>> org.apache.hadoop.metrics2.lib.MutableRate
>>>> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess
>>>> with annotation
>>>> @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
>>>> always=false, type=DEFAULT, value=[Rate of successful kerberos logins
>>>> and latency (milliseconds)], valueName=Time)
>>>> 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
>>>> org.apache.hadoop.metrics2.lib.MutableRate
>>>> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure
>>>> with annotation
>>>> @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
>>>> always=false, type=DEFAULT, value=[Rate of failed kerberos logins and
>>>> latency (milliseconds)], valueName=Time)
>>>> 2015-07-25 22:03:52 MutableMetricsFactory [DEBUG] field
>>>> org.apache.hadoop.metrics2.lib.MutableRate
>>>> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.getGroups
>>>> with annotation
>>>> @org.apache.hadoop.metrics2.annotation.Metric(sampleName=Ops, about=,
>>>> always=false, type=DEFAULT, value=[GetGroups], valueName=Time)
>>>> 2015-07-25 22:03:52 MetricsSystemImpl [DEBUG] UgiMetrics, User and
>>>> group related metrics
>>>> 2015-07-25 22:03:52 KerberosName [DEBUG] Kerberos krb5 configuration
>>>> not found, setting default realm to empty
>>>> 2015-07-25 22:03:52 Groups [DEBUG]  Creating new Groups object
>>>> 2015-07-25 22:03:52 NativeCodeLoader [DEBUG] Trying to load the
>>>> custom-built native-hadoop library...
>>>> 2015-07-25 22:03:52 NativeCodeLoader [DEBUG] Failed to load
>>>> native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in
>>>> java.library.path
>>>> 2015-07-25 22:03:52 NativeCodeLoader [DEBUG]
>>> java.library.path=/home//Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
>>>> 2015-07-25 22:03:52 NativeCodeLoader [WARN] Unable to load
>>>> native-hadoop library for your platform... using builtin-java classes
>>>> where applicable....
>>>> 
>>>> 
>>>> 2015-07-25 22:03:53 KafkaCheckpointManager [WARN] While trying to
>>>> validate topic __samza_checkpoint_ver_1_for_demo-parser7_1:
>>>> *kafka.common.LeaderNotAvailableException. Retrying.*
>>>> 2015-07-25 22:03:53 KafkaCheckpointManager [DEBUG] Exception detail:
>>>> kafka.common.LeaderNotAvailableException
>>>>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>> Method)
>>>>   at
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>>   at
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>   at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>>>>   at java.lang.Class.newInstance(Class.java:442)
>>>>   at
>>> kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:84)
>>>>   at
>>>> org.apache.samza.util.KafkaUtil$.maybeThrowException(KafkaUtil.scala:63)
>>>>   at
>>> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$validateTopic$2.apply(KafkaCheckpointManager.scala:389)
>>>>   at
>>> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$validateTopic$2.apply(KafkaCheckpointManager.scala:386)
>>>>   at
>>> org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>>>>   at
>>> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.validateTopic(KafkaCheckpointManager.scala:385)
>>>>   at
>>> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.start(KafkaCheckpointManager.scala:336)
>>>>   at
>>> org.apache.samza.coordinator.JobCoordinator$.buildJobModel(JobCoordinator.scala:127)
>>>>   at
>>> org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:55)
>>>>   at
>>> org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:72)
>>>>   at
>>>> org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:93)
>>>>   at
>>> org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
>>>> 2015-07-25 22:03:53 VerifiableProperties [INFO] Verifying properties
>>>> 2015-07-25 22:03:53 VerifiableProperties [INFO] Property client.id is
>>>> overridden to samza_checkpoint_manager-demo_parser7-1-1437887032962-0
>>>> 2015-07-25 22:03:53 VerifiableProperties [INFO] Property
>>>> metadata.broker.list is overridden to localhost:9092
>>>> 2015-07-25 22:03:53 VerifiableProperties [INFO] Property
>>>> request.timeout.ms is overridden to 30000
>>>> 2015-07-25 22:03:53 ClientUtils$ [INFO] Fetching metadata from broker
>>>> id:0,host:localhost,port:9092 with correlation id 1 for 1 topic(s)
>>>> Set(__samza_checkpoint_ver_1_for_demo-parser7_1)
>>>> 2015-07-25 22:03:53 BlockingChannel [DEBUG] Created socket with
>>>> SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 408300 (requested
>>>> -1), SO_SNDBUF = 114324 (requested 102400), connectTimeoutMs = 30000.
>>>> 2015-07-25 22:03:53 SyncProducer [INFO] Connected to localhost:9092
>>>> for producing
>>>> 2015-07-25 22:03:53 SyncProducer [INFO] Disconnecting from
>>> localhost:9092
>>>> 2015-07-25 22:03:53 ClientUtils$ [DEBUG] Successfully fetched metadata
>>>> for 1 topic(s) Set(__samza_checkpoint_ver_1_for_demo-parser7_1)
>>>> 2015-07-25 22:03:53 KafkaCheckpointManager [INFO] Successfully
>>>> validated checkpoint topic
>>>> __samza_checkpoint_ver_1_for_demo-parser7_1.
>>>> 
>>>> 
>>>> 
>>>> Exception in the log
>>> deploy/yarn/logs/userlogs/application_1437886672214_0001/container_1437886672214_0001_01_000002/stderr
>>>> 
>>>> 2015-07-25 22:06:03 HttpDemoParserStreamTask [INFO] key=123:
>>>> message=timestamp=06-20-2015 id=123 ip=22.231.113.69 browser=Chrome
>>>> postalCode=95131 url=http://sample1.com language=ENG mobileBrand=Apple
>>>> *count=3860*
>>>> 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [INFO] Reconnect due to
>>>> socket error: java.io.EOFException: Received -1 when reading from
>>>> channel, socket has likely been closed.
>>>> 2015-07-25 22:06:04 Selector [WARN]
>>>> *Error in I/O with localhost/127.0.0.1
>>>> java.io.EOFException*
>>>>   at
>>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
>>>>   at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
>>>>   at
>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>>>>   at
>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>>>>   at
>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>>>>   at java.lang.Thread.run(Thread.java:745)
>>>> 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [DEBUG] Disconnecting
>>>> from selinas-mbp.attlocal.net:9092
>>>> 2015-07-25 22:06:04 Selector [WARN] Error in I/O with
>>>> Selinas-MBP.attlocal.net/192.168.1.227
>>>> java.io.EOFException
>>>>   at
>>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
>>>>   at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
>>>>   at
>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>>>>   at
>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>>>>   at
>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>>>>   at java.lang.Thread.run(Thread.java:745)
>>>> 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [DEBUG] Disconnecting
>>>> from selinas-mbp.attlocal.net:9092
>>>> 2015-07-25 22:06:04 Selector [WARN] Error in I/O with
>>>> Selinas-MBP.attlocal.net/192.168.1.227
>>>> java.io.EOFException
>>>>   at
>>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
>>>>   at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
>>>>   at
>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>>>>   at
>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>>>>   at
>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>>>>   at java.lang.Thread.run(Thread.java:745)
>>>> 2015-07-25 22:06:04 Selector [WARN] Error in I/O with localhost/
>>> 127.0.0.1
>>>> java.io.EOFException
>>>>   at
>>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
>>>>   at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
>>>>   at
>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>>>>   at
>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>>>>   at
>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>>>>   at java.lang.Thread.run(Thread.java:745)
>>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Node -1 disconnected.
>>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Node 0 disconnected.
>>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Node 0 disconnected.
>>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Node -1 disconnected.
>>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata
>>>> request to node 0
>>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata
>>>> request to node 0
>>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Init connection to node 0
>>>> for sending metadata request in the next iteration
>>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Init connection to node 0
>>>> for sending metadata request in the next iteration
>>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Initiating connection to
>>>> node 0 at selinas-mbp.attlocal.net:9092.
>>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Initiating connection to
>>>> node 0 at selinas-mbp.attlocal.net:9092.
>>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata
>>>> request to node 0
>>>> 2015-07-25 22:06:04 NetworkClient [DEBUG] Trying to send metadata
>>>> request to node 0
>>>> 2015-07-25 22:06:04 Selector [WARN] Error in I/O with
>>>> selinas-mbp.attlocal.net/192.168.1.227
>>>> java.net.ConnectException: Connection refused
>>>>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>>   at
>>>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>>>   at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
>>>>   at
>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>>>>   at
>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>>>>   at
>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>>>>   at java.lang.Thread.run(Thread.java:745)
>>>> 2015-07-25 22:06:04 DefaultFetchSimpleConsumer [DEBUG] Disconnecting
>>>> from selinas-mbp.attlocal.net:9092
>>>> 2015-07-25 22:06:04 Selector [WARN] Error in I/O
>>>> 
>>>> 
>>>>> On Fri, Jul 24, 2015 at 5:03 PM, Benjamin Black <b...@b3k.us> wrote:
>>>>> 
What are the log messages from the Kafka brokers? These look like
>>>>> client messages indicating a broker problem.
>>>>> 
>>>>> On Fri, Jul 24, 2015 at 1:18 PM, Job-Selina Wu <swucaree...@gmail.com
>>>> 
>>>>> wrote:
>>>>> 
>>>>>> Hi, Yi:
>>>>>> 
>>>>>>      I am wondering if the problem can be fixed by setting the
>>>>>> "max.message.size" parameter in kafka.producer.ProducerConfig to
>>>>>> control the message size for the topic?
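On the size question: as far as I know, the limit that bites first is broker-side, not in the producer config. A hedged sketch of the related Kafka 0.8.x settings (values illustrative):

```
# server.properties (broker): largest message the broker will accept
message.max.bytes=1000000
# consumer side must be able to fetch at least that much per partition
fetch.message.max.bytes=1048576
```

That said, an oversized message is normally rejected individually; it should not take the whole broker down, so the connection errors later in this thread point at a broker failure rather than a size limit.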
>>>>>> 
>>>>>>      My HTTP server sends messages to Kafka. The last message shown
>>>>>> on the console is
>>>>>> "message=timestamp=06-20-2015 id=678 ip=22.231.113.68 browser=Safari
>>>>>> postalCode=95066 url=http://sample2.com language=ENG
>>> mobileBrand=Apple
>>>>>> count=4269"
>>>>>> 
>>>>>> However, Kafka got an exception starting at the 4244th message.
>>>>>> The error is below, and Kafka does not accept any new messages after
>>>>>> this.
>>>>>> 
>>>>>> "[2015-07-24 12:46:11,078] WARN
>>> [console-consumer-61156_Selinas-MacBook-Pro.local-1437766693294-a68fc532-leader-finder-thread],
>>>>>> Failed to find leader for Set([http-demo,0])
>>>>>> (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
>>>>>> kafka.common.KafkaException: fetching topic metadata for topics
>>>>>> [Set(http-demo)] from broker
>>>>> [ArrayBuffer(id:0,host:10.1.10.173,port:9092)]
>>>>>> failed
>>>>>> at
>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>>>>>> at
>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>>>>>> at
>>> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>>>>>> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>>>>>> Caused by: java.nio.channels.ClosedChannelException
>>>>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>> at
>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>> at
>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>> ... 3 more
>>>>>> [2015-07-24 12:46:11,287] WARN Fetching topic metadata with
>>> correlation
>>>>> id
>>>>>> 21 for topics [Set(http-demo)] from broker
>>>>>> [id:0,host:10.1.10.173,port:9092] failed (kafka.client.ClientUtils$)
>>>>>> java.nio.channels.ClosedChannelException
>>>>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>> at
>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>> at
>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>> at
>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>>>>>> at
>>> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>>>>>> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)"
>>>>>> 
>>>>>> 
>>>>>> After the error:
>>>>>> I can list the topic, and it looks right, but I cannot show its
>>>>>> contents from the command line:
>>>>>> 
>>>>>> Selinas-MacBook-Pro:samza-Demo selina$
>>> deploy/kafka/bin/kafka-topics.sh
>>>>>> --list --zookeeper localhost:2181
>>>>>> http-demo
>>>>>> Selinas-MacBook-Pro:samza-Demo selina$
>>>>> deploy/kafka/bin/kafka-console-consumer.sh
>>>>>> --zookeeper localhost:2181 --from-beginning --topic http-demo
>>>>>> [2015-07-24 12:47:38,730] WARN
>>> [console-consumer-10297_Selinas-MacBook-Pro.local-1437767258570-1a809d87],
>>>>>> no brokers found when trying to rebalance.
>>>>>> (kafka.consumer.ZookeeperConsumerConnector)
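"no brokers found when trying to rebalance" usually means no broker is registered in ZooKeeper any more, i.e. the broker process has died or unregistered. One way to confirm, assuming the Hello Samza layout (the zkCli path and ports are assumptions; adjust to your deployment):

```shell
# List broker ids registered in ZooKeeper; an empty list or no reply
# means the broker is down or never registered.
deploy/zookeeper/bin/zkCli.sh -server localhost:2181 ls /brokers/ids \
  2>/dev/null || echo "could not reach ZooKeeper at localhost:2181"
```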
>>>>>> 
>>>>>> Attached are my Kafka properties for the server and producer.
>>>>>> 
>>>>>> Your help is highly appreciated
>>>>>> 
>>>>>> 
>>>>>> Sincerely,
>>>>>> Selina
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Thu, Jul 23, 2015 at 11:16 PM, Yi Pan <nickpa...@gmail.com>
>>> wrote:
>>>>>> 
>>>>>>> Hi, Selina,
>>>>>>> 
>>>>>>> Your question is not clear.
>>>>>>> {quote}
>>>>>>> When messages were sent to Kafka by KafkaProducer, it always failed
>>>>>>> once there were more than 3000 - 4000 messages.
>>>>>>> {quote}
>>>>>>> 
>>>>>>> What's failing? The error stack shows errors on the consumer side,
>>>>>>> and you were referring to failures to produce to Kafka. Could you
>>>>>>> be more specific about your failure scenario?
>>>>>>> 
>>>>>>> -Yi
>>>>>>> 
>>>>>>> On Thu, Jul 23, 2015 at 5:46 PM, Job-Selina Wu <
>>> swucaree...@gmail.com
>>>>> 
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>>    When messages were sent to Kafka by KafkaProducer, it always
>>>>>>>> failed once there were more than 3000 - 4000 messages. The error
>>>>>>>> is shown below. I am wondering if there is a topic size I need to
>>>>>>>> set in the Samza configuration?
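Regarding where such a setting would live in Samza: producer properties can be passed through to Kafka via the `systems.<system-name>.producer.*` prefix in the job config. A sketch based on the hello-samza example configs (the system name `kafka` and values are assumptions):

```
# job .properties sketch -- Samza forwards producer.* keys to the producer
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.producer.metadata.broker.list=localhost:9092
systems.kafka.producer.request.required.acks=1
```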
>>>>>>>> 
>>>>>>>> 
>>>>>>>> [2015-07-23 17:30:03,792] WARN
>>> [console-consumer-84579_Selinas-MacBook-Pro.local-1437697324624-eecb4f40-leader-finder-thread],
>>>>>>>> Failed to find leader for Set([http-demo,0])
>>>>>>>> (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
>>>>>>>> kafka.common.KafkaException: fetching topic metadata for topics
>>>>>>>> [Set(http-demo)] from broker [ArrayBuffer()] failed
>>>>>>>>        at
>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>>>>>>>>        at
>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>>>>>>>>        at
>>> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>>>>>>>>        at
>>>>>>> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>>>>>>>> ^CConsumed 4327 messages
>>>>>>>> 
>>>>>>>> Your reply and comment will be highly appreciated.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Sincerely,
>>>>>>>> Selina
>> 
>> 
