Hi, Yi:

       Thanks for your suggestion. I found the bug: the logger.info line
threw the exception.
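
For anyone hitting the same error, here is a minimal sketch of null-safe logging for byte[] keys and messages. It assumes the NullPointerException came from calling toString() on a null key (Kafka messages can be produced without a key); the thread does not confirm the exact cause, and an uninitialized logger would fail the same way. ByteLogging and describe are hypothetical names, not part of Samza. Note also that byte[].toString() prints an identity string such as [B@1b2c3d, not the content.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, not part of Samza: renders byte[] values safely for log lines.
final class ByteLogging {
    private ByteLogging() {}

    // Returns "null" for a null array; otherwise the UTF-8 text followed by the raw byte values.
    static String describe(byte[] bytes) {
        if (bytes == null) {
            return "null";
        }
        return new String(bytes, StandardCharsets.UTF_8) + " " + Arrays.toString(bytes);
    }

    public static void main(String[] args) {
        byte[] key = null; // assumption: the message was produced without a key
        byte[] message = "hello".getBytes(StandardCharsets.UTF_8);
        // Mirrors the log line in the task, but never dereferences a null array.
        System.out.println("key=" + describe(key) + ": message=" + describe(message));
    }
}
```

In the task, the log line would then become something like logger.info("key=" + ByteLogging.describe(key) + ": message=" + ByteLogging.describe(message)), assuming logger itself is initialized.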

Thanks a lot!


Sincerely,
Selina



On Thu, Oct 15, 2015 at 10:03 AM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Selina,
>
> Your stack trace shows that the exception was thrown at line 50 of your
> task code. Could you point out which line that is?
>
> It would be helpful if you could log the message you receive in
> process() and compare it with the message you read from the Kafka console
> consumer.
>
> Thanks!
>
> -Yi
>
> On Wed, Oct 14, 2015 at 10:07 PM, Selina Tech <swucaree...@gmail.com>
> wrote:
>
> > Dear All:
> >     I tried to consume the Kafka topic "cnr-proto" in Java, but the
> > SamzaContainer threw the NullPointerException shown below.
> >     The messages display correctly from the command line with
> > "deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
> > --from-beginning --topic cnr-proto"
> >
> >     The key and message of topic "cnr-proto" in Kafka are both byte[].
> >
> >    I run the job with:
> >    deploy/samza/bin/run-job.sh
> > --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory
> > --config-path=file://$PWD/deploy/samza/config/mct-aggregation.properties
> >
> >     The properties file and Java code are listed below as well.
> >
> >    Your help is highly appreciated.
> >
> > Sincerely,
> > Selina
> >
> > -------------error in samza-container-0.log-------------
> >
> > 2015-10-14 21:48:22 SamzaContainer [ERROR] Caught exception in process loop.
> > java.lang.NullPointerException
> > at samza.http.demo.task.MctAggregateTask.process(MctAggregateTask.java:50)
> > at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:133)
> > at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> > at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:132)
> > at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:112)
> > at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
> > at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
> > at org.apache.samza.container.RunLoop.process(RunLoop.scala:98)
> > at org.apache.samza.container.RunLoop.run(RunLoop.scala:69)
> > at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:555)
> > at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93)
> > at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67)
> > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> > 2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down.
> > 2015-10-14 21:48:22 SamzaContainer [INFO] Shutting down consumer multiplexer.
> > 2015-10-14 21:48:22 SystemConsumers [DEBUG] Stopping consumers.
> > 2015-10-14 21:48:22 BrokerProxy [INFO] Shutting down BrokerProxy for 10.1.10.141:9092
> > 2015-10-14 21:48:22 DefaultFetchSimpleConsumer [INFO] Reconnect due to socket error: java.nio.channels.ClosedByInterruptException
> > 2015-10-14 21:48:22 DefaultFetchSimpleConsumer [DEBUG] Disconnecting from 10.1.10.141:9092
> >
> > ---------mct-aggregation.properties----------
> >  # Job
> >  job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> >  job.name=mct-aggregation
> >
> >  task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> >  task.checkpoint.system=kafka
> >  # Normally, this would be 3, but we have only one broker.
> >  task.checkpoint.replication.factor=1
> >
> >  # YARN
> >  yarn.package.path=file:///Users/selina/IdeaProjects/cnr-mct-aggregation-samza/target/hello-samza-0.9.1-dist.tar.gz
> >
> >  # Task
> > # path ./src/main/java/samza/http/demo/task/MctAggregateTask.java
> >  task.class=samza.http.demo.task.MctAggregateTask
> >  task.inputs=kafka.cnr-proto
> >
> >  # Serializers
> >  serializers.registry.byte.class=org.apache.samza.serializers.ByteSerdeFactory
> >  serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >  serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
> >  serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
> >
> >
> >  # Kafka System
> >  systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >  systems.kafka.samza.key.serde=byte
> >  systems.kafka.samza.msg.serde=byte
> >
> > # Use the "byte" serializer for messages in the "cnr-proto" topic
> > systems.kafka.streams.cnr-proto.samza.key.serde=byte
> > systems.kafka.streams.cnr-proto.samza.msg.serde=byte
> >
> >  systems.kafka.consumer.zookeeper.connect=localhost:2181/
> >  systems.kafka.producer.bootstrap.servers=localhost:9092
> >
> >  #stream from beginning
> >  #systems.kafka.consumer.auto.offset.reset=smallest
> > #http-demo from the oldest
> >  systems.kafka.cnr-proto.samza.offset.default=oldest
> > # all stream from the oldest
> >  systems.kafka.streams.cnr-proto.samza.offset.default=oldest
> >  systems.kafka.streams.cnr-proto.samza.reset.offset=true
> >
> > -------------------MctAggregateTask.java----------
> >
> > public class MctAggregateTask implements StreamTask {
> >
> >   private static final SystemStream OUTPUT_STREAM =
> >       new SystemStream("kafka", "cnr-proto-tmp");
> >
> >   @SuppressWarnings("unchecked")
> >   @Override
> >   public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
> >       TaskCoordinator coordinator) throws Exception {
> >
> >     byte[] key = (byte[])envelope.getKey();
> >     byte[] message = (byte[]) envelope.getMessage();
> >
> >     logger.info("key="+key.toString()+": message="+message.toString());
> >     collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
> >   }
> > }
>
