Hi, Dear All:

       I have two Tasks at Samza. HttpDemoParserStreamTask and
HttpDemoStatsStreamTask. They are almost same, except the output topic name
is different and the task name are different at properties file. I am
wondering how should I debug on it?

   More details are list below.

   All your help is highly appreciated.

Sincerely,
Selina

    Currently HttpDemoParserStreamTask run well.
    However HttpDemoStatsStreamTask can generate the log correctly withouot
Exception at
deploy/yarn/logs/userlogs/application_1438043584310_0001/container_1438043584310_0001_01_000002/
samza-container-0.log

The last record as below is right, however there is no topic "
demo-stats-temp" was created.
--------------------------------------

2015-07-27 17:34:48 HttpDemoParserStreamTask [INFO]
key=CAESEAbQ1pC2TBvb-4SLDjMqsZ8: message={"timestamp":"2015-07-27
14:30:02:987","date":"06-21-2015","id":"CAESEAbQ1pC2TBvb-4SLDjMqsZ8","ip":"22.231.113.69","browser":"Chrome","postalCode":"95131","url":"http://somthing.sample2.com/whatever?someinfo\u003dwekio2icicicnenneniadidi","language":"ENG","mobileBrand":"Samsung","carrierName":"Tmobile","deviceName":"Samsung
Galaxy S6","operationSystem":"Android
5.0.2","screenSize":"5.1-inch","resolution":"1440p","campaignId":"65681290456292569","count":"5607"}


-------------------The demo-stats.properties files-----------------------------

# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=demo-stats-tmp


 
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
 task.checkpoint.system=kafka
 # Normally, this would be 3, but we have only one broker.
 task.checkpoint.replication.factor=1

 # YARN
 
yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz

 # Task
 task.class=samza.http.demo.task.HttpDemoParserStreamTask
 task.inputs=kafka.http-demo

 # Serializers
 
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

 # Kafka System
 systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
 systems.kafka.samza.msg.serde=string

 systems.kafka.samza.key.serde=string
 systems.kafka.consumer.zookeeper.connect=localhost:2181/
 systems.kafka.producer.bootstrap.servers=localhost:9092

 #stream from begining
 #systems.kafka.consumer.auto.offset.reset=smallest
#http-demo from the oldest
 systems.kafka.http-demo.samza.offset.default=oldest
# all stream from the oldest
 systems.kafka.streams.http-demo.samza.offset.default=oldest
 systems.kafka.streams.http-demo.samza.reset.offset=true



--------------------HttpDemoStatsStreamTask class----------------------------

public class HttpDemoStatsStreamTask implements StreamTask  {

    //output topic
    private static final SystemStream OUTPUT_STREAM = new
SystemStream("kafka", "demo-stats-temp");
    Logger logger = LoggerFactory.getLogger(HttpDemoStatsStreamTask.class);

    @SuppressWarnings("unchecked")
    @Override
    public void process(IncomingMessageEnvelope envelope,
MessageCollector collector, TaskCoordinator coordinator) throws
Exception {


        String key = (String) envelope.getKey();
        String message = envelope.getMessage().toString();
        logger.info("key=" + key + ": message=" + message);

        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
    }
}

-----Tail  of __samza_checkpoint_ver_1_for_demo-stats-tmp_1 topic--------------

{"Partition 0":0}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"0","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}
{"SystemStreamPartition [kafka, http-demo,
0]":{"system":"kafka","partition":"0","offset":"12601","stream":"http-demo"}}

Reply via email to