Can you comment out "systems.kafka.http-demo.samza.offset.default=oldest" and see whether it works? That does not look like a valid property.
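For reference, here is a rough sketch of how the offset settings are keyed, as far as I understand the Samza configuration table. The system name "kafka" and the stream name "http-demo" are taken from your file; please check the exact keys against the documentation for your Samza version before relying on this.

```properties
# System-wide default for every stream read from the "kafka" system
systems.kafka.samza.offset.default=oldest

# Per-stream override: note the "streams." segment before the stream name
systems.kafka.streams.http-demo.samza.offset.default=oldest

# Per-stream flag asking Samza to ignore the checkpointed offset on startup
systems.kafka.streams.http-demo.samza.reset.offset=true
```

The key in question has the stream name directly after the system name, without the "streams." segment, so it matches neither the system-wide nor the per-stream form.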
Thanks,

Fang, Yan
yanfang...@gmail.com

On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu <swucaree...@gmail.com> wrote:

> Hi, Dear All:
>
> I have two Tasks at Samza: HttpDemoParserStreamTask and
> HttpDemoStatsStreamTask. They are almost the same, except that the output
> topic name and the task name in the properties file are different. I am
> wondering how I should debug this?
>
> More details are listed below.
>
> All your help is highly appreciated.
>
> Sincerely,
> Selina
>
> Currently HttpDemoParserStreamTask runs well.
> However, HttpDemoStatsStreamTask generates the log correctly, without any
> Exception, at
>
> deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/samza-container-0.log
>
> The last record, shown below, is right; however, no topic
> "demo-stats-temp" was created.
> --------------------------------------
>
> 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO] key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung Galaxy S6","operationSystem":"Android 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
>
> -------------------The demo-stats.properties files-----------------------------
>
> # Job
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=demo-stats-tmp
>
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> task.checkpoint.system=kafka
> # Normally, this would be 3, but we have only one broker.
> task.checkpoint.replication.factor=1
>
> # YARN
> yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
>
> # Task
> task.class=samza.http.demo.task.HttpDemoParserStreamTask
> task.inputs=kafka.http-demo
>
> # Serializers
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
> # Kafka System
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.samza.msg.serde=string
> systems.kafka.samza.key.serde=string
> systems.kafka.consumer.zookeeper.connect=localhost:2181/
> systems.kafka.producer.bootstrap.servers=localhost:9092
>
> #stream from beginning
> #systems.kafka.consumer.auto.offset.reset=smallest
> #http-demo from the oldest
> systems.kafka.http-demo.samza.offset.default=oldest
> # all stream from the oldest
> systems.kafka.streams.http-demo.samza.offset.default=oldest
> systems.kafka.streams.http-demo.samza.reset.offset=true
>
> --------------------HttpDemoStatsStreamTask class----------------------------
>
> public class HttpDemoStatsStreamTask implements StreamTask {
>
>     //output topic
>     private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "demo-stats-temp");
>     Logger logger = LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
>
>     @SuppressWarnings("unchecked")
>     @Override
>     public void process(IncomingMessageEnvelope envelope,
>             MessageCollector collector, TaskCoordinator coordinator) throws Exception {
>
>         String key = (String) envelope.getKey();
>         String message = envelope.getMessage().toString();
>         logger.info("key=" + key + ": message=" + message);
>
>         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>     }
> }
>
> -----Tail of __samza_checkpoint_ver_1_for_demo-stats-tmp_1 topic--------------
>
> {"Partition 0":0}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}