Hi, Yi: Thanks for your suggestion. I found the bug. the line of logger.info got Exception.
Thanks a lot! Sincerely, Selina On Thu, Oct 15, 2015 at 10:03 AM, Yi Pan <nickpa...@gmail.com> wrote: > Hi, Selina, > > Your stack trace showed that the exception was thrown at line 50 in your > task code. Could you point out which line is it? > > It would be helpful if you can add some log info regarding to the message > you receive in the process() vs the message you read from Kafka console > consumer. > > Thanks! > > -Yi > > On Wed, Oct 14, 2015 at 10:07 PM, Selina Tech <swucaree...@gmail.com> > wrote: > > > Dear All: > > I tried to consumer kafka topic "cnr-proto" in Java. It got the > > SamzaContainer NullPointerException as below. > > The messages can be shown by command line correctly > > "deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 > > --from-beginning --topic cnr-proto" > > > > My Key and message of topic "cir-proto" at Kafka are both in byte[] > > > > run > > deploy/samza/bin/run-job.sh > > > --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory > > --config-path=file://$PWD/deploy/samza/config/mct-aggregation.properties > > > > the properties file and java code are list below also. > > > > Your help is highly appreciated. > > > > Sincerely, > > Selina > > > > -------------error in samza-container-0.log------------- > > > > 2015-10-14 21:48:22 SamzaContainer [ERROR] Caught exception in process > > loop. > > java.lang.NullPointerException > > at > samza.http.demo.task.MctAggregateTask.process(MctAggregateTask.java:50) > > at > > > > > org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133) > > at > > > > > org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54) > > at > org.apache.samza.container.TaskInstance.process(TaskInstance.scala:132) > > at > > > > > org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:112) > > at > org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37) > > at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36) > > at org.apache.samza.container.RunLoop.process(RunLoop.scala:98) > > at org.apache.samza.container.RunLoop.run(RunLoop.scala:69) > > at > org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:555) > > at > > > > > org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93) > > at > org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67) > > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala) > > 2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down. > > 2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down consumer > > multiplexer. > > 2015-10-14 21:48:22 SystemConsumers [DEBUG] Stopping consumers. > > 2015-10-14 21:48:22 BrokerProxy [INFO] Shutting down BrokerProxy for > > 10.1.10.141:9092 > > 2015-10-14 21:48:22 DefaultFetchSimpleConsumer [INFO] Reconnect due to > > socket error: java.nio.channels.ClosedByInterruptException > > 2015-10-14 21:48:22 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from > > 10.1.10.141:9092 > > > > ---------mct-aggregation.properties---------- > > # Job > > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory > > job.name=mct-aggregation > > > > > > > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory > > task.checkpoint.system=kafka > > # Normally, this would be 3, but we have only one broker. > > task.checkpoint.replication.factor=1 > > > > # YARN > > > > > yarn.package.path=file:///Users/selina/IdeaProjects/cnr-mct-aggregation-samza/target/hello-samza-0.9.1-dist.tar.gz > > > > # Task > > # path ./src/main/java/samza/http/demo/task/MctAggregateTask.java > > task.class=samza.http.demo.task.MctAggregateTask > > task.inputs=kafka.cnr-proto > > > > # Serializers > > > > > serializers.registry.byte.class=org.apache.samza.serializers.ByteSerdeFactory > > > > > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory > > > > > serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory > > > > > serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory > > > > > > # Kafka System > > > > > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory > > systems.kafka.samza.key.serde=byte > > systems.kafka.samza.msg.serde=byte > > > > # Use the "byte" serializer for messages in the "cnr-proto" topic > > systems.kafka.streams.cnr-proto.samza.key.serde=byte > > systems.kafka.streams.cnr-proto.samza.msg.serde=byte > > > > systems.kafka.consumer.zookeeper.connect=localhost:2181/ > > systems.kafka.producer.bootstrap.servers=localhost:9092 > > > > #stream from begining > > #systems.kafka.consumer.auto.offset.reset=smallest > > #http-demo from the oldest > > systems.kafka.cnr-proto.samza.offset.default=oldest > > # all stream from the oldest > > systems.kafka.streams.cnr-proto.samza.offset.default=oldest > > systems.kafka.streams.cnr-proto.samza.reset.offset=true > > > > -------------------MctAggregateTask.java---------- > > > > public class MctAggregateTask implements StreamTask { > > > > private static final SystemStream OUTPUT_STREAM = new > > SystemStream("kafka", "cnr-proto-tmp"); > > > > @SuppressWarnings("unchecked") > > @Override > > public void process(IncomingMessageEnvelope envelope, MessageCollector > > collector, TaskCoordinator coordinator) throws Exception { > > > > byte[] key = (byte[])envelope.getKey(); > > byte[] message = (byte[]) envelope.getMessage(); > > > > logger.info("key="+key.toString()+": message="+message.toString()); > > collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message)); > > } > > >