Hi, Yan:

      Thanks a lot for your reply.
     I tried to comment out systems.kafka.http-demo.samza.offset.default=oldest
and then I tried to comment out
systems.kafka.streams.http-demo.samza.offset.default=oldest
systems.kafka.streams.http-demo.samza.reset.offset=true

 The result is the same as before: 1. the checkpoint topic was created; 2.
the log written by Logger can be found at /samza-container-0.log; 3. there is
no exception in samza-container-0.log.

   I guess there is some conflict between HttpDemoParserStreamTask and
HttpDemoStatsStreamTask? Is some resource registered by
HttpDemoParserStreamTask, so that HttpDemoStatsStreamTask cannot create
its topic?
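
   One thing that might be worth checking: in the demo-stats.properties
quoted below, task.class is still set to
samza.http.demo.task.HttpDemoParserStreamTask, and the log lines are tagged
HttpDemoParserStreamTask, so the stats job may simply be running the parser
task. A minimal sketch of how the relevant lines might look, assuming
HttpDemoStatsStreamTask lives in the same package as the parser task (an
assumption; its package is not shown in this thread):

```properties
# Hypothetical corrected lines for demo-stats.properties.
job.name=demo-stats-tmp

# Assumption: HttpDemoStatsStreamTask is in the same package as the parser task.
task.class=samza.http.demo.task.HttpDemoStatsStreamTask
task.inputs=kafka.http-demo

# Per-stream offset settings use the systems.<system>.streams.<stream>.* form.
systems.kafka.streams.http-demo.samza.offset.default=oldest
systems.kafka.streams.http-demo.samza.reset.offset=true
```

If that is the cause, the job would need to be rebuilt and redeployed with
the changed task.class before the demo-stats-temp topic could appear.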

Sincerely,
Selina

On Tue, Jul 28, 2015 at 9:37 AM, Yan Fang <yanfang...@gmail.com> wrote:

> Can you comment out "systems.kafka.http-demo.samza.offset.default=oldest"
> to see how it works? This does not seem to be a correct property.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu <swucaree...@gmail.com>
> wrote:
>
> > Hi, Dear All:
> >
> >        I have two tasks in Samza: HttpDemoParserStreamTask and
> > HttpDemoStatsStreamTask. They are almost the same, except that the output
> > topic name and the task name in the properties file are different. I am
> > wondering how I should debug this?
> >
> >    More details are listed below.
> >
> >    All your help is highly appreciated.
> >
> > Sincerely,
> > Selina
> >
> >     Currently HttpDemoParserStreamTask runs well.
> >     However, HttpDemoStatsStreamTask generates the log correctly,
> > without an exception, at
> >
> > deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/samza-container-0.log
> >
> > The last record, shown below, is right; however, the topic
> > "demo-stats-temp" was not created.
> > --------------------------------------
> >
> > 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO] key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung Galaxy S6","operationSystem":"Android 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
> >
> >
> > -------------------The demo-stats.properties file-----------------------------
> >
> > # Job
> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> > job.name=demo-stats-tmp
> >
> >  task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
> >  task.checkpoint.system=kafka
> >  # Normally, this would be 3, but we have only one broker.
> >  task.checkpoint.replication.factor=1
> >
> >  # YARN
> >  yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
> >
> >  # Task
> >  task.class=samza.http.demo.task.HttpDemoParserStreamTask
> >  task.inputs=kafka.http-demo
> >
> >  # Serializers
> >  serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >
> >  # Kafka System
> >  systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >  systems.kafka.samza.msg.serde=string
> >  systems.kafka.samza.key.serde=string
> >  systems.kafka.consumer.zookeeper.connect=localhost:2181/
> >  systems.kafka.producer.bootstrap.servers=localhost:9092
> >
> >  # stream from beginning
> >  #systems.kafka.consumer.auto.offset.reset=smallest
> >  # http-demo from the oldest
> >  systems.kafka.http-demo.samza.offset.default=oldest
> >  # all streams from the oldest
> >  systems.kafka.streams.http-demo.samza.offset.default=oldest
> >  systems.kafka.streams.http-demo.samza.reset.offset=true
> >
> >
> >
> > --------------------HttpDemoStatsStreamTask class----------------------------
> >
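> > import org.apache.samza.system.IncomingMessageEnvelope;
> > import org.apache.samza.system.OutgoingMessageEnvelope;
> > import org.apache.samza.system.SystemStream;
> > import org.apache.samza.task.MessageCollector;
> > import org.apache.samza.task.StreamTask;
> > import org.apache.samza.task.TaskCoordinator;
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory;
> >
> > public class HttpDemoStatsStreamTask implements StreamTask {
> >
> >     // Output topic
> >     private static final SystemStream OUTPUT_STREAM =
> >             new SystemStream("kafka", "demo-stats-temp");
> >     Logger logger = LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
> >
> >     @SuppressWarnings("unchecked")
> >     @Override
> >     public void process(IncomingMessageEnvelope envelope,
> >             MessageCollector collector, TaskCoordinator coordinator)
> >             throws Exception {
> >
> >         // Log each incoming record, then forward it unchanged.
> >         String key = (String) envelope.getKey();
> >         String message = envelope.getMessage().toString();
> >         logger.info("key=" + key + ": message=" + message);
> >
> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
> >     }
> > }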
> >
> > -----Tail of __samza_checkpoint_ver_1_for_demo-stats-tmp_1 topic--------------
> >
> > {"Partition 0":0}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> >
>
