Selina,

You should probably check a few things
1. Your log files to see if you have any errors. Also, does you job fail or
continues running?
2. Does this line "  logger.info("key="+key+": message="+message); " write
any logs?
3. This might not be the only reason, but you are sending messages of
type Map<String,
String>. However, in your config file, you defined "
systems.kafka.samza.msg.serde=string" which expects the message to be a
String.


Shadi


On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu <swucaree...@gmail.com>
wrote:

> Hi,  All
>
>      I am trying to write my first StreamTask class. I have a topic at
> Kafka called "http-demo". I like to read the topic and write it to another
> topic called "demo-duplicate"
>
>     Howeven there is not topic written to Kafka.
>
>     My properties file and StreamTask are below.  Can anyone told me what
> is the bug?
>     BTW, if I set checkpoint or Metrics at properties file. the topic of
> checkpoint and metrics could be written to Kafka.  And the content of
>  input topic -- http-demo could be show correctly.
>
> Your help is highly appreciated.
>
> Sincerely,
> Selina
>
>
> - - -- - - - - -
> # Job
> job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> job.name=demo-parser
>
> # YARN
>
> yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
>
> # Task
> task.class=samza.http.demo.task.HttpDemoParserStreamTask
> task.inputs=kafka.http-demo
>
> # Serializers
>
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>
> # Kafka System
>
> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> systems.kafka.samza.msg.serde=string
> systems.kafka.samza.key.serde=string
> systems.kafka.consumer.zookeeper.connect=localhost:2181/
> systems.kafka.consumer.auto.offset.reset=largest
> systems.kafka.producer.bootstrap.servers=localhost:9092
> - - -- - - - - -
>
> My StreamTask class is simple also
>
> ---------
>
> /**
>  *
>  * Read data from http-demo topic and write it back to "demo-duplicate"
>  */
> public class HttpDemoParserStreamTask implements StreamTask {
>
>     private static final SystemStream OUTPUT_STREAM = new
> SystemStream("kafka", "demo-duplicate");
>     Logger logger =
> LoggerFactory.getLogger(HttpDemoParserStreamTask.class);
>
>     @SuppressWarnings("unchecked")
>     @Override
>     public void process(IncomingMessageEnvelope envelope, MessageCollector
> collector, TaskCoordinator coordinator) throws Exception {
>
>         String key = (String) envelope.getKey();
>         String message = envelope.getMessage().toString();
>         logger.info("key="+key+": message="+message);
>
>         Map<String, String> outgoingMap = (Map<String, String>)
> (envelope.getMessage());
>         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
> outgoingMap));
>         //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
> message));
>     }
>
> }
>
> -------
>

Reply via email to