task.class=samza.http.demo.task.HttpDemoParserStreamTask ...

you are not using the StateStream class...

Fang, Yan
yanfang...@gmail.com

On Tue, Jul 28, 2015 at 11:48 AM, Job-Selina Wu <swucaree...@gmail.com>
wrote:

> Hi, Yan
>
> I like to correct my previous comment, when I comment out
> systems.kafka.streams.http-demo.samza.offset.default=oldest
> systems.kafka.streams.http-demo.samza.reset.offset=true
>
> *the logger is not show at *at samza-container-0.log, but it make sense.
>
>
> Sincerely,
> Seina
>
> On Tue, Jul 28, 2015 at 11:30 AM, Job-Selina Wu <swucaree...@gmail.com>
> wrote:
>
> > Hi, Yan:
> >
> >       Thanks a lot for your reply.
> >      I tried to comment out
> systems.kafka.http-demo.samza.offset.default=oldest
> > and then I tried to comment out
> > systems.kafka.streams.http-demo.samza.offset.default=oldest
> > systems.kafka.streams.http-demo.samza.reset.offset=true
> >
> >  The result is same as before.  1. the checkoutpoint topic was created,
> 2.
> > the log created by Logger can be found at /samza-container-0.log. 3. no
> > exception is at samza-container-0.log.
> >
> >    I guess something conflict between HttpDemoParserStreamTask and
> > HttpDemoStatsStreamTask? Is any resource registered by
> > HttpDemoParserStreamTask and then HttpDemoStatsStreamTask can not
> recreate
> > a topic?
> >
> > Sincerely,
> > Selina
> >
> > On Tue, Jul 28, 2015 at 9:37 AM, Yan Fang <yanfang...@gmail.com> wrote:
> >
> >> Can you comment out
> "systems.kafka.http-demo.samza.offset.default=oldest"
> >> to see how it works? This seems not a correct property.
> >>
> >> Thanks,
> >>
> >> Fang, Yan
> >> yanfang...@gmail.com
> >>
> >> On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu <swucaree...@gmail.com>
> >> wrote:
> >>
> >> > Hi, Dear All:
> >> >
> >> >        I have two Tasks at Samza. HttpDemoParserStreamTask and
> >> > HttpDemoStatsStreamTask. They are almost same, except the output topic
> >> name
> >> > is different and the task name are different at properties file. I am
> >> > wondering how should I debug on it?
> >> >
> >> >    More details are list below.
> >> >
> >> >    All your help is highly appreciated.
> >> >
> >> > Sincerely,
> >> > Selina
> >> >
> >> >     Currently HttpDemoParserStreamTask run well.
> >> >     However HttpDemoStatsStreamTask can generate the log correctly
> >> withouot
> >> > Exception at
> >> >
> >> >
> >>
> deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/
> >> > samza-container-0.log
> >> >
> >> > The last record as below is right, however there is no topic "
> >> > demo-stats-temp" was created.
> >> > --------------------------------------
> >> >
> >> > 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO]
> >> > key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27
> >> >
> >> >
> >>
> 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"
> >> >
> >>
> http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi
> >> >
> >>
> ","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung
> >> > Galaxy S6","operationSystem":"Android
> >> >
> >> >
> >>
> 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
> >> >
> >> >
> >> > -------------------The demo-stats.properties
> >> > files-----------------------------
> >> >
> >> > # Job
> >> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> >> > job.name=demo-stats-tmp
> >>
> >> >
> >> >
> >> >
> >> >
> >>
> task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> >> >  task.checkpoint.system=kafka
> >> >  # Normally, this would be 3, but we have only one broker.
> >> >  task.checkpoint.replication.factor=1
> >> >
> >> >  # YARN
> >> >
> >> >
> >>
> yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
> >> >
> >> >  # Task
> >> >  task.class=samza.http.demo.task.HttpDemoParserStreamTask
> >> >  task.inputs=kafka.http-demo
> >> >
> >> >  # Serializers
> >> >
> >> >
> >>
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >> >
> >> >  # Kafka System
> >> >
> >> >
> >>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >> >  systems.kafka.samza.msg.serde=string
> >> >
> >> >  systems.kafka.samza.key.serde=string
> >> >  systems.kafka.consumer.zookeeper.connect=localhost:2181/
> >> >  systems.kafka.producer.bootstrap.servers=localhost:9092
> >> >
> >> >  #stream from begining
> >> >  #systems.kafka.consumer.auto.offset.reset=smallest
> >> > #http-demo from the oldest
> >> >  systems.kafka.http-demo.samza.offset.default=oldest
> >> > # all stream from the oldest
> >> >  systems.kafka.streams.http-demo.samza.offset.default=oldest
> >> >  systems.kafka.streams.http-demo.samza.reset.offset=true
> >> >
> >> >
> >> >
> >> > --------------------HttpDemoStatsStreamTask
> >> > class----------------------------
> >> >
> >> > public class HttpDemoStatsStreamTask implements StreamTask  {
> >> >
> >> >     //output topic
> >> >     private static final SystemStream OUTPUT_STREAM = new
> >> > SystemStream("kafka", "demo-stats-temp");
> >> >     Logger logger =
> >> LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
> >> >
> >> >     @SuppressWarnings("unchecked")
> >> >     @Override
> >> >     public void process(IncomingMessageEnvelope envelope,
> >> > MessageCollector collector, TaskCoordinator coordinator) throws
> >> > Exception {
> >> >
> >> >
> >> >         String key = (String) envelope.getKey();
> >> >         String message = envelope.getMessage().toString();
> >> >         logger.info("key=" + key + ": message=" + message);
> >>
> >> >
> >> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
> >> > message));
> >> >     }
> >> > }
> >> >
> >> > -----Tail  of __samza_checkpoint_ver_1_for_demo-stats-tmp_1
> >> > topic--------------
> >> >
> >> > {"Partition 0":0}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> > {"SystemStreamPartition [kafka, http-demo,
> >> >
> >> >
> >>
> 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >> >
> >>
> >
> >
>

Reply via email to