Hi, Yan and Shadi:

    I made a mistake.  Actually, no log at /tmp/kafka-logs was created
by "  logger.info("key="+key+": message="+message); ".  The log I provided
is actually the log for the input topic "http-demo" at
/tmp/kafka-logs/http-demo-0.

    My job is listed below. However, I am wondering how I can tell whether
my method "public void process(IncomingMessageEnvelope envelope,
MessageCollector collector, TaskCoordinator coordinator)" was run or not.

    I manually created the topic "demo-duplicate" from the command line;
otherwise it would be created by the Samza code.

    I checked, and I did not set auto.create.topics.enable anywhere. Attached
is my properties file for Kafka.


   Your help is highly appreciated

Sincerely,
Selina

[image: Inline image 1]




On Fri, Jul 24, 2015 at 11:56 AM, Yan Fang <yanfang...@gmail.com> wrote:

> The code and the property seem good to me. collector.send(new
> OutgoingMessageEnvelope(OUTPUT_STREAM, message)); should work. So I am
> curious whether you accidentally disabled auto.create.topics.enable. Can you
> also try sending messages from the command line to "demo-duplicate" to see
> whether it gets anything?
>
> Let me know if it works.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Fri, Jul 24, 2015 at 11:48 AM, Job-Selina Wu <swucaree...@gmail.com>
> wrote:
>
> > Hi, Shadi:
> >
> >       Thanks a lot for your reply.
> > 1. There is no error log in Kafka or Samza.
> >
> > 2. This line "  logger.info("key="+key+": message="+message); " writes
> > the log correctly, as below:
> >
> > [image: Inline image 1]
> >
> > These are my last two messages, with the right count.
> >
> > 3. I tried both ways below; neither of them creates the topic, but I will
> > try again.
> >
> > collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
> >
> > //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
> >
> > 4. I wrote a topic called "http-demo" to Kafka as my input, and its content
> > can be shown with the command line below, so Kafka should be OK.
> > deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
> > --from-beginning --topic http-demo
> >
> > Your help is highly appreciated.
> >
> > Sincerely,
> > Selina
> >
> >
> >
> >
> > On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi <
> > snogh...@linkedin.com.invalid> wrote:
> >
> >> Selina,
> >>
> >> You should probably check a few things:
> >> 1. Your log files to see if you have any errors. Also, does your job fail
> >> or continue running?
> >> 2. Does this line "  logger.info("key="+key+": message="+message); "
> >> write any logs?
> >> 3. This might not be the only reason, but you are sending messages of
> >> type Map<String, String>. However, in your config file, you defined
> >> "systems.kafka.samza.msg.serde=string", which expects the message to be a
> >> String.
> >>
> >>
> >> Shadi
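
Point 3 above can be illustrated in isolation. The following is a minimal sketch, not Samza code: `Serde`, `StringSerde`, and `trySerialize` are hypothetical stand-ins written for this example, and the sketch does not establish how Samza itself reports the problem. It only shows why a serializer declared for String cannot accept a Map<String, String>, and that passing a String instead goes through.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class SerdeMismatchSketch {

    // Hypothetical stand-in for a serializer interface; not Samza's actual API.
    interface Serde<T> {
        byte[] toBytes(T message);
    }

    // Hypothetical string serializer: it can only encode String messages.
    static class StringSerde implements Serde<String> {
        @Override
        public byte[] toBytes(String message) {
            return message.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mimics a producer that receives an untyped message and hands it to
    // whichever serde the configuration selected.
    // Returns null when the message type does not match the serde.
    @SuppressWarnings("unchecked")
    static byte[] trySerialize(Serde<?> configured, Object message) {
        try {
            return ((Serde<Object>) configured).toBytes(message);
        } catch (ClassCastException e) {
            System.out.println("serde rejected "
                    + message.getClass().getSimpleName());
            return null;
        }
    }

    public static void main(String[] args) {
        Serde<?> serde = new StringSerde();

        Map<String, String> outgoingMap = new HashMap<>();
        outgoingMap.put("key", "value");

        // Passing the Map directly does not match the String serde.
        System.out.println(trySerialize(serde, outgoingMap) != null);

        // Passing a String (here simply the Map's toString()) does.
        System.out.println(trySerialize(serde, outgoingMap.toString()) != null);
    }
}
```

In terms of the task in this thread, the String case corresponds to the commented-out call that sends `message`, and the Map case corresponds to the call that sends `outgoingMap`.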
> >>
> >>
> >> On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu <swucaree...@gmail.com>
> >> wrote:
> >>
> >> > Hi,  All
> >> >
> >> >      I am trying to write my first StreamTask class. I have a topic in
> >> > Kafka called "http-demo". I would like to read the topic and write it to
> >> > another topic called "demo-duplicate".
> >> >
> >> >     However, no topic is written to Kafka.
> >> >
> >> >     My properties file and StreamTask are below.  Can anyone tell me
> >> > what the bug is?
> >> >     BTW, if I set checkpoint or metrics in the properties file, the
> >> > checkpoint and metrics topics are written to Kafka, and the content of
> >> > the input topic, http-demo, is shown correctly.
> >> >
> >> > Your help is highly appreciated.
> >> >
> >> > Sincerely,
> >> > Selina
> >> >
> >> >
> >> > - - -- - - - - -
> >> > # Job
> >> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
> >> > job.name=demo-parser
> >> >
> >> > # YARN
> >> > yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
> >> >
> >> > # Task
> >> > task.class=samza.http.demo.task.HttpDemoParserStreamTask
> >> > task.inputs=kafka.http-demo
> >> >
> >> > # Serializers
> >> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> >> >
> >> > # Kafka System
> >> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
> >> > systems.kafka.samza.msg.serde=string
> >> > systems.kafka.samza.key.serde=string
> >> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
> >> > systems.kafka.consumer.auto.offset.reset=largest
> >> > systems.kafka.producer.bootstrap.servers=localhost:9092
> >> > - - -- - - - - -
> >> >
> >> > My StreamTask class is also simple:
> >> >
> >> > ---------
> >> >
> >> > /**
> >> >  * Read data from http-demo topic and write it back to "demo-duplicate"
> >> >  */
> >> > public class HttpDemoParserStreamTask implements StreamTask {
> >> >
> >> >     private static final SystemStream OUTPUT_STREAM =
> >> >             new SystemStream("kafka", "demo-duplicate");
> >> >     Logger logger =
> >> >             LoggerFactory.getLogger(HttpDemoParserStreamTask.class);
> >> >
> >> >     @SuppressWarnings("unchecked")
> >> >     @Override
> >> >     public void process(IncomingMessageEnvelope envelope,
> >> >             MessageCollector collector, TaskCoordinator coordinator)
> >> >             throws Exception {
> >> >
> >> >         String key = (String) envelope.getKey();
> >> >         String message = envelope.getMessage().toString();
> >> >         logger.info("key="+key+": message="+message);
> >> >
> >> >         Map<String, String> outgoingMap =
> >> >                 (Map<String, String>) (envelope.getMessage());
> >> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
> >> >                 outgoingMap));
> >> >         //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM,
> >> >         //        message));
> >> >     }
> >> >
> >> > }
> >> >
> >> > -------
> >> >
> >>
> >
> >
>
