Yeah, to make sure that I could get at the topic on the server I re-ran the producer and kafka-consumer to ensure I hadn't done something stupid - can produce and consume data at will. So I'm nuking the 'hello-samza' folder and restarting from scratch, just to ensure I haven't messed something up in the process.
I'll get back to the group when I have more info. -Ash On Mon, Mar 23, 2015 at 9:33 AM, Chinmay Soman <chinmay.cere...@gmail.com> wrote: > Hey Ash, > > Yeah - I've confirmed that System.out.println should result in a message > getting logged in the 'stdout' (at least while running in the local YARN > mode). > > Not sure why you're not seeing the same behaviour. I'm assuming you've > already confirmed that you're seeing data in your input Kafka topic ? You > might want to double check this by running the console consumer from > whatever node is processing your job. > > Other than that - we'll have to go back to debug mode to get more info. > > On Sun, Mar 22, 2015 at 7:21 PM, Ash W Matheson <ash.mathe...@gmail.com> > wrote: > > > looks like somewhere between Friday and today some security settings may > > have changed - telneting into the kafka/zookeeper boxes was broker from > the > > test samza host. So it *seems* to be running without any exceptions now. > > > > However, I'm still not seeing any output in any of the logs in > > deploy/yarn/logs/userlogs (and subdirectories). I was assuming > > System.out.println(...) would stuff results into either application. any > > thoughts on that? > > > > On Sun, Mar 22, 2015 at 1:53 PM, Chinmay Soman < > chinmay.cere...@gmail.com> > > wrote: > > > > > It says UnresolvedAddressException. > > > > > > Can you check that you can telnet to 2181 and 9092 on that remote host > ? > > > Also, you might try giving an IP address instead. > > > > > > On Sun, Mar 22, 2015 at 1:24 PM, Ash W Matheson < > ash.mathe...@gmail.com> > > > wrote: > > > > > > > OK, so the logs are much more detailed and useful, but it's not > making > > > any > > > > more sense: > > > > > > > > 2015-03-22 20:13:11 ClientUtils$ [INFO] Fetching metadata from broker > > > > id:0,host:redacted:9092 with correlation id 0 for 1 topic(s) > > Set(myTopic) > > > > 2015-03-22 20:13:11 SyncProducer [ERROR] Producer connection to > > > > redacted:9092 unsuccessful > > > > java.nio.channels.UnresolvedAddressException > > > > at sun.nio.ch.Net.checkAddress(Net.java:127) > > > > at > > > sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644) > > > > at > > > kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at > kafka.producer.SyncProducer.connect(SyncProducer.scala:141) > > > > at > > > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at > > > > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88) > > > > at > > > > > > > > > > > > > > org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:40) > > > > at > > > > > > > > > > > > > > org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:197) > > > > at > > > > > > > > > > > > > > org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2$$anonfun$6.apply(KafkaSystemAdmin.scala:141) > > > > at > > > > > > > > > > > > > > org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2$$anonfun$6.apply(KafkaSystemAdmin.scala:141) > > > > at > > > > > > > > > > > > > > org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:52) > > > > at > > > > > > > > > > > > > > org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2.apply(KafkaSystemAdmin.scala:138) > > > > at > > > > > > > > > > > > > > org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamMetadata$2.apply(KafkaSystemAdmin.scala:137) > > > > at > > > > > > > > > > > > > > org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82) > > > > at > > > > > > > > > > > > > > org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:136) > > > > at > > > > > > > > > > > > > > org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamMetadata(KafkaSystemAdmin.scala:125) > > > > at > > > > > > > > > > > > > > org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:62) > > > > at > > > > > > > > > > > > > > org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:58) > > > > at > > > > > > > > > > > > > > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > > > > at > > > > > > > > > > > > > > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > > > > at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) > > > > at > > > > > > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > > > > at > > > > scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > > > > at > > > > > > > > > > > > > > org.apache.samza.system.StreamMetadataCache.getStreamMetadata(StreamMetadataCache.scala:58) > > > > at > > > > org.apache.samza.util.Util$.getInputStreamPartitions(Util.scala:108) > > > > at > > > > > > org.apache.samza.util.Util$.assignContainerToSSPTaskNames(Util.scala:127) > > > > at > > > > > > > > > > > > > > org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:77) > > > > at > > > > > org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:90) > > > > at > > > > org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala) > > > > 2015-03-22 20:13:11 ClientUtils$ [WARN] Fetching topic metadata with > > > > correlation id 0 for topics [Set(redacted)] from broker > > > > [id:0,host:redacted,port:9092] failed > > > > > > > > So, it's not connecting ... which has me confused because I'm pushing > > > data > > > > (from that python script) using the machine I'm running this test on. > > > I'll > > > > do some more digging and report back the results. > > > > > > > > On Sun, Mar 22, 2015 at 11:55 AM, Ash W Matheson < > > ash.mathe...@gmail.com > > > > > > > > wrote: > > > > > > > > > Ignore that last email - was reading the page stupidly. > > > > > > > > > > On Sun, Mar 22, 2015 at 11:52 AM, Ash W Matheson < > > > ash.mathe...@gmail.com > > > > > > > > > > wrote: > > > > > > > > > >> Is there any easy way to do that without recompiling samza? I'm > > > trying > > > > >> to localize that into the 'hello-samza' and looking at > > > > >> > > http://samza.apache.org/learn/documentation/latest/jobs/logging.html > > > > >> leads me to believe that I have to do this in the base samza > project > > > > (not > > > > >> hello-samza). > > > > >> > > > > >> On Sun, Mar 22, 2015 at 11:37 AM, Ash W Matheson < > > > > ash.mathe...@gmail.com> > > > > >> wrote: > > > > >> > > > > >>> Sure - I'll do that in a bit and send it up to pastebin. > > > > >>> > > > > >>> On Sun, Mar 22, 2015 at 11:35 AM, Chinmay Soman < > > > > >>> chinmay.cere...@gmail.com> wrote: > > > > >>> > > > > >>>> Can you please enable debug level logging and paste the log? > > > > >>>> > > > > >>>> On Sun, Mar 22, 2015 at 11:28 AM, Ash W Matheson < > > > > >>>> ash.mathe...@gmail.com> > > > > >>>> wrote: > > > > >>>> > > > > >>>> > No, it's behind some corporate stuff - I just redacted it so I > > > could > > > > >>>> share > > > > >>>> > it up. > > > > >>>> > > > > > >>>> > On Sun, Mar 22, 2015 at 11:17 AM, Chinmay Soman < > > > > >>>> chinmay.cere...@gmail.com > > > > >>>> > > > > > > >>>> > wrote: > > > > >>>> > > > > > >>>> > > Just for sanity check, is the broker host 'redacted:9092' > or ' > > > > >>>> > > redactedec:9092'. > > > > >>>> > > > > > > >>>> > > Just wanted to rule out any typos. Are the 2 above hosts the > > > same > > > > ? > > > > >>>> > > > > > > >>>> > > On Sun, Mar 22, 2015 at 11:08 AM, Ash W Matheson < > > > > >>>> ash.mathe...@gmail.com > > > > >>>> > > > > > > >>>> > > wrote: > > > > >>>> > > > > > > >>>> > > > Also, here's the producer: http://pastebin.com/qMNJabTZ > > > > >>>> > > > > > > > >>>> > > > > > > > >>>> > > > On Sun, Mar 22, 2015 at 10:57 AM, Ash W Matheson < > > > > >>>> > ash.mathe...@gmail.com > > > > >>>> > > > > > > > >>>> > > > wrote: > > > > >>>> > > > > > > > >>>> > > > > Yep, first thing I checked (got bitten by that earlier > in > > > the > > > > >>>> week > > > > >>>> > with > > > > >>>> > > > no > > > > >>>> > > > > data actually in the topic). > > > > >>>> > > > > > > > > >>>> > > > > On Sun, Mar 22, 2015 at 10:56 AM, Chinmay Soman < > > > > >>>> > > > chinmay.cere...@gmail.com > > > > >>>> > > > > > wrote: > > > > >>>> > > > > > > > > >>>> > > > >> Can you double check that you can read data from your > > Kafka > > > > >>>> broker ? > > > > >>>> > > > >> > > > > >>>> > > > >> > ./deploy/kafka/bin/kafka-topics.sh --describe > > --zookeeper > > > > >>>> > > > localhost:2181 > > > > >>>> > > > >> --topic myTopic > > > > >>>> > > > >> > ./deploy/kafka/bin/kafka-console-consumer.sh > > --zookeeper > > > > >>>> > > > localhost:2181 > > > > >>>> > > > >> --topic myTopic --from-beginning > > > > >>>> > > > >> > > > > >>>> > > > >> I've seen cases where if the Kafka broker isn't > shutdown > > > > >>>> properly, > > > > >>>> > > > >> something like this happens. > > > > >>>> > > > >> > > > > >>>> > > > >> On Sun, Mar 22, 2015 at 10:35 AM, Ash W Matheson < > > > > >>>> > > > ash.mathe...@gmail.com> > > > > >>>> > > > >> wrote: > > > > >>>> > > > >> > > > > >>>> > > > >> > Hey all, > > > > >>>> > > > >> > > > > > >>>> > > > >> > Evaluating Samza currently and am running into some > odd > > > > >>>> issues. > > > > >>>> > > > >> > > > > > >>>> > > > >> > I'm currently working off the 'hello-samza' repo and > > > trying > > > > >>>> to > > > > >>>> > > parse a > > > > >>>> > > > >> > simple kafka topic that I've produced through an > > extenal > > > > >>>> java app > > > > >>>> > > > >> (nothing > > > > >>>> > > > >> > other than a series of sentences) and it's failing > > pretty > > > > >>>> hard for > > > > >>>> > > me. > > > > >>>> > > > >> The > > > > >>>> > > > >> > base 'hello-samza' set of apps works fine, but as > soon > > > as I > > > > >>>> change > > > > >>>> > > the > > > > >>>> > > > >> > configuration to look at a different Kafka/zookeeper > I > > > get > > > > >>>> the > > > > >>>> > > > >> following in > > > > >>>> > > > >> > the userlogs: > > > > >>>> > > > >> > > > > > >>>> > > > >> > 2015-03-22 17:07:09 KafkaSystemAdmin [WARN] Unable to > > > fetch > > > > >>>> last > > > > >>>> > > > offsets > > > > >>>> > > > >> > for streams [myTopic] due to > > kafka.common.KafkaException: > > > > >>>> fetching > > > > >>>> > > > topic > > > > >>>> > > > >> > metadata for topics [Set(myTopic)] from broker > > > > >>>> > > > >> > [ArrayBuffer(id:0,host:redacted,port:9092)] failed. > > > > Retrying. > > > > >>>> > > > >> > > > > > >>>> > > > >> > > > > > >>>> > > > >> > The modifications are pretty straightforward. In the > > > > >>>> > > > >> > Wikipedia-parser.properties, I've changed the > > following: > > > > >>>> > > > >> > task.inputs=kafka.myTopic > > > > >>>> > > > >> > > systems.kafka.consumer.zookeeper.connect=redacted:2181/ > > > > >>>> > > > >> > systems.kafka.consumer.auto.offset.reset=smallest > > > > >>>> > > > >> > > > systems.kafka.producer.metadata.broker.list=redacted:9092 > > > > >>>> > > > >> > > > > > >>>> > > > >> > and in the actual java file > > > WikipediaParserStreamTask.java > > > > >>>> > > > >> > public void process(IncomingMessageEnvelope > envelope, > > > > >>>> > > > MessageCollector > > > > >>>> > > > >> > collector, TaskCoordinator coordinator) { > > > > >>>> > > > >> > Map<String, Object> jsonObject = (Map<String, > > > Object>) > > > > >>>> > > > >> > envelope.getMessage(); > > > > >>>> > > > >> > WikipediaFeedEvent event = new > > > > >>>> WikipediaFeedEvent(jsonObject); > > > > >>>> > > > >> > > > > > >>>> > > > >> > try { > > > > >>>> > > > >> > System.out.println(event.getRawEvent()); > > > > >>>> > > > >> > > > > > >>>> > > > >> > And then following the compile/extract/run process > > > outlined > > > > >>>> in the > > > > >>>> > > > >> > hello-samza website. > > > > >>>> > > > >> > > > > > >>>> > > > >> > Any thoughts? I've looked online for any 'super > > simple' > > > > >>>> examples > > > > >>>> > of > > > > >>>> > > > >> > ingesting kafka in samza with very little success. > > > > >>>> > > > >> > > > > > >>>> > > > >> > > > > >>>> > > > >> > > > > >>>> > > > >> > > > > >>>> > > > >> -- > > > > >>>> > > > >> Thanks and regards > > > > >>>> > > > >> > > > > >>>> > > > >> Chinmay Soman > > > > >>>> > > > >> > > > > >>>> > > > > > > > > >>>> > > > > > > > > >>>> > > > > > > > >>>> > > > > > > >>>> > > > > > > >>>> > > > > > > >>>> > > -- > > > > >>>> > > Thanks and regards > > > > >>>> > > > > > > >>>> > > Chinmay Soman > > > > >>>> > > > > > > >>>> > > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> -- > > > > >>>> Thanks and regards > > > > >>>> > > > > >>>> Chinmay Soman > > > > >>>> > > > > >>> > > > > >>> > > > > >> > > > > > > > > > > > > > > > > > > > > > -- > > > Thanks and regards > > > > > > Chinmay Soman > > > > > > > > > -- > Thanks and regards > > Chinmay Soman >