Hi, Yan and Shadi: I made a mistake. Actually, there is no log at /tmp/kafka-logs created by " logger.info("key="+key+": message="+message); ". The log I provided actually is log for input topic "http-demo" at /tmp/kafka-logs/http-demo-0
My job is listed as below. However I am wondering how can I know if my method "public void* process*(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator)" was run or not. I manually create topic "demo-duplicate" by command line, otherwise it will be created by samza code. I checked I did not set auto.create.topics.enable anywhere. Attached is my properties file for Kafka Your help is highly appreciated Sincerely, Selina [image: Inline image 1] On Fri, Jul 24, 2015 at 11:56 AM, Yan Fang <yanfang...@gmail.com> wrote: > The code and the property seem good to me. collector.send(new > OutgoingMessageEnvelope(OUTPUT_STREAM, message));should work. So I am > curious if you accidentally disabled auto.create.topics.enable ...Can you > also try to send msgs from cmd line to "demo-duplicate" to see if it gets > anything. > > Let me know if it works. > > Thanks, > > Fang, Yan > yanfang...@gmail.com > > On Fri, Jul 24, 2015 at 11:48 AM, Job-Selina Wu <swucaree...@gmail.com> > wrote: > > > Hi, Shadi: > > > > Thans a lot for your reply. > > 1. There is no error log at Kafka and Samza > > > > 2. this line " logger.info("key="+key+": message="+message); " write > > log correctly as below: > > > > [image: Inline image 1] > > > > This are my last two message with right count > > > > 3. I tried both way below, none of them create topic, but I will try it > > again. > > > > collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap)); > > > > //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message)); > > > > 4. I wrote a topic call "http-demo" to Kafka as my input, and the content > > can be show with command line below, so the Kafka should be OK. > > deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 > > --from-beginning --topic http-demo > > > > Your help is highly appreciated. > > > > Sincerely, > > Selina > > > > > > > > > > On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi < > > snogh...@linkedin.com.invalid> wrote: > > > >> Selina, > >> > >> You should probably check a few things > >> 1. Your log files to see if you have any errors. Also, does you job fail > >> or > >> continues running? > >> 2. Does this line " logger.info("key="+key+": message="+message); " > >> write > >> any logs? > >> 3. This might not be the only reason, but you are sending messages of > >> type Map<String, > >> String>. However, in your config file, you defined " > >> systems.kafka.samza.msg.serde=string" which expects the message to be a > >> String. > >> > >> > >> Shadi > >> > >> > >> On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu <swucaree...@gmail.com> > >> wrote: > >> > >> > Hi, All > >> > > >> > I am trying to write my first StreamTask class. I have a topic at > >> > Kafka called "http-demo". I like to read the topic and write it to > >> another > >> > topic called "demo-duplicate" > >> > > >> > Howeven there is not topic written to Kafka. > >> > > >> > My properties file and StreamTask are below. Can anyone told me > >> what > >> > is the bug? > >> > BTW, if I set checkpoint or Metrics at properties file. the topic > of > >> > checkpoint and metrics could be written to Kafka. And the content of > >> > input topic -- http-demo could be show correctly. > >> > > >> > Your help is highly appreciated. > >> > > >> > Sincerely, > >> > Selina > >> > > >> > > >> > - - -- - - - - - > >> > # Job > >> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory > >> > job.name=demo-parser > >> > >> > > >> > # YARN > >> > > >> > > >> > yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz > >> > > >> > # Task > >> > task.class=samza.http.demo.task.HttpDemoParserStreamTask > >> > task.inputs=kafka.http-demo > >> > > >> > # Serializers > >> > > >> > > >> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory > >> > > >> > # Kafka System > >> > > >> > > >> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory > >> > systems.kafka.samza.msg.serde=string > >> > systems.kafka.samza.key.serde=string > >> > systems.kafka.consumer.zookeeper.connect=localhost:2181/ > >> > systems.kafka.consumer.auto.offset.reset=largest > >> > systems.kafka.producer.bootstrap.servers=localhost:9092 > >> > - - -- - - - - - > >> > > >> > My StreamTask class is simple also > >> > > >> > --------- > >> > > >> > /** > >> > * > >> > * Read data from http-demo topic and write it back to > "demo-duplicate" > >> > */ > >> > public class HttpDemoParserStreamTask implements StreamTask { > >> > > >> > private static final SystemStream OUTPUT_STREAM = new > >> > SystemStream("kafka", "demo-duplicate"); > >> > Logger logger = > >> > LoggerFactory.getLogger(HttpDemoParserStreamTask.class); > >> > > >> > @SuppressWarnings("unchecked") > >> > @Override > >> > public void process(IncomingMessageEnvelope envelope, > >> MessageCollector > >> > collector, TaskCoordinator coordinator) throws Exception { > >> > > >> > String key = (String) envelope.getKey(); > >> > String message = envelope.getMessage().toString(); > >> > logger.info("key="+key+": message="+message); > >> > > >> > Map<String, String> outgoingMap = (Map<String, String>) > >> > (envelope.getMessage()); > >> > collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, > >> > outgoingMap)); > >> > //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, > >> > message)); > >> > } > >> > > >> > } > >> > > >> > ------- > >> > > >> > > > > >