Hi, Yan:

I'd like to correct my previous comment. When I comment out

systems.kafka.streams.http-demo.samza.offset.default=oldest
systems.kafka.streams.http-demo.samza.reset.offset=true

the Logger output no longer shows up in samza-container-0.log, but that makes sense.

Sincerely,
Selina

On Tue, Jul 28, 2015 at 11:30 AM, Job-Selina Wu <swucaree...@gmail.com> wrote:

> Hi, Yan:
>
> Thanks a lot for your reply.
> I tried commenting out
> systems.kafka.http-demo.samza.offset.default=oldest
> and then I tried commenting out
> systems.kafka.streams.http-demo.samza.offset.default=oldest
> systems.kafka.streams.http-demo.samza.reset.offset=true
>
> The result is the same as before: 1. the checkpoint topic was created, 2.
> the log created by Logger can be found at /samza-container-0.log, 3. there
> is no exception in samza-container-0.log.
>
> I guess there is some conflict between HttpDemoParserStreamTask and
> HttpDemoStatsStreamTask? Is some resource registered by
> HttpDemoParserStreamTask, so that HttpDemoStatsStreamTask cannot recreate
> a topic?
>
> Sincerely,
> Selina
>
> On Tue, Jul 28, 2015 at 9:37 AM, Yan Fang <yanfang...@gmail.com> wrote:
>
>> Can you comment out "systems.kafka.http-demo.samza.offset.default=oldest"
>> to see how it works? This does not seem to be a correct property.
>>
>> Thanks,
>>
>> Fang, Yan
>> yanfang...@gmail.com
>>
>> On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu <swucaree...@gmail.com>
>> wrote:
>>
>> > Hi, Dear All:
>> >
>> > I have two tasks in Samza, HttpDemoParserStreamTask and
>> > HttpDemoStatsStreamTask. They are almost the same, except that the
>> > output topic name and the task name in the properties file are
>> > different. I am wondering how I should debug this.
>> >
>> > More details are listed below.
>> >
>> > All your help is highly appreciated.
>> >
>> > Sincerely,
>> > Selina
>> >
>> > Currently HttpDemoParserStreamTask runs well.
>> > However, HttpDemoStatsStreamTask generates the log correctly, without
>> > any exception, at
>> >
>> > deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/samza-container-0.log
>> >
>> > The last record, shown below, is right; however, no topic
>> > "demo-stats-temp" was created.
>> > --------------------------------------
>> >
>> > 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO]
>> > key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung Galaxy S6","operationSystem":"Android 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
>> >
>> > -------------------The demo-stats.properties file-----------------------------
>> >
>> > # Job
>> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>> > job.name=demo-stats-tmp
>> >
>> > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>> > task.checkpoint.system=kafka
>> > # Normally, this would be 3, but we have only one broker.
>> > task.checkpoint.replication.factor=1
>> >
>> > # YARN
>> > yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
>> >
>> > # Task
>> > task.class=samza.http.demo.task.HttpDemoParserStreamTask
>> > task.inputs=kafka.http-demo
>> >
>> > # Serializers
>> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>> >
>> > # Kafka System
>> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> > systems.kafka.samza.msg.serde=string
>> > systems.kafka.samza.key.serde=string
>> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
>> > systems.kafka.producer.bootstrap.servers=localhost:9092
>> >
>> > #stream from beginning
>> > #systems.kafka.consumer.auto.offset.reset=smallest
>> > #http-demo from the oldest
>> > systems.kafka.http-demo.samza.offset.default=oldest
>> > # all stream from the oldest
>> > systems.kafka.streams.http-demo.samza.offset.default=oldest
>> > systems.kafka.streams.http-demo.samza.reset.offset=true
>> >
>> > --------------------HttpDemoStatsStreamTask class----------------------------
>> >
>> > public class HttpDemoStatsStreamTask implements StreamTask {
>> >
>> >     //output topic
>> >     private static final SystemStream OUTPUT_STREAM =
>> >             new SystemStream("kafka", "demo-stats-temp");
>> >     Logger logger = LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
>> >
>> >     @SuppressWarnings("unchecked")
>> >     @Override
>> >     public void process(IncomingMessageEnvelope envelope,
>> >             MessageCollector collector, TaskCoordinator coordinator)
>> >             throws Exception {
>> >
>> >         String key = (String) envelope.getKey();
>> >         String message = envelope.getMessage().toString();
>> >         logger.info("key=" + key + ": message=" + message);
>> >
>> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>> >     }
>> > }
>> >
>> > -----Tail of __samza_checkpoint_ver_1_for_demo-stats-tmp_1 topic--------------
>> >
>> > {"Partition 0":0}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
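
Two things in the quoted demo-stats.properties can be checked mechanically. Its task.class entry names HttpDemoParserStreamTask rather than HttpDemoStatsStreamTask, which would be consistent with the quoted log record being emitted by HttpDemoParserStreamTask. The key systems.kafka.http-demo.samza.offset.default also lacks the "streams." segment, which is the property Yan's reply questions. The following is only a minimal sketch of such a check in plain Java and is not part of Samza. The class name SamzaConfigCheck and its check method are hypothetical. The accepted key shapes (systems.<system>.samza.offset.default and systems.<system>.streams.<stream>.samza.offset.default, with system and stream names that contain no dots) are an assumption based on the Samza configuration reference.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical helper, not a Samza class: sanity-checks a job properties file.
public class SamzaConfigCheck {

    // Assumed key shapes: a per-system default, or a per-stream override that
    // carries the "streams." segment. Names are assumed to contain no dots.
    private static final Pattern OFFSET_DEFAULT_KEY = Pattern.compile(
            "systems\\.[^.]+\\.(streams\\.[^.]+\\.)?samza\\.offset\\.default");

    // Returns a human-readable list of problems; an empty list means none found.
    static List<String> check(String propertiesText, String expectedTaskClass) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        List<String> problems = new ArrayList<>();

        // The job runs whatever class task.class names, regardless of job.name.
        String taskClass = props.getProperty("task.class");
        if (!expectedTaskClass.equals(taskClass)) {
            problems.add("task.class is " + taskClass + ", expected " + expectedTaskClass);
        }

        // Flag offset keys that do not match either assumed shape.
        // TreeSet gives a stable, sorted iteration order.
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            if (key.endsWith(".samza.offset.default")
                    && !OFFSET_DEFAULT_KEY.matcher(key).matches()) {
                problems.add("unrecognized offset key: " + key);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // A few lines copied from the quoted demo-stats.properties.
        String config = String.join("\n",
                "job.name=demo-stats-tmp",
                "task.class=samza.http.demo.task.HttpDemoParserStreamTask",
                "task.inputs=kafka.http-demo",
                "systems.kafka.http-demo.samza.offset.default=oldest",
                "systems.kafka.streams.http-demo.samza.offset.default=oldest");
        for (String problem : check(config, "samza.http.demo.task.HttpDemoStatsStreamTask")) {
            System.out.println(problem);
        }
    }
}
```

Run against the quoted lines, the sketch reports both the task.class mismatch and the offset key without the "streams." segment. Whether either one explains the missing demo-stats-temp topic still has to be confirmed on the actual job.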