Can you comment out "systems.kafka.http-demo.samza.offset.default=oldest"
and see whether it works? This does not look like a valid property.
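
As a rough sketch of what I mean, the offset section of your
demo-stats.properties would look like the fragment below. It uses only
keys that are already in your file. I am assuming the per-stream form
(systems.<system>.streams.<stream>.samza.offset.default) is the one you
intended, so adjust it if that is not the case.

```properties
# http-demo from the oldest
# Commented out: this key has no "streams." segment, so it matches neither
# the system-level nor the stream-level form of samza.offset.default.
# systems.kafka.http-demo.samza.offset.default=oldest

# Stream-level settings for http-demo (unchanged from your file)
systems.kafka.streams.http-demo.samza.offset.default=oldest
systems.kafka.streams.http-demo.samza.reset.offset=true
```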

Thanks,

Fang, Yan
yanfang...@gmail.com

On Mon, Jul 27, 2015 at 5:54 PM, Job-Selina Wu <swucaree...@gmail.com>
wrote:

> Hi, Dear All:
>
>        I have two tasks in Samza: HttpDemoParserStreamTask and
> HttpDemoStatsStreamTask. They are almost the same, except that the output
> topic name and the task name in the properties file are different. I am
> wondering how I should debug this.
>
>    More details are listed below.
>
>    All your help is highly appreciated.
>
> Sincerely,
> Selina
>
>     Currently HttpDemoParserStreamTask runs well.
>     HttpDemoStatsStreamTask also writes its log correctly, without any
> exception, at
>
> deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/samza-container-0.log
>
> The last record, shown below, is correct. However, the topic
> "demo-stats-temp" was never created.
> --------------------------------------
>
> 2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO] key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27 14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung Galaxy S6","operationSystem":"Android 5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}
>
>
> ------------------- The demo-stats.properties file -------------------
>
> # Job
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=demo-stats-tmp
>
>
>
>  task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
>  task.checkpoint.system=kafka
>  # Normally, this would be 3, but we have only one broker.
>  task.checkpoint.replication.factor=1
>
>  # YARN
>
>  yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
>
>  # Task
>  task.class=samza.http.demo.task.HttpDemoParserStreamTask
>  task.inputs=kafka.http-demo
>
>  # Serializers
>
>  serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
>  # Kafka System
>
>  systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>  systems.kafka.samza.msg.serde=string
>
>  systems.kafka.samza.key.serde=string
>  systems.kafka.consumer.zookeeper.connect=localhost:2181/
>  systems.kafka.producer.bootstrap.servers=localhost:9092
>
>  # stream from beginning
>  #systems.kafka.consumer.auto.offset.reset=smallest
> #http-demo from the oldest
>  systems.kafka.http-demo.samza.offset.default=oldest
> # all stream from the oldest
>  systems.kafka.streams.http-demo.samza.offset.default=oldest
>  systems.kafka.streams.http-demo.samza.reset.offset=true
>
>
>
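> -------------------- HttpDemoStatsStreamTask class --------------------
>
> import org.apache.samza.system.IncomingMessageEnvelope;
> import org.apache.samza.system.OutgoingMessageEnvelope;
> import org.apache.samza.system.SystemStream;
> import org.apache.samza.task.MessageCollector;
> import org.apache.samza.task.StreamTask;
> import org.apache.samza.task.TaskCoordinator;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class HttpDemoStatsStreamTask implements StreamTask {
>
>     // Output topic
>     private static final SystemStream OUTPUT_STREAM =
>             new SystemStream("kafka", "demo-stats-temp");
>     Logger logger = LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);
>
>     @SuppressWarnings("unchecked")
>     @Override
>     public void process(IncomingMessageEnvelope envelope,
>                         MessageCollector collector,
>                         TaskCoordinator coordinator) throws Exception {
>         // Log the incoming record, then forward it unchanged to the output topic
>         String key = (String) envelope.getKey();
>         String message = envelope.getMessage().toString();
>         logger.info("key=" + key + ": message=" + message);
>
>         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>     }
> }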
>
> ----- Tail of the __samza_checkpoint_ver_1_for_demo-stats-tmp_1 topic -----
>
> {"Partition 0":0}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
> {"SystemStreamPartition [kafka, http-demo, 0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
>
