Dear All:
    I tried to consumer kafka topic "cnr-proto" in Java. It got the
 SamzaContainer NullPointerException as below.
    The messages can be shown by command line correctly
"deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
--from-beginning --topic cnr-proto"

    My Key and message of topic "cir-proto" at Kafka are both in byte[]

   run
   deploy/samza/bin/run-job.sh
--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
--config-path=file://$PWD/deploy/samza/config/mct-aggregation.properties

    the properties file and java code are list below also.

   Your help is highly appreciated.

Sincerely,
Selina

-------------error in samza-container-0.log-------------

2015-10-14 21:48:22 SamzaContainer [ERROR] Caught exception in process loop.
java.lang.NullPointerException
at samza.http.demo.task.MctAggregateTask.process(MctAggregateTask.java:50)
at
org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
at
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:132)
at
org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:112)
at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
at org.apache.samza.container.RunLoop.process(RunLoop.scala:98)
at org.apache.samza.container.RunLoop.run(RunLoop.scala:69)
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:555)
at
org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93)
at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67)
at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down.
2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down consumer
multiplexer.
2015-10-14 21:48:22 SystemConsumers [DEBUG] Stopping consumers.
2015-10-14 21:48:22 BrokerProxy [INFO] Shutting down BrokerProxy for
10.1.10.141:9092
2015-10-14 21:48:22 DefaultFetchSimpleConsumer [INFO] Reconnect due to
socket error: java.nio.channels.ClosedByInterruptException
2015-10-14 21:48:22 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from
10.1.10.141:9092

---------mct-aggregation.properties----------
 # Job
 job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
 job.name=mct-aggregation

 
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
 task.checkpoint.system=kafka
 # Normally, this would be 3, but we have only one broker.
 task.checkpoint.replication.factor=1

 # YARN
 
yarn.package.path=file:///Users/selina/IdeaProjects/cnr-mct-aggregation-samza/target/hello-samza-0.9.1-dist.tar.gz

 # Task
# path ./src/main/java/samza/http/demo/task/MctAggregateTask.java
 task.class=samza.http.demo.task.MctAggregateTask
 task.inputs=kafka.cnr-proto

 # Serializers
 serializers.registry.byte.class=org.apache.samza.serializers.ByteSerdeFactory
 
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
 serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
 
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory


 # Kafka System
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
 systems.kafka.samza.key.serde=byte
 systems.kafka.samza.msg.serde=byte

# Use the "byte" serializer for messages in the "cnr-proto" topic
systems.kafka.streams.cnr-proto.samza.key.serde=byte
systems.kafka.streams.cnr-proto.samza.msg.serde=byte

 systems.kafka.consumer.zookeeper.connect=localhost:2181/
 systems.kafka.producer.bootstrap.servers=localhost:9092

 #stream from begining
 #systems.kafka.consumer.auto.offset.reset=smallest
#http-demo from the oldest
 systems.kafka.cnr-proto.samza.offset.default=oldest
# all stream from the oldest
 systems.kafka.streams.cnr-proto.samza.offset.default=oldest
 systems.kafka.streams.cnr-proto.samza.reset.offset=true

-------------------MctAggregateTask.java----------

public class MctAggregateTask implements StreamTask {

  private static final SystemStream OUTPUT_STREAM = new
SystemStream("kafka", "cnr-proto-tmp");

  @SuppressWarnings("unchecked")
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector
collector, TaskCoordinator coordinator) throws Exception {

    byte[] key = (byte[])envelope.getKey();
    byte[] message = (byte[]) envelope.getMessage();

    logger.info("key="+key.toString()+": message="+message.toString());
    collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
  }

Reply via email to