Hi all,

I have two tasks in Samza: HttpDemoParserStreamTask and HttpDemoStatsStreamTask. They are almost the same. The only differences are the output topic name and the task name in the properties file. I am wondering how I should debug HttpDemoStatsStreamTask.
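A mechanical way to check the claim that the two jobs differ only in their names is to diff the two properties files key by key. The sketch below uses only `java.util.Properties`. The class name `ConfigDiff` and the default file names `demo-parser.properties` and `demo-stats.properties` are hypothetical; pass the real paths as arguments.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical helper (not part of Samza): lists every key whose value differs between two job configs. */
public class ConfigDiff {

    /** Parses .properties text; '#' lines are treated as comments, as in any Java properties file. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // StringReader does not fail in practice; load(Reader) merely declares IOException.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Returns key -> {leftValue, rightValue} for keys missing on one side or with different values. */
    static Map<String, String[]> diff(Properties left, Properties right) {
        Set<String> keys = new TreeSet<>(left.stringPropertyNames());
        keys.addAll(right.stringPropertyNames());
        Map<String, String[]> result = new TreeMap<>();
        for (String key : keys) {
            String l = left.getProperty(key);
            String r = right.getProperty(key);
            if (!Objects.equals(l, r)) {
                result.put(key, new String[] {l, r});
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Default paths are assumptions for illustration only.
        String leftPath = args.length > 0 ? args[0] : "demo-parser.properties";
        String rightPath = args.length > 1 ? args[1] : "demo-stats.properties";
        Properties left = parse(new String(Files.readAllBytes(Paths.get(leftPath)), StandardCharsets.UTF_8));
        Properties right = parse(new String(Files.readAllBytes(Paths.get(rightPath)), StandardCharsets.UTF_8));
        for (Map.Entry<String, String[]> e : diff(left, right).entrySet()) {
            System.out.println(e.getKey() + ": " + e.getValue()[0] + " -> " + e.getValue()[1]);
        }
    }
}
```

Only differing keys are printed, so if `task.class` does not appear in the output, both configs name the same StreamTask class (or neither sets it).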
More details are listed below. Any help is highly appreciated.

Sincerely,
Selina

Currently HttpDemoParserStreamTask runs well. HttpDemoStatsStreamTask also writes its log correctly, without any exception, to deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/samza-container-0.log, and the last record in that log (shown below) is correct. However, no topic named "demo-stats-temp" was created.

--------------------------------------
2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO] key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung Galaxy S6","operationSystem":"Android 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}

-------------------The demo-stats.properties file-----------------------------
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=demo-stats-tmp
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
# Normally, this would be 3, but we have only one broker.
task.checkpoint.replication.factor=1

# YARN
yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz

# Task
task.class=samza.http.demo.task.HttpDemoParserStreamTask
task.inputs=kafka.http-demo

# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.samza.key.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092
# stream from the beginning
#systems.kafka.consumer.auto.offset.reset=smallest
# http-demo from the oldest
systems.kafka.http-demo.samza.offset.default=oldest
# all streams from the oldest
systems.kafka.streams.http-demo.samza.offset.default=oldest
systems.kafka.streams.http-demo.samza.reset.offset=true

--------------------HttpDemoStatsStreamTask class----------------------------
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpDemoStatsStreamTask implements StreamTask {
    // output topic
    private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "demo-stats-temp");
    Logger logger = LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);

    @SuppressWarnings("unchecked")
    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {
        String key = (String) envelope.getKey();
        String message = envelope.getMessage().toString();
        logger.info("key=" + key + ": message=" + message);
        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
    }
}

-----Tail of __samza_checkpoint_ver_1_for_demo-stats-tmp_1 topic--------------
{"Partition 0":0}
{"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
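The log record quoted above appears under the name HttpDemoParserStreamTask, whereas HttpDemoStatsStreamTask logs through `LoggerFactory.getLogger(HttpDemoStatsStreamTask.class)`, and demo-stats.properties sets `task.class=samza.http.demo.task.HttpDemoParserStreamTask`. (Exactly what name appears in the log depends on the log4j layout, which the post does not show.) A quick check of which StreamTask a job config will actually instantiate is to read `task.class` directly. This is a minimal sketch assuming the config path is passed as the first argument; `TaskClassCheck`, `taskClassOf` and `runsTask` are hypothetical names, not Samza APIs.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

/** Hypothetical helper (not part of Samza): reports which StreamTask a job config points at. */
public class TaskClassCheck {

    /** Returns the task.class value from properties text, or null if the key is absent. */
    static String taskClassOf(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            // StringReader does not fail in practice; load(Reader) merely declares IOException.
            throw new UncheckedIOException(e);
        }
        return props.getProperty("task.class");
    }

    /** True if the simple name (last dotted segment) of task.class equals simpleClassName. */
    static boolean runsTask(String propertiesText, String simpleClassName) {
        String taskClass = taskClassOf(propertiesText);
        if (taskClass == null) {
            return false;
        }
        // lastIndexOf returns -1 when there is no package, so substring(0) keeps the whole name.
        String simple = taskClass.substring(taskClass.lastIndexOf('.') + 1);
        return simple.equals(simpleClassName);
    }

    public static void main(String[] args) throws IOException {
        // Default path is an assumption; pass the real config path as the first argument.
        String path = args.length > 0 ? args[0] : "demo-stats.properties";
        String text = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
        System.out.println("task.class = " + taskClassOf(text));
        if (!runsTask(text, "HttpDemoStatsStreamTask")) {
            System.out.println("This config does not run HttpDemoStatsStreamTask.");
        }
    }
}
```

Against demo-stats.properties as posted, `taskClassOf` returns `samza.http.demo.task.HttpDemoParserStreamTask`, so `runsTask(text, "HttpDemoStatsStreamTask")` is false.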