Selina, You should probably check a few things 1. Your log files to see if you have any errors. Also, does you job fail or continues running? 2. Does this line " logger.info("key="+key+": message="+message); " write any logs? 3. This might not be the only reason, but you are sending messages of type Map<String, String>. However, in your config file, you defined " systems.kafka.samza.msg.serde=string" which expects the message to be a String.
Shadi On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu <swucaree...@gmail.com> wrote: > Hi, All > > I am trying to write my first StreamTask class. I have a topic at > Kafka called "http-demo". I like to read the topic and write it to another > topic called "demo-duplicate" > > Howeven there is not topic written to Kafka. > > My properties file and StreamTask are below. Can anyone told me what > is the bug? > BTW, if I set checkpoint or Metrics at properties file. the topic of > checkpoint and metrics could be written to Kafka. And the content of > input topic -- http-demo could be show correctly. > > Your help is highly appreciated. > > Sincerely, > Selina > > > - - -- - - - - - > # Job > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory > job.name=demo-parser > > # YARN > > yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz > > # Task > task.class=samza.http.demo.task.HttpDemoParserStreamTask > task.inputs=kafka.http-demo > > # Serializers > > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory > > # Kafka System > > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory > systems.kafka.samza.msg.serde=string > systems.kafka.samza.key.serde=string > systems.kafka.consumer.zookeeper.connect=localhost:2181/ > systems.kafka.consumer.auto.offset.reset=largest > systems.kafka.producer.bootstrap.servers=localhost:9092 > - - -- - - - - - > > My StreamTask class is simple also > > --------- > > /** > * > * Read data from http-demo topic and write it back to "demo-duplicate" > */ > public class HttpDemoParserStreamTask implements StreamTask { > > private static final SystemStream OUTPUT_STREAM = new > SystemStream("kafka", "demo-duplicate"); > Logger logger = > LoggerFactory.getLogger(HttpDemoParserStreamTask.class); > > @SuppressWarnings("unchecked") > @Override > public void process(IncomingMessageEnvelope envelope, MessageCollector > collector, TaskCoordinator coordinator) throws Exception { > > String key = (String) envelope.getKey(); > String message = envelope.getMessage().toString(); > logger.info("key="+key+": message="+message); > > Map<String, String> outgoingMap = (Map<String, String>) > (envelope.getMessage()); > collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, > outgoingMap)); > //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, > message)); > } > > } > > ------- >