Thanks for the help.  For others who happen upon this thread, the problem
was indeed on the consumer side: Spark (0.9.1) needs a bit of help setting
the Kafka consumer properties for big messages.

    // Set up Kafka with explicit consumer properties to allow big messages; see
    // spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum,
      "group.id" -> group,
      "zookeeper.connection.timeout.ms" -> "10000",
      "fetch.message.max.bytes" -> "10485760",  // 10MB
      "fetch.size" -> "10485760")               // not needed?
    val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicpMap, StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
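
(For the archives: topicpMap above is just the usual topic-to-thread-count
map that createStream expects, i.e. something along the lines of

    val topicpMap = Map("mytopic" -> 1)  // topic name -> number of consumer threads

with "mytopic" standing in for the real topic name.)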

Sorry about all the messages on this topic for those of you who aren't
getting digests.
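
One last note for the archives, since the WARN lines quoted below had me
confused: fetch.message.max.bytes (and the old 0.7-style fetch.size) are
*consumer* properties, so the broker is right to flag them as not valid when
they show up in server.properties.  And as far as I can tell,
consumer.properties is only read by tools you explicitly point at it -- an
external consumer like Spark streaming never sees it, which is why the
settings have to be passed programmatically as above.  My understanding of
where each setting belongs:

    # server.properties (broker side): what the broker will accept and replicate
    message.max.bytes=10485760
    replica.fetch.max.bytes=10485760

    # consumer side (kafkaParams in the Spark code above); must be >= message.max.bytes
    fetch.message.max.bytes=10485760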


On Fri, Jun 27, 2014 at 10:43 AM, Louis Clark <sfgypsy...@gmail.com> wrote:

> I believe so.  I have set
>
> fetch.message.max.bytes=10485760
>
> in both the consumer.properties and the server.properties config files,
> then restarted kafka -> same problem.  I'm following up on some of
> Guozhang's other suggestions now.
>
> One thing I'm confused about (I should read the docs again) is what aspect
> of Kafka reads consumer.properties.  If I'm using a different program
> (Spark streaming) as consumer, do any Kafka programs/services even read
> consumer.properties?
>
> thanks
>
>
> On Fri, Jun 27, 2014 at 10:31 AM, Neha Narkhede <neha.narkh...@gmail.com>
> wrote:
>
>> > but I found one message (5.1MB in size) which is clogging my pipeline up
>>
>> Have you ensured that "fetch.message.max.bytes" in the consumer
>> config is set to > 5.1 MB?
>>
>>
>> On Thu, Jun 26, 2014 at 6:14 PM, Louis Clark <sfgypsy...@gmail.com>
>> wrote:
>>
>>> in the consumer.properties file, I've got (default?):
>>>
>>> zookeeper.connect=127.0.0.1:2181
>>>
>>> zookeeper.connection.timeout.ms=1000000
>>>
>>> group.id=test-consumer-group
>>>
>>> thanks,
>>>
>>> -Louis
>>>
>>>
>>> On Thu, Jun 26, 2014 at 6:04 PM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>> > Hi Louis,
>>> >
>>> > What are your consumer's config properties?
>>> >
>>> > Guozhang
>>> >
>>> >
>>> > On Thu, Jun 26, 2014 at 5:54 PM, Louis Clark <sfgypsy...@gmail.com>
>>> wrote:
>>> >
>>> >> Hi, I'm trying to stream large messages with Kafka into Spark.
>>> >> Generally this has been working nicely, but I found one message
>>> >> (5.1MB in size) which is clogging my pipeline up.  I have these
>>> >> settings in server.properties:
>>> >> fetch.message.max.bytes=10485760
>>> >> replica.fetch.max.bytes=10485760
>>> >> message.max.bytes=10485760
>>> >> fetch.size=10485760
>>> >>
>>> >> I'm not getting any obvious errors in the logs and I can retrieve the
>>> >> large message with this command:
>>> >>
>>> >> kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic mytopic --fetch-size=10485760
>>> >>
>>> >> After digging into this problem, I noticed that the kafkaServer.out
>>> >> log is complaining that the fetch.message.max.bytes parameter is not
>>> >> valid:
>>> >>
>>> >> [2014-06-25 11:33:36,547] WARN Property fetch.message.max.bytes is not valid (kafka.utils.VerifiableProperties)
>>> >> [2014-06-25 11:33:36,547] WARN Property fetch.size is not valid (kafka.utils.VerifiableProperties)
>>> >>
>>> >> That seems like the most critical parameter for my needs, yet Kafka
>>> >> apparently doesn't recognize it as a parameter, despite it being listed
>>> >> on the configuration page (https://kafka.apache.org/08/configuration.html).
>>> >> I'm using 0.8.1.1.  Any ideas?
>>> >>
>>> >> many thanks for reading this!
>>> >>
>>> >
>>> >
>>> >
>>> > --
>>> > -- Guozhang
>>> >
>>>
>>
>>
>
