Hi, All I am trying to write my first StreamTask class. I have a topic at Kafka called "http-demo". I like to read the topic and write it to another topic called "demo-duplicate"
Howeven there is not topic written to Kafka. My properties file and StreamTask are below. Can anyone told me what is the bug? BTW, if I set checkpoint or Metrics at properties file. the topic of checkpoint and metrics could be written to Kafka. And the content of input topic -- http-demo could be show correctly. Your help is highly appreciated. Sincerely, Selina - - -- - - - - - # Job job.factory.class=org.apache.samza.job.yarn.YarnJobFactory job.name=demo-parser # YARN yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz # Task task.class=samza.http.demo.task.HttpDemoParserStreamTask task.inputs=kafka.http-demo # Serializers serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory # Kafka System systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory systems.kafka.samza.msg.serde=string systems.kafka.samza.key.serde=string systems.kafka.consumer.zookeeper.connect=localhost:2181/ systems.kafka.consumer.auto.offset.reset=largest systems.kafka.producer.bootstrap.servers=localhost:9092 - - -- - - - - - My StreamTask class is simple also --------- /** * * Read data from http-demo topic and write it back to "demo-duplicate" */ public class HttpDemoParserStreamTask implements StreamTask { private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "demo-duplicate"); Logger logger = LoggerFactory.getLogger(HttpDemoParserStreamTask.class); @SuppressWarnings("unchecked") @Override public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws Exception { String key = (String) envelope.getKey(); String message = envelope.getMessage().toString(); logger.info("key="+key+": message="+message); Map<String, String> outgoingMap = (Map<String, String>) (envelope.getMessage()); collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap)); //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message)); } } -------