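task.class=samza.http.demo.task.HttpDemoParserStreamTask

... you are not using the HttpDemoStatsStreamTask class: demo-stats.properties still sets task.class to HttpDemoParserStreamTask, which is also why the container log shows "HttpDemoParserStreamTask [INFO]" lines...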
Fang, Yan
yanfang...@gmail.com

On Tue, Jul 28, 2015 at 11:48 AM, Job-Selina Wu <swucaree...@gmail.com> wrote:

> Hi, Yan
>
> I would like to correct my previous comment: when I comment out
> systems.kafka.streams.http-demo.samza.offset.default=oldest
> systems.kafka.streams.http-demo.samza.reset.offset=true
>
> *the logger output is not shown* in samza-container-0.log, but that makes sense.
>
>
> Sincerely,
> Selina
>
> On Tue, Jul 28, 2015 at 11:30 AM, Job-Selina Wu <swucaree...@gmail.com>
> wrote:
>
> > Hi, Yan:
> >
> > Thanks a lot for your reply.
> > I tried commenting out
> > systems.kafka.http-demo.samza.offset.default=oldest
> > and then I tried commenting out
> > systems.kafka.streams.http-demo.samza.offset.default=oldest
> > systems.kafka.streams.http-demo.samza.reset.offset=true
> >
> > The result is the same as before:
> > 1. the checkpoint topic was created;
> > 2. the log created by Logger can be found in /samza-container-0.log;
> > 3. there is no exception in samza-container-0.log.
> >
> > I guess something conflicts between HttpDemoParserStreamTask and
> > HttpDemoStatsStreamTask? Is some resource registered by
> > HttpDemoParserStreamTask so that HttpDemoStatsStreamTask cannot recreate
> > a topic?
> >
> > Sincerely,
> > Selina
> >
> > On Tue, Jul 28, 2015 at 9:37 AM, Yan Fang <yanfang...@gmail.com> wrote:
> >
> >> Can you comment out "systems.kafka.http-demo.samza.offset.default=oldest"
> >> to see how it works? This does not look like a correct property.
> >>
> >> Thanks,
> >>
> >> Fang, Yan
> >> yanfang...@gmail.com
> >>
> >> On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu <swucaree...@gmail.com>
> >> wrote:
> >>
> >> > Hi, Dear All:
> >> >
> >> > I have two tasks in Samza: HttpDemoParserStreamTask and
> >> > HttpDemoStatsStreamTask. They are almost the same, except that the output
> >> > topic names differ and the task names differ in the properties file. I am
> >> > wondering how I should debug this.
> >> >
> >> > More details are listed below.
> >> >
> >> > All your help is highly appreciated.
> >> >
> >> > Sincerely,
> >> > Selina
> >> >
> >> > Currently HttpDemoParserStreamTask runs well.
> >> > HttpDemoStatsStreamTask, however, generates the log correctly, without
> >> > any exception, at
> >> >
> >> > deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/samza-container-0.log
> >> >
> >> > The last record, shown below, is correct; however, no topic
> >> > "demo-stats-temp" was created.
> >> > --------------------------------------
> >> >
> >> > 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO] key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung Galaxy S6","operationSystem":"Android 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
> >> >
> >> >
> >> > -------------------The demo-stats.properties file-----------------------------
> >> >
> >> > # Job
> >> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> >> > job.name=demo-stats-tmp
> >> >
> >> > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> >> > task.checkpoint.system=kafka
> >> > # Normally, this would be 3, but we have only one broker.
> >> > task.checkpoint.replication.factor=1
> >> >
> >> > # YARN
> >> > yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
> >> >
> >> > # Task
> >> > task.class=samza.http.demo.task.HttpDemoParserStreamTask
> >> > task.inputs=kafka.http-demo
> >> >
> >> > # Serializers
> >> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >> >
> >> > # Kafka System
> >> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >> > systems.kafka.samza.msg.serde=string
> >> > systems.kafka.samza.key.serde=string
> >> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
> >> > systems.kafka.producer.bootstrap.servers=localhost:9092
> >> >
> >> > #stream from beginning
> >> > #systems.kafka.consumer.auto.offset.reset=smallest
> >> > #http-demo from the oldest
> >> > systems.kafka.http-demo.samza.offset.default=oldest
> >> > # all streams from the oldest
> >> > systems.kafka.streams.http-demo.samza.offset.default=oldest
> >> > systems.kafka.streams.http-demo.samza.reset.offset=true
> >> >
> >> >
> >> > --------------------HttpDemoStatsStreamTask class----------------------------
> >> >
> >> > public class HttpDemoStatsStreamTask implements StreamTask {
> >> >
> >> >     // output topic
> >> >     private static final SystemStream OUTPUT_STREAM =
> >> >             new SystemStream("kafka", "demo-stats-temp");
> >> >     Logger logger = LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
> >> >
> >> >     @SuppressWarnings("unchecked")
> >> >     @Override
> >> >     public void process(IncomingMessageEnvelope envelope,
> >> >             MessageCollector collector, TaskCoordinator coordinator) throws Exception {
> >> >
> >> >         String key = (String) envelope.getKey();
> >> >         String message = envelope.getMessage().toString();
> >> >         logger.info("key=" + key + ": message=" + message);
> >> >
> >> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
> >> >     }
> >> > }
> >> >
> >> > -----Tail of __samza_checkpoint_ver_1_for_demo-stats-tmp_1 topic--------------
> >> >
> >> > {"Partition 0":0}
> >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> >
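A cheap way to catch this kind of mix-up before deploying is to load the job's .properties file and check both task.class and the shape of the offset keys Yan flagged. The sketch below is a hypothetical helper (SamzaConfigLint is not part of Samza) that uses only java.util.Properties. It assumes offset keys take one of two shapes: systems.<system>.samza.offset.default for the system-wide default, or systems.<system>.streams.<stream>.samza.<setting> for a per-stream override. It only checks key shape, not whether Samza actually reads a given key.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical pre-deployment check for a Samza job .properties file; not part of Samza.
public class SamzaConfigLint {

    private static final String PREFIX = "systems.";
    private static final String OFFSET_DEFAULT = ".samza.offset.default";
    private static final String RESET_OFFSET = ".samza.reset.offset";

    // Parses .properties text ("key=value" lines, "#" comments) with java.util.Properties.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns human-readable warnings; an empty list means nothing suspicious was found.
    static List<String> lint(String text, String expectedTaskClass) {
        Properties props = parse(text);
        List<String> warnings = new ArrayList<>();

        // 1. task.class must name the StreamTask you actually intend to run.
        String taskClass = props.getProperty("task.class");
        if (!expectedTaskClass.equals(taskClass)) {
            warnings.add("task.class is " + taskClass + ", expected " + expectedTaskClass);
        }

        // 2. Offset keys are assumed to look like (see lead-in):
        //    systems.<system>.samza.offset.default              (system-wide default)
        //    systems.<system>.streams.<stream>.samza.<setting>  (per-stream override)
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            if (!key.startsWith(PREFIX)) {
                continue;
            }
            String suffix = key.endsWith(OFFSET_DEFAULT) ? OFFSET_DEFAULT
                    : key.endsWith(RESET_OFFSET) ? RESET_OFFSET : null;
            if (suffix == null) {
                continue;
            }
            // Text between "systems." and the suffix, e.g. "kafka.streams.http-demo".
            int end = key.length() - suffix.length();
            String middle = end > PREFIX.length() ? key.substring(PREFIX.length(), end) : "";
            boolean systemWide = suffix.equals(OFFSET_DEFAULT)
                    && !middle.isEmpty() && !middle.contains(".");
            boolean perStream = middle.matches("[^.]+\\.streams\\..+");
            if (!systemWide && !perStream) {
                warnings.add("unrecognized key: " + key);
            }
        }
        return warnings;
    }

    public static void main(String[] args) {
        // Task and offset lines from demo-stats.properties in this thread.
        String config = String.join("\n",
                "task.class=samza.http.demo.task.HttpDemoParserStreamTask",
                "task.inputs=kafka.http-demo",
                "systems.kafka.http-demo.samza.offset.default=oldest",
                "systems.kafka.streams.http-demo.samza.offset.default=oldest",
                "systems.kafka.streams.http-demo.samza.reset.offset=true");
        for (String warning : lint(config, "samza.http.demo.task.HttpDemoStatsStreamTask")) {
            System.out.println(warning);
        }
    }
}
```

Run against the configuration in this thread, it reports two problems: task.class still names HttpDemoParserStreamTask, and systems.kafka.http-demo.samza.offset.default lacks the "streams" segment. The two per-stream keys pass.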
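Selina's follow-up observation (no logger output once reset.offset is commented out) is likely explained by the checkpoint tail above: the job has already checkpointed offset 12601 for [kafka, http-demo, 0], so without a reset it resumes after that offset and may have nothing new to process. The following is a simplified model of that decision, not Samza's actual offset-management code. StartOffsetModel and its parameters are hypothetical, and it assumes that a job without a reset resumes from the message after the last checkpointed offset, while reset.offset=true (or a missing checkpoint) falls back to samza.offset.default.

```java
// Simplified model (an assumption, not Samza source code) of where a Samza job
// starts reading one input partition after a restart.
public class StartOffsetModel {

    // checkpointed:  last checkpointed offset, or null if no checkpoint exists
    // resetOffset:   systems.<system>.streams.<stream>.samza.reset.offset
    // offsetDefault: "oldest" or "upcoming" (samza.offset.default)
    // oldest:        oldest offset still available in the partition
    // upcoming:      offset the next newly written message will receive
    static long startingOffset(Long checkpointed, boolean resetOffset, String offsetDefault,
                               long oldest, long upcoming) {
        // With reset.offset=true, or with no checkpoint yet, the checkpoint is ignored
        // and samza.offset.default decides where to start.
        if (resetOffset || checkpointed == null) {
            switch (offsetDefault) {
                case "oldest":
                    return oldest;
                case "upcoming":
                    return upcoming;
                default:
                    throw new IllegalArgumentException(
                            "unknown samza.offset.default: " + offsetDefault);
            }
        }
        // Otherwise resume with the message after the last checkpointed one.
        return checkpointed + 1;
    }

    public static void main(String[] args) {
        // Hypothetical partition: 12601 is both the checkpointed offset and the newest
        // message, so the next write would receive offset 12602.
        long oldest = 0;
        long upcoming = 12602;
        System.out.println(startingOffset(12601L, true, "oldest", oldest, upcoming));
        System.out.println(startingOffset(12601L, false, "oldest", oldest, upcoming));
    }
}
```

Under these assumptions, reset.offset=true restarts at offset 0, so every message is reprocessed and logged again; with the reset commented out the job starts at 12602, and if nothing new has been written to http-demo there is nothing to process or log.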