Hi, Yan

I would like to correct my previous comment. When I comment out
systems.kafka.streams.http-demo.samza.offset.default=oldest
systems.kafka.streams.http-demo.samza.reset.offset=true

the Logger output does not appear in samza-container-0.log, but that makes sense.


Sincerely,
Selina

On Tue, Jul 28, 2015 at 11:30 AM, Job-Selina Wu <swucaree...@gmail.com>
wrote:

> Hi, Yan:
>
> Thanks a lot for your reply.
> I tried commenting out
> systems.kafka.http-demo.samza.offset.default=oldest
> and then I tried to comment out
> systems.kafka.streams.http-demo.samza.offset.default=oldest
> systems.kafka.streams.http-demo.samza.reset.offset=true
>
> The result is the same as before: 1. the checkpoint topic was created; 2.
> the log written by the Logger can be found at /samza-container-0.log; 3.
> there are no exceptions in samza-container-0.log.
>
> I guess there is some conflict between HttpDemoParserStreamTask and
> HttpDemoStatsStreamTask. Is some resource registered by
> HttpDemoParserStreamTask, so that HttpDemoStatsStreamTask cannot create
> its own topic?
>
> Sincerely,
> Selina
>
> On Tue, Jul 28, 2015 at 9:37 AM, Yan Fang <yanfang...@gmail.com> wrote:
>
>> Can you comment out "systems.kafka.http-demo.samza.offset.default=oldest"
>> to see how it works? This does not seem to be a correct property.
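>>
>> For reference, here is a rough sketch of the two forms I would expect,
>> assuming the usual Samza 0.9 configuration keys (the values are only
>> illustrative, so please check them against the configuration table):
>>
>> ```
>> # System-wide default for every stream on the "kafka" system
>> systems.kafka.samza.offset.default=oldest
>> # Per-stream override: note the "streams." segment before the stream name
>> systems.kafka.streams.http-demo.samza.offset.default=oldest
>> systems.kafka.streams.http-demo.samza.reset.offset=true
>> ```
>>
>> The property above matches neither form, so it is most likely just ignored.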
>>
>> Thanks,
>>
>> Fang, Yan
>> yanfang...@gmail.com
>>
>> On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu <swucaree...@gmail.com>
>> wrote:
>>
>> > Hi, Dear All:
>> >
>> > I have two tasks in Samza, HttpDemoParserStreamTask and
>> > HttpDemoStatsStreamTask. They are almost the same, except that the output
>> > topic name and the task name in the properties file are different. How
>> > should I debug this?
>> >
>> > More details are listed below.
>> >
>> > All your help is highly appreciated.
>> >
>> > Sincerely,
>> > Selina
>> >
>> > Currently HttpDemoParserStreamTask runs well.
>> > HttpDemoStatsStreamTask also writes its log correctly, without any
>> > exception, at
>> >
>> > deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/samza-container-0.log
>> >
>> > The last record, shown below, is correct; however, no topic
>> > "demo-stats-temp" was created.
>> > --------------------------------------
>> >
>> > 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO] key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung Galaxy S6","operationSystem":"Android 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
>> >
>> >
>> > -------------------The demo-stats.properties files-----------------------------
>> >
>> > # Job
>> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>> > job.name=demo-stats-tmp
>> >
>> > task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>> > task.checkpoint.system=kafka
>> > # Normally, this would be 3, but we have only one broker.
>> > task.checkpoint.replication.factor=1
>> >
>> > # YARN
>> > yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
>> >
>> > # Task
>> > task.class=samza.http.demo.task.HttpDemoParserStreamTask
>> > task.inputs=kafka.http-demo
>> >
>> > # Serializers
>> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>> >
>> > # Kafka System
>> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> > systems.kafka.samza.msg.serde=string
>> > systems.kafka.samza.key.serde=string
>> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
>> > systems.kafka.producer.bootstrap.servers=localhost:9092
>> >
>> > # stream from the beginning
>> > #systems.kafka.consumer.auto.offset.reset=smallest
>> > # http-demo from the oldest
>> > systems.kafka.http-demo.samza.offset.default=oldest
>> > # all streams from the oldest
>> > systems.kafka.streams.http-demo.samza.offset.default=oldest
>> > systems.kafka.streams.http-demo.samza.reset.offset=true
>> >
>> >
>> >
>> > --------------------HttpDemoStatsStreamTask class----------------------------
>> >
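>> > import org.apache.samza.system.IncomingMessageEnvelope;
>> > import org.apache.samza.system.OutgoingMessageEnvelope;
>> > import org.apache.samza.system.SystemStream;
>> > import org.apache.samza.task.MessageCollector;
>> > import org.apache.samza.task.StreamTask;
>> > import org.apache.samza.task.TaskCoordinator;
>> > import org.slf4j.Logger;
>> > import org.slf4j.LoggerFactory;
>> >
>> > public class HttpDemoStatsStreamTask implements StreamTask {
>> >
>> >     // Output topic
>> >     private static final SystemStream OUTPUT_STREAM =
>> >             new SystemStream("kafka", "demo-stats-temp");
>> >     private static final Logger logger =
>> >             LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
>> >
>> >     @SuppressWarnings("unchecked")
>> >     @Override
>> >     public void process(IncomingMessageEnvelope envelope,
>> >             MessageCollector collector, TaskCoordinator coordinator)
>> >             throws Exception {
>> >         String key = (String) envelope.getKey();
>> >         String message = envelope.getMessage().toString();
>> >         // Log every incoming record before forwarding it
>> >         logger.info("key=" + key + ": message=" + message);
>> >
>> >         // Forward the message unchanged to the output topic
>> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>> >     }
>> > }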
>> >
>> > -----Tail of __samza_checkpoint_ver_1_for_demo-stats-tmp_1 topic--------------
>> >
>> > {"Partition 0":0}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> > {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>> >
>>
>
>
