Dear All: I tried to consumer kafka topic "cnr-proto" in Java. It got the SamzaContainer NullPointerException as below. The messages can be shown by command line correctly "deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic cnr-proto"
My Key and message of topic "cir-proto" at Kafka are both in byte[] run deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/mct-aggregation.properties the properties file and java code are list below also. Your help is highly appreciated. Sincerely, Selina -------------error in samza-container-0.log------------- 2015-10-14 21:48:22 SamzaContainer [ERROR] Caught exception in process loop. java.lang.NullPointerException at samza.http.demo.task.MctAggregateTask.process(MctAggregateTask.java:50) at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133) at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54) at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:132) at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:112) at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37) at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36) at org.apache.samza.container.RunLoop.process(RunLoop.scala:98) at org.apache.samza.container.RunLoop.run(RunLoop.scala:69) at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:555) at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93) at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67) at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala) 2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down. 2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down consumer multiplexer. 2015-10-14 21:48:22 SystemConsumers [DEBUG] Stopping consumers. 2015-10-14 21:48:22 BrokerProxy [INFO] Shutting down BrokerProxy for 10.1.10.141:9092 2015-10-14 21:48:22 DefaultFetchSimpleConsumer [INFO] Reconnect due to socket error: java.nio.channels.ClosedByInterruptException 2015-10-14 21:48:22 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from 10.1.10.141:9092 ---------mct-aggregation.properties---------- # Job job.factory.class=org.apache.samza.job.yarn.YarnJobFactory job.name=mct-aggregation task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory task.checkpoint.system=kafka # Normally, this would be 3, but we have only one broker. task.checkpoint.replication.factor=1 # YARN yarn.package.path=file:///Users/selina/IdeaProjects/cnr-mct-aggregation-samza/target/hello-samza-0.9.1-dist.tar.gz # Task # path ./src/main/java/samza/http/demo/task/MctAggregateTask.java task.class=samza.http.demo.task.MctAggregateTask task.inputs=kafka.cnr-proto # Serializers serializers.registry.byte.class=org.apache.samza.serializers.ByteSerdeFactory serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory # Kafka System systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory systems.kafka.samza.key.serde=byte systems.kafka.samza.msg.serde=byte # Use the "byte" serializer for messages in the "cnr-proto" topic systems.kafka.streams.cnr-proto.samza.key.serde=byte systems.kafka.streams.cnr-proto.samza.msg.serde=byte systems.kafka.consumer.zookeeper.connect=localhost:2181/ systems.kafka.producer.bootstrap.servers=localhost:9092 #stream from begining #systems.kafka.consumer.auto.offset.reset=smallest #http-demo from the oldest systems.kafka.cnr-proto.samza.offset.default=oldest # all stream from the oldest systems.kafka.streams.cnr-proto.samza.offset.default=oldest systems.kafka.streams.cnr-proto.samza.reset.offset=true -------------------MctAggregateTask.java---------- public class MctAggregateTask implements StreamTask { private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "cnr-proto-tmp"); @SuppressWarnings("unchecked") @Override public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { byte[] key = (byte[])envelope.getKey(); byte[] message = (byte[]) envelope.getMessage(); logger.info("key="+key.toString()+": message="+message.toString()); collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message)); }