Thanks for the help. For others who happen upon this thread, the problem was indeed on the consumer side: Spark (0.9.1) needs a bit of help setting the Kafka consumer properties before it will fetch big messages.
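In case it saves someone a lookup: ssc, zkQuorum, group, and topicpMap are defined earlier in my job. A minimal sketch of that setup follows; the values are just illustrations matching this thread, so substitute your own quorum, group, and topic:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative values only -- match these to your own cluster and topic.
val ssc = new StreamingContext("local[2]", "BigMessageStream", Seconds(2))
val zkQuorum = "localhost:2181"       // ZooKeeper connect string
val group = "test-consumer-group"     // Kafka consumer group id
val topicpMap = Map("mytopic" -> 1)   // topic -> number of consumer threads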
// Set up Kafka with manual parameters to allow big messages.
// See spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> zkQuorum,
  "group.id" -> group,
  "zookeeper.connection.timeout.ms" -> "10000",
  "fetch.message.max.bytes" -> "10485760",  // 10MB; must exceed the largest message
  "fetch.size" -> "10485760")               // not needed?

val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicpMap, StorageLevel.MEMORY_AND_DISK_SER_2)
  .map(_._2)  // keep only the message value, drop the key

Sorry about all the messages on this topic for those of you who aren't getting digests.

On Fri, Jun 27, 2014 at 10:43 AM, Louis Clark <sfgypsy...@gmail.com> wrote:

> I believe so. I have set
>
> fetch.message.max.bytes=10485760
>
> in both the consumer.properties and the server.properties config files,
> then restarted kafka -> same problem. I'm following up on some of
> Guozhang's other suggestions now.
>
> One thing I'm confused about (I should read the docs again) is what aspect
> of Kafka reads consumer.properties. If I'm using a different program
> (Spark streaming) as consumer, do any Kafka programs/services even read
> consumer.properties?
>
> thanks
>
>
> On Fri, Jun 27, 2014 at 10:31 AM, Neha Narkhede <neha.narkh...@gmail.com>
> wrote:
>
>> but I found one message (5.1MB in size) which
>> is clogging my pipeline up
>>
>> Have you ensured that the "fetch.message.max.bytes" on the consumer
>> config is set to > 5.1 MB?
>>
>>
>> On Thu, Jun 26, 2014 at 6:14 PM, Louis Clark <sfgypsy...@gmail.com>
>> wrote:
>>
>>> in the consumer.properties file, I've got (default?):
>>>
>>> zookeeper.connect=127.0.0.1:2181
>>>
>>> zookeeper.connection.timeout.ms=1000000
>>>
>>> group.id=test-consumer-group
>>>
>>> thanks,
>>>
>>> -Louis
>>>
>>>
>>> On Thu, Jun 26, 2014 at 6:04 PM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>> > Hi Louis,
>>> >
>>> > What are your consumer's config properties?
>>> >
>>> > Guozhang
>>> >
>>> >
>>> > On Thu, Jun 26, 2014 at 5:54 PM, Louis Clark <sfgypsy...@gmail.com>
>>> > wrote:
>>> >
>>> >> Hi, I'm trying to stream large messages with Kafka into Spark.
>>> >> Generally this has been working nicely, but I found one message
>>> >> (5.1MB in size) which is clogging my pipeline up. I have these
>>> >> settings in server.properties:
>>> >>
>>> >> fetch.message.max.bytes=10485760
>>> >> replica.fetch.max.bytes=10485760
>>> >> message.max.bytes=10485760
>>> >> fetch.size=10485760
>>> >>
>>> >> I'm not getting any obvious errors in the logs and I can retrieve
>>> >> the large message with this command:
>>> >>
>>> >> kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning
>>> >> --topic mytopic --fetch-size=10485760
>>> >>
>>> >> I noticed recently after digging into this problem that the
>>> >> kafkaServer.out log is complaining that the fetch.message.max.bytes
>>> >> parameter is not valid:
>>> >>
>>> >> [2014-06-25 11:33:36,547] WARN Property fetch.message.max.bytes is not
>>> >> valid (kafka.utils.VerifiableProperties)
>>> >> [2014-06-25 11:33:36,547] WARN Property fetch.size is not valid
>>> >> (kafka.utils.VerifiableProperties)
>>> >>
>>> >> That seems like the most critical parameter for my needs, yet it is
>>> >> apparently not recognized as a parameter despite being listed on the
>>> >> configuration website (https://kafka.apache.org/08/configuration.html).
>>> >> I'm using 0.8.1.1. Any ideas?
>>> >>
>>> >> many thanks for reading this!
>>> >
>>> > --
>>> > -- Guozhang
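PS for anyone finding this via the archives: the WARN lines in the quoted thread are the broker flagging fetch.message.max.bytes and fetch.size as properties it does not recognize. They are consumer-side settings, so putting them in server.properties is harmless but has no effect. As far as I can tell, the broker itself only needs these two (a sketch reusing the 10MB value from this thread):

# server.properties (broker side)
# Largest message the broker will accept:
message.max.bytes=10485760
# Replica fetch size; keep it >= message.max.bytes so replication
# does not stall on large messages:
replica.fetch.max.bytes=10485760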