Hi, Yan:
Thanks for replying to my email in detail. All the files in the YARN logs
directory are shown below. There is no exception under
samza-Demo/deploy/yarn/logs. I guess the StreamTask was not called ...
Partial stdout file
(samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_000001/stderr)
is pasted below. In short, the log line from
" logger.info("key="+key+": message="+message); " was not generated.
/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java
-server -Dsamza.container.name=samza-application-master
-Dlog4j.configuration=file:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/log4j.xml
-Dsamza.log.dir=/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_000001
-Djava.io.tmpdir=/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/tmp
-Xmx768M -XX:+PrintGCDateStamps
-Xloggc:/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/logs/userlogs/application_1437767867729_0001/container_1437767867729_0001_01_000001/gc.log
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
-XX:GCLogFileSize=10241024 -d64 -cp
/Users/selina/IdeaProjects/samza-Demo/deploy/yarn/etc/hadoop:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/activation-1.1.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/akka-actor_2.10-2.1.2.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/aopalliance-1.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/asm-3.1.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/avro-1.7.4.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/commons-beanutils-1.7.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/commons-beanutils-core-1.8.0.jar:/tmp/hadoop-selina/nm-local-dir/usercache/selina/appcache/application_1437767867729_0001/container_1437767867729_0001_01_000001/__package/lib/commons-cli-1.2
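One way to confirm that the logger.info line is missing from every container log, and not only from the file pasted above, is to scan the whole userlogs directory for it. The following is only a minimal sketch: the class name LogScan, the method findLinesContaining, and the default directory and search string are made up for illustration and are not part of Samza or YARN.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for illustration only; not part of Samza or YARN.
class LogScan {

    // Returns one "file: line" entry for every line under root that contains needle.
    static List<String> findLinesContaining(Path root, String needle) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(root)) {
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        List<String> hits = new ArrayList<>();
        for (Path file : files) {
            // ISO_8859_1 maps every byte to a character, so an oddly encoded
            // log file cannot make the read fail with a decoding error.
            for (String line : Files.readAllLines(file, StandardCharsets.ISO_8859_1)) {
                if (line.contains(needle)) {
                    hits.add(file + ": " + line);
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Assumed defaults: the local userlogs directory and a fragment of the
        // message built by logger.info("key="+key+": message="+message).
        Path root = Paths.get(args.length > 0 ? args[0] : "deploy/yarn/logs/userlogs");
        String needle = args.length > 1 ? args[1] : ": message=";
        List<String> hits = findLinesContaining(root, needle);
        if (hits.isEmpty()) {
            System.out.println("No line containing '" + needle + "' under " + root);
        } else {
            hits.forEach(System.out::println);
        }
    }
}
```

If this reports no hits for any container of the application, then process() either never ran or its log output went somewhere other than the userlogs directory.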
The file gc.log.0.current shows Allocation Failure and Full GC entries:
CommandLine flags: -XX:GCLogFileSize=10241024 -XX:InitialHeapSize=268435456
-XX:MaxHeapSize=805306368 -XX:NumberOfGCLogFiles=10 -XX:+PrintGC
-XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps
-XX:+UseCompressedClassPointers -XX:+UseCompressedOops
-XX:+UseGCLogFileRotation -XX:+UseParallelGC
2015-07-24T13:28:56.901+0800: 0.694: [GC (Allocation Failure)
65536K->8449K(251392K), 0.0062314 secs]
2015-07-24T13:28:57.188+0800: 0.981: [GC (System.gc())
39240K->6305K(251392K), 0.0047744 secs]
2015-07-24T13:28:57.193+0800: 0.986: [Full GC (System.gc())
6305K->5940K(251392K), 0.0147206 secs]
2015-07-24T13:28:57.625+0800: 1.418: [GC (Allocation Failure)
71476K->12511K(251392K), 0.0030179 secs]
2015-07-24T13:28:59.889+0800: 3.682: [GC (Allocation Failure)
78047K->13859K(251392K), 0.0052610 secs]
2015-07-24T13:29:15.487+0800: 19.280: [GC (Metadata GC Threshold)
35659K->10106K(251392K), 0.0036350 secs]
2015-07-24T13:29:15.490+0800: 19.284: [Full GC (Metadata GC
Threshold) 10106K->7318K(149504K), 0.0200118 secs]
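For reading the entries above: each one gives the collection type, its cause, the heap in use before and after the collection, the total heap capacity in parentheses, and the pause time. As far as I understand, "Allocation Failure" is the ordinary trigger for a young-generation collection rather than an error, so these lines alone do not point to a memory problem. The sketch below pulls the numbers out of one entry, assuming the wrapped entry has first been joined back onto a single line. The class GcEntry and its methods are hypothetical names for illustration, not part of the JDK or Samza.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading -XX:+PrintGC style entries; illustration only.
class GcEntry {
    final String kind;      // "GC" (minor collection) or "Full GC"
    final String cause;     // e.g. "Allocation Failure" or "System.gc()"
    final long beforeK;     // heap in use before the collection, in KB
    final long afterK;      // heap in use after the collection, in KB
    final long capacityK;   // total heap capacity at that moment, in KB
    final double pauseSecs; // pause time in seconds

    // Matches e.g. "[GC (Allocation Failure) 65536K->8449K(251392K), 0.0062314 secs]".
    private static final Pattern ENTRY = Pattern.compile(
        "\\[(Full GC|GC) \\((.+?)\\)\\s+(\\d+)K->(\\d+)K\\((\\d+)K\\), ([0-9.]+) secs\\]");

    private GcEntry(Matcher m) {
        kind = m.group(1);
        cause = m.group(2);
        beforeK = Long.parseLong(m.group(3));
        afterK = Long.parseLong(m.group(4));
        capacityK = Long.parseLong(m.group(5));
        pauseSecs = Double.parseDouble(m.group(6));
    }

    // Returns null when the line holds no GC entry in the expected format.
    static GcEntry parse(String line) {
        Matcher m = ENTRY.matcher(line);
        return m.find() ? new GcEntry(m) : null;
    }

    // Amount of heap reclaimed by this collection, in KB.
    long freedK() {
        return beforeK - afterK;
    }

    public static void main(String[] args) {
        String[] lines = {
            "2015-07-24T13:28:56.901+0800: 0.694: [GC (Allocation Failure) 65536K->8449K(251392K), 0.0062314 secs]",
            "2015-07-24T13:28:57.193+0800: 0.986: [Full GC (System.gc()) 6305K->5940K(251392K), 0.0147206 secs]"
        };
        for (String line : lines) {
            GcEntry e = parse(line);
            System.out.println(e.kind + " (" + e.cause + "): freed " + e.freedK()
                + "K, heap capacity " + e.capacityK + "K, pause " + e.pauseSecs + " s");
        }
    }
}
```

In the entries pasted above, the heap in use after each collection stays far below the 768M limit set by -Xmx768M, and every pause is a few milliseconds to about 20 ms.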
Your help is highly appreciated.
Sincerely,
Selina
On Fri, Jul 24, 2015 at 1:51 PM, Yan Fang <[email protected]> wrote:
> {quote}
> I did not set auto.create.topics.enable anywhere
> {quote}
>
> Fine. Then it defaults to true. No worries.
>
> {quote}
> My job is listed below. However, I am wondering how I can tell whether my
> method "public void process(IncomingMessageEnvelope envelope,
> MessageCollector collector, TaskCoordinator coordinator)" was run or not.
> {quote}
>
> If you have logging enabled (from the code, you do), you can check the
> container's log to see whether it has the output. Assuming you are using the
> local YARN setup that hello-samza provides, you should be able to find the
> logs in deploy/yarn/userlogs/application_Id.
>
> If you print with System.out, you can see the result in the stdout file
> under deploy/yarn/userlogs/application_Id (if the StreamTask works).
>
> If it does not work, you can check the logs in
> deploy/yarn/userlogs/application_Id as well to see the exceptions, if there
> are any.
>
> Thanks,
>
> Fang, Yan
> [email protected]
>
> On Fri, Jul 24, 2015 at 1:45 PM, Job-Selina Wu <[email protected]>
> wrote:
>
>> Hi, Yan and Shadi:
>>
>>     I made a mistake. Actually, no log was created at /tmp/kafka-logs
>> by " logger.info("key="+key+": message="+message); ". The log I
>> provided is actually the log for the input topic "http-demo" at
>> /tmp/kafka-logs/http-demo-0
>>
>>     My job is listed below. However, I am wondering how I can tell
>> whether my method "public void process(IncomingMessageEnvelope envelope,
>> MessageCollector collector, TaskCoordinator coordinator)" was run or
>> not.
>>
>>     I manually created the topic "demo-duplicate" from the command line;
>> otherwise it would be created by the Samza code.
>>
>>     I checked, and I did not set auto.create.topics.enable anywhere. Attached
>> is my properties file for Kafka
>>
>>
>> Your help is highly appreciated
>>
>> Sincerely,
>> Selina
>>
>> [image: Inline image 1]
>>
>>
>>
>>
>> On Fri, Jul 24, 2015 at 11:56 AM, Yan Fang <[email protected]> wrote:
>>
>>> The code and the properties seem good to me. collector.send(new
>>> OutgoingMessageEnvelope(OUTPUT_STREAM, message)); should work. So I am
>>> curious whether you accidentally disabled auto.create.topics.enable ... Can
>>> you also try to send messages from the command line to "demo-duplicate" to
>>> see if it gets anything?
>>>
>>> Let me know if it works.
>>>
>>> Thanks,
>>>
>>> Fang, Yan
>>> [email protected]
>>>
>>> On Fri, Jul 24, 2015 at 11:48 AM, Job-Selina Wu <[email protected]>
>>> wrote:
>>>
>>> > Hi, Shadi:
>>> >
>>> > Thanks a lot for your reply.
>>> > 1. There is no error log at Kafka and Samza
>>> >
>>> > 2. This line " logger.info("key="+key+": message="+message); " writes
>>> > the log correctly, as below:
>>> >
>>> > [image: Inline image 1]
>>> >
>>> > These are my last two messages, with the right count.
>>> >
>>> > 3. I tried both ways below; neither of them creates the topic, but I will
>>> > try again.
>>> >
>>> > collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
>>> >
>>> > //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>>> >
>>> > 4. I wrote a topic called "http-demo" to Kafka as my input, and its
>>> > content can be shown with the command line below, so Kafka should be OK.
>>> > deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181
>>> > --from-beginning --topic http-demo
>>> >
>>> > Your help is highly appreciated.
>>> >
>>> > Sincerely,
>>> > Selina
>>> >
>>> >
>>> >
>>> >
>>> > On Fri, Jul 24, 2015 at 9:29 AM, Shadi Noghabi <
>>> > [email protected]> wrote:
>>> >
>>> >> Selina,
>>> >>
>>> >> You should probably check a few things:
>>> >> 1. Your log files, to see if you have any errors. Also, does your job
>>> >> fail, or does it continue running?
>>> >> 2. Does this line " logger.info("key="+key+": message="+message); "
>>> >> write any logs?
>>> >> 3. This might not be the only reason, but you are sending messages of
>>> >> type Map<String, String>. However, in your config file, you defined
>>> >> "systems.kafka.samza.msg.serde=string", which expects the message to be
>>> >> a String.
>>> >>
>>> >>
>>> >> Shadi
>>> >>
>>> >>
>>> >> On Thu, Jul 23, 2015 at 6:33 PM, Job-Selina Wu <[email protected]>
>>> >> wrote:
>>> >>
>>> >> > Hi, All
>>> >> >
>>> >> >     I am trying to write my first StreamTask class. I have a topic in
>>> >> > Kafka called "http-demo". I would like to read the topic and write it to
>>> >> > another topic called "demo-duplicate".
>>> >> >
>>> >> >     However, no topic is written to Kafka.
>>> >> >
>>> >> >     My properties file and StreamTask are below. Can anyone tell me
>>> >> > what the bug is?
>>> >> >     BTW, if I set checkpoint or metrics in the properties file, the
>>> >> > checkpoint and metrics topics are written to Kafka. And the content of
>>> >> > the input topic, http-demo, is shown correctly.
>>> >> >
>>> >> > Your help is highly appreciated.
>>> >> >
>>> >> > Sincerely,
>>> >> > Selina
>>> >> >
>>> >> >
>>> >> > - - -- - - - - -
>>> >> > # Job
>>> >> > job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
>>> >> > job.name=demo-parser
>>> >> >
>>> >> > # YARN
>>> >> > yarn.package.path=file:///Users/selina/IdeaProjects/samza-Demo/target/hello-samza-0.9.1-dist.tar.gz
>>> >> >
>>> >> > # Task
>>> >> > task.class=samza.http.demo.task.HttpDemoParserStreamTask
>>> >> > task.inputs=kafka.http-demo
>>> >> >
>>> >> > # Serializers
>>> >> > serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
>>> >> >
>>> >> > # Kafka System
>>> >> > systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>> >> > systems.kafka.samza.msg.serde=string
>>> >> > systems.kafka.samza.key.serde=string
>>> >> > systems.kafka.consumer.zookeeper.connect=localhost:2181/
>>> >> > systems.kafka.consumer.auto.offset.reset=largest
>>> >> > systems.kafka.producer.bootstrap.servers=localhost:9092
>>> >> > - - -- - - - - -
>>> >> >
>>> >> > My StreamTask class is simple also
>>> >> >
>>> >> > ---------
>>> >> >
>>> >> > /**
>>> >> >  * Read data from http-demo topic and write it back to "demo-duplicate"
>>> >> >  */
>>> >> > public class HttpDemoParserStreamTask implements StreamTask {
>>> >> >
>>> >> >     private static final SystemStream OUTPUT_STREAM =
>>> >> >         new SystemStream("kafka", "demo-duplicate");
>>> >> >     Logger logger = LoggerFactory.getLogger(HttpDemoParserStreamTask.class);
>>> >> >
>>> >> >     @SuppressWarnings("unchecked")
>>> >> >     @Override
>>> >> >     public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
>>> >> >             TaskCoordinator coordinator) throws Exception {
>>> >> >
>>> >> >         String key = (String) envelope.getKey();
>>> >> >         String message = envelope.getMessage().toString();
>>> >> >         logger.info("key="+key+": message="+message);
>>> >> >
>>> >> >         Map<String, String> outgoingMap = (Map<String, String>) (envelope.getMessage());
>>> >> >         collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outgoingMap));
>>> >> >         //collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, message));
>>> >> >     }
>>> >> >
>>> >> > }
>>> >> >
>>> >> > -------
>>> >> >
>>> >>
>>> >
>>> >
>>>
>>
>>
>