Hi, Yan:

Thanks a lot for your reply. I tried commenting out

systems.kafka.http-demo.samza.offset.default=oldest

and then I also tried commenting out

systems.kafka.streams.http-demo.samza.offset.default=oldest
systems.kafka.streams.http-demo.samza.reset.offset=true
The result is the same as before:

1. The checkpoint topic was created.
2. The log created by Logger can be found at /samza-container-0.log.
3. There is no exception in samza-container-0.log.

I guess something conflicts between HttpDemoParserStreamTask and
HttpDemoStatsStreamTask? Is some resource registered by
HttpDemoParserStreamTask, so that HttpDemoStatsStreamTask cannot recreate a
topic?

Sincerely,
Selina

On Tue, Jul 28, 2015 at 9:37 AM, Yan Fang <yanfang...@gmail.com> wrote:

> Can you comment out "systems.kafka.http-demo.samza.offset.default=oldest"
> to see how it works? This seems not a correct property.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu <swucaree...@gmail.com>
> wrote:
>
> > Hi, Dear All:
> >
> > I have two Tasks at Samza: HttpDemoParserStreamTask and
> > HttpDemoStatsStreamTask. They are almost the same, except that the output
> > topic name and the task name in the properties file are different. I am
> > wondering how I should debug it?
> >
> > More details are listed below.
> >
> > All your help is highly appreciated.
> >
> > Sincerely,
> > Selina
> >
> > Currently HttpDemoParserStreamTask runs well.
> > However, HttpDemoStatsStreamTask generates the log correctly, without
> > any Exception, at
> >
> > deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/samza-container-0.log
> >
> > The last record, shown below, is right; however, no topic
> > "demo-stats-temp" was created.
> > --------------------------------------
> >
> > 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO] key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung Galaxy S6","operationSystem":"Android 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
> >
> > -------------------The demo-stats.properties files-----------------------------
> >
> > # Job
> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > job.name=demo-stats-tmp
> >
> > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> > task.checkpoint.system=kafka
> > # Normally, this would be 3, but we have only one broker.
> > task.checkpoint.replication.factor=1
> >
> > # YARN
> > yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
> >
> > # Task
> > task.class=samza.http.demo.task.HttpDemoParserStreamTask
> > task.inputs=kafka.http-demo
> >
> > # Serializers
> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >
> > # Kafka System
> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> > systems.kafka.samza.msg.serde=string
> > systems.kafka.samza.key.serde=string
> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
> > systems.kafka.producer.bootstrap.servers=localhost:9092
> >
> > #stream from beginning
> > #systems.kafka.consumer.auto.offset.reset=smallest
> > #http-demo from the oldest
> > systems.kafka.http-demo.samza.offset.default=oldest
> > # all stream from the oldest
> > systems.kafka.streams.http-demo.samza.offset.default=oldest
> > systems.kafka.streams.http-demo.samza.reset.offset=true
> >
> > --------------------HttpDemoStatsStreamTask class----------------------------
> >
> > public class HttpDemoStatsStreamTask implements StreamTask {
> >
> >     //output topic
> >     private static final SystemStream OUTPUT_STREAM =
> >             new SystemStream("kafka", "demo-stats-temp");
> >     Logger logger = LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
> >
> >     @SuppressWarnings("unchecked")
> >     @Override
> >     public void process(IncomingMessageEnvelope envelope,
> >             MessageCollector collector, TaskCoordinator coordinator)
> >             throws Exception {
> >
> >         String key = (String) envelope.getKey();
> >         String message = envelope.getMessage().toString();
> >         logger.info("key=" + key + ": message=" + message);
> >
> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
> >     }
> > }
> >
> > -----Tail of __samza_checkpoint_ver_1_for_demo-stats-tmp_1 topic--------------
> >
> > {"Partition 0":0}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
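
One thing that may be worth checking, assuming the demo-stats.properties quoted above is exactly what was deployed: its task.class points at samza.http.demo.task.HttpDemoParserStreamTask, and the quoted log record is also tagged HttpDemoParserStreamTask. If so, the container may be running the parser task rather than HttpDemoStatsStreamTask, which could explain why nothing is sent to "demo-stats-temp". This is a guess from the quoted text, not a confirmed diagnosis. Below is a minimal sketch in plain Java (the class TaskClassCheck and its helper methods are hypothetical and not part of Samza) that reads a properties file's text and reports which task class it names:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper, not part of Samza: checks which task a job config names.
public class TaskClassCheck {

    // Parses properties-file text into a java.util.Properties object.
    public static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    // Returns the simple class name configured under task.class,
    // or null when the key is absent.
    public static String configuredTaskName(Properties props) {
        String fqcn = props.getProperty("task.class");
        if (fqcn == null) {
            return null;
        }
        return fqcn.substring(fqcn.lastIndexOf('.') + 1);
    }

    public static void main(String[] args) throws IOException {
        // Three lines copied from the demo-stats.properties quoted in this thread.
        String quoted = "job.name=demo-stats-tmp\n"
                + "task.class=samza.http.demo.task.HttpDemoParserStreamTask\n"
                + "task.inputs=kafka.http-demo\n";

        String expected = "HttpDemoStatsStreamTask";
        String actual = configuredTaskName(parse(quoted));

        if (!expected.equals(actual)) {
            System.out.println("task.class names " + actual
                    + " but the job was expected to run " + expected);
        }
    }
}
```

If the check reports a mismatch for the deployed file, changing task.class to the stats task class and redeploying would be the next thing to try.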