Hi all,

     I am trying to write my first StreamTask class. I have a Kafka topic
called "http-demo". I would like to read that topic and write its messages to
another topic called "demo-duplicate".

    However, nothing is written to the "demo-duplicate" topic in Kafka.

    My properties file and StreamTask are below. Can anyone tell me what
the bug is?
    BTW, if I configure checkpointing or metrics in the properties file, the
checkpoint and metrics topics are written to Kafka. And the content of the
input topic, http-demo, can be displayed correctly.

Your help is highly appreciated.

Sincerely,
Selina


- - -- - - - - -
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=demo-parser

# YARN
yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz

# Task
task.class=samza.http.demo.task.HttpDemoParserStreamTask
task.inputs=kafka.http-demo

# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.samza.key.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.consumer.auto.offset.reset=largest
systems.kafka.producer.bootstrap.servers=localhost:9092
- - -- - - - - -

My StreamTask class is also simple:

---------

/**
 *
 * Read data from http-demo topic and write it back to "demo-duplicate"
 */
public class HttpDemoParserStreamTask implements StreamTask {

    private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "demo-duplicate");
    Logger logger = LoggerFactory.getLogger(HttpDemoParserStreamTask.class);

    @SuppressWarnings("unchecked")
    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception {

        String key = (String) envelope.getKey();
        String message = envelope.getMessage().toString();
        logger.info("key="+key+": message="+message);

        Map<String, String> outgoingMap = (Map<String, String>) (envelope.getMessage());
        collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
        //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
    }

}

-------
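
One thing that may be worth checking (a guess, not a confirmed diagnosis): the properties file sets systems.kafka.samza.msg.serde=string, so envelope.getMessage() presumably returns a java.lang.String at runtime. If so, the cast to Map<String, String> in process() would compile but throw a ClassCastException before collector.send() is reached, which would match the symptom of an empty output topic. The sketch below is plain Java with no Samza dependency. The class SerdeCastDemo and its methods are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone demo; it does not use any Samza classes.
class SerdeCastDemo {

    // Mirrors the cast in process(): the generic cast compiles (with an
    // unchecked warning), but the JVM still checks that the object is a Map.
    @SuppressWarnings("unchecked")
    public static String describeCast(Object payload) {
        try {
            Map<String, String> asMap = (Map<String, String>) payload;
            return "cast ok, entries=" + asMap.size();
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    // A safer way to treat the payload as text, as the commented-out
    // send(...) line in the task would do.
    public static String asText(Object payload) {
        return String.valueOf(payload);
    }

    public static void main(String[] args) {
        // Assumption: with the string serde, the message arrives as a String.
        Object fromStringSerde = "GET /index.html";
        System.out.println(describeCast(fromStringSerde));
        System.out.println(describeCast(new HashMap<String, String>()));
        System.out.println(asText(fromStringSerde));
    }
}
```

If this is what happens in the job, the container logs should contain the ClassCastException, and sending the String message (the commented-out line in the task) would avoid the cast.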
