Just change config/log4j.properties inside Kafka.

Thanks,
Jun

On Tue, Mar 4, 2014 at 4:09 PM, 김동경 <style9...@gmail.com> wrote:

> I couldn't find any configuration relevant to turning off the log in
> http://kafka.apache.org/documentation.html#configuration.
> I included Kafka as a Maven dependency.
> How could I turn off the Kafka log in the code?
>
> Thanks
> Regards
> Dongkyoung
>
> 2014-03-04 14:40 GMT+09:00 Jun Rao <jun...@gmail.com>:
>
> > I think it tries to add the logging in Kafka itself back to the
> > KafkaAppender.
> > This creates an infinite loop. Maybe you could try setting the log level
> > for the Kafka package to OFF?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Mar 3, 2014 at 6:26 PM, 김동경 <style9...@gmail.com> wrote:
> >
> > > I made a simple log4j Kafka appender.
> > > I copied most of the code from the 0.8.0 Producer example in
> > > "https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example"
> > > to write the "append" function.
> > >
> > > I confirmed the producer example code works in my environment.
> > > But when I use the same logic in a log4j appender, it doesn't work.
> > > It tries to fetch metadata repeatedly and I get an endless
> > > "Utils$.swallowError" error.
> > >
> > > I have no idea about swallowError.
> > > It looks like it failed to fetch metadata from the broker and keeps
> > > trying again and again.
> > > The max retry count is just 3, so I don't know why this happens.
> > >
> > > Is there anything else that should be done to produce log data into
> > > Kafka via a log4j appender?
> > >
> > >
> > > ----------------------------------------------------------------------
> > > INFO [main] (Logging.scala:67) - Verifying properties
> > > INFO [main] (Logging.scala:67) - Property metadata.broker.list is overridden to kafka01:9092
> > > WARN [main] (Logging.scala:82) - Property zk.connect is not valid
> > > INFO [main] (Logging.scala:67) - Property request.required.acks is overridden to 1
> > > INFO [main] (Logging.scala:67) - Property partitioner.class is overridden to com.samsung.rtdp.SimplePartitioner2
> > > INFO [main] (Logging.scala:67) - Property serializer.class is overridden to kafka.serializer.StringEncoder
> > > INFO [main] (HelloWorld.java:14) - Entering application.
> > > INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 0 for 1 topic(s) Set(KafkaAppenderTest)
> > > INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 1 for 1 topic(s) Set(KafkaAppenderTest)
> > > INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 2 for 1 topic(s) Set(KafkaAppenderTest)
> > > INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 3 for 1 topic(s) Set(KafkaAppenderTest)
> > > INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 4 for 1 topic(s) Set(KafkaAppenderTest)
> > > INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 5 for 1 topic(s) Set(KafkaAppenderTest)
> > > INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 6 for 1 topic(s) Set(KafkaAppenderTest)
> > > INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 7 for 1 topic(s) Set(KafkaAppenderTest)
> > > INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 8 for 1 topic(s) Set(KafkaAppenderTest)
> > > INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 9 for 1 topic(s) Set(KafkaAppenderTest)
> > > INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 10 for 1 topic(s) Set(KafkaAppenderTest)
> > > INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 11 for 1 topic(s) Set(KafkaAppenderTest)
> > > INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 12 for 1 topic(s) Set(KafkaAppenderTest)
> > > .
> > > .
> > > .
> > > java.lang.StackOverflowError
> > >     at java.lang.StringCoding.deref(StringCoding.java:64)
> > >     at java.lang.StringCoding.encode(StringCoding.java:275)
> > >     at java.lang.String.getBytes(String.java:954)
> > >     at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
> > >     at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:243)
> > >     at java.io.File.exists(File.java:791)
> > >     at sun.misc.URLClassPath$FileLoader.getResource(URLClassPath.java:1014)
> > >     at sun.misc.URLClassPath.getResource(URLClassPath.java:189)
> > >     at java.net.URLClassLoader$1.run(URLClassLoader.java:209)
> > >     at java.security.AccessController.doPrivileged(Native Method)
> > >     at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> > >     at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> > >     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> > >     at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> > >     at java.lang.ClassLoader.defineClass1(Native Method)
> > >     at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
> > >     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> > >     at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
> > >     at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
> > >     at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
> > >     at java.security.AccessController.doPrivileged(Native Method)
> > >     at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> > >     at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> > >     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> > >     at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> > >     at org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
> > >     at org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
> > >     at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
> > >     at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
> > >     at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> > >     at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> > >     at org.apache.log4j.Category.callAppenders(Category.java:206)
> > >     at org.apache.log4j.Category.forcedLog(Category.java:391)
> > >     at org.apache.log4j.Category.error(Category.java:322)
> > >     at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> > >     at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> > >     at kafka.utils.Utils$.swallow(Utils.scala:189)
> > >     at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > >     at kafka.utils.Utils$.swallowError(Utils.scala:46)
> > >     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > >     at kafka.producer.Producer.send(Producer.scala:76)
> > >     at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > >     at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> > >     at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> > >     at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> > >     at org.apache.log4j.Category.callAppenders(Category.java:206)
> > >     at org.apache.log4j.Category.forcedLog(Category.java:391)
> > >     at org.apache.log4j.Category.info(Category.java:666)
> > >     at kafka.utils.Logging$class.info(Logging.scala:67)
> > >     at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
> > >     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
> > >     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > >     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> > >     at kafka.utils.Utils$.swallow(Utils.scala:187)
> > >     at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > >     at kafka.utils.Utils$.swallowError(Utils.scala:46)
> > >     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > >     at kafka.producer.Producer.send(Producer.scala:76)
> > >     at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > >     at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> > > .
> > > .
> > > .
> > >
> > >
> > > ----------------------------------------------------------------------
> > >
> > >
> > > Here is my code.
> > > Since it came from producer example code, it is quite straightforward.
> > >
> > >
> > > ----------------------------------------------------------------------
> > > package com.samsung.rtdp;
> > >
> > > import java.io.IOException;
> > > import java.util.Date;
> > > import java.util.Properties;
> > > import java.util.Random;
> > >
> > > import org.apache.log4j.AppenderSkeleton;
> > > import org.apache.log4j.spi.ErrorCode;
> > > import org.apache.log4j.spi.LoggingEvent;
> > >
> > > import kafka.javaapi.producer.Producer;
> > > import kafka.producer.KeyedMessage;
> > > import kafka.producer.Partitioner;
> > > import kafka.producer.ProducerConfig;
> > > import kafka.utils.VerifiableProperties;
> > >
> > >
> > > public class KafkaAppender extends AppenderSkeleton {
> > >
> > >     private String brokerList;
> > >     private String serializer;
> > >     private String partitioner;
> > >     private String topic;
> > >     private String DEFAULT_REQUIRED_REQUEST_NUACKS="1";
> > >
> > >     private Properties props;
> > >     private ProducerConfig config;
> > >     private Producer<String, String> producer;
> > >
> > >     public void setBrokerList(String brokerList) { this.brokerList = brokerList; }
> > >     public String getBrokerList() { return this.brokerList; }
> > >
> > >     public void setSerializerClass(String serializer) { this.serializer = serializer; }
> > >     public String getSerializer() { return this.serializer; }
> > >
> > >     public void setPartitionerClass(String partitioner) { this.partitioner = partitioner; }
> > >     public String getPartitioner() { return this.partitioner; }
> > >
> > >     public void setTopic(String topic) { this.topic = topic; }
> > >     public String getTopic() { return this.topic; }
> > >
> > >
> > >     public void printParameters(){
> > >         System.out.println("BrokerList : " + brokerList);
> > >         System.out.println("Serializer Class : " + serializer);
> > >         System.out.println("Partitioner Class : " + partitioner);
> > >         System.out.println("Topic : " + topic);
> > >     }
> > >
> > >     public void activateOptions() {
> > >
> > >         // printParameters();
> > >
> > >         props = new Properties();
> > >
> > >         props.put("metadata.broker.list", brokerList);
> > >         props.put("serializer.class", serializer);
> > >         props.put("partitioner.class", partitioner);
> > >         props.put("request.required.acks", DEFAULT_REQUIRED_REQUEST_NUACKS);
> > >         props.put("zk.connect", "kafka01:2181");
> > >
> > >         config = new ProducerConfig(props);
> > >         producer = new Producer<String, String>(config);
> > >     }
> > >
> > >
> > >     public void close() {
> > >         // TODO Auto-generated method stub
> > >         producer.close();
> > >
> > >     }
> > >
> > >
> > >     public boolean requiresLayout() {
> > >         // TODO Auto-generated method stub
> > >         return true;
> > >     }
> > >
> > >     @Override
> > >     protected void append(LoggingEvent event) {
> > >         // TODO Auto-generated method stub
> > >
> > >
> > >         // printParameters();
> > >
> > >         if( this.layout == null )
> > >         {
> > >             errorHandler.error("No layout for appender " + name , null, ErrorCode.MISSING_LAYOUT );
> > >             return;
> > >         }
> > >
> > >         String msg = this.layout.format(event);
> > >
> > >         KeyedMessage<String, String> data = new KeyedMessage<String, String>("KafkaAppenderTest", msg, msg);
> > >
> > >         producer.send(data);
> > >     }
> > >
> > > }
> > >
> > >
> > > ----------------------------------------------------------------------
> > >
> > >
> > >
> > > And this is my log4j.properties
> > >
> > >
> > > ----------------------------------------------------------------------
> > > log4j.rootLogger=INFO, stdout, KAFKA
> > > # set the logger for your package to be the KAFKA appender
> > > #log4j.logger.com.samsung.rtdp=INFO, KAFKA
> > >
> > > log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> > > log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
> > >
> > > # Pattern to output the caller's file name and line number.
> > > log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
> > >
> > > log4j.appender.KAFKA=com.samsung.rtdp.KafkaAppender
> > > log4j.appender.KAFKA.BrokerList=kafka01:9092
> > > log4j.appender.KAFKA.SerializerClass=kafka.serializer.StringEncoder
> > > log4j.appender.KAFKA.PartitionerClass=com.samsung.rtdp.SimplePartitioner2
> > > log4j.appender.KAFKA.Topic=test
> > > log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
> > > log4j.appender.KAFKA.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
> > >
> > >
> > > ----------------------------------------------------------------------
> > >
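
For anyone who wants to see the loop from the stack trace in isolation: the producer logs while it sends, the root logger routes that event back into the appender, and the appender calls the producer again. Below is a minimal sketch of one possible way to break such a cycle, a per-thread re-entrancy guard. It is plain Java with no log4j or Kafka on the classpath; `Sender` and `GuardedAppender` are hypothetical stand-ins invented for this illustration, not the log4j `AppenderSkeleton` API and not the fix suggested in the thread.

```java
import java.util.ArrayList;
import java.util.List;

public class ReentrancyGuardSketch {

    /** Hypothetical stand-in for a producer client that logs while it sends. */
    interface Sender {
        void send(String message);
    }

    /** Hypothetical appender used only for this sketch (not a log4j class). */
    static class GuardedAppender {
        // TRUE while the current thread is already inside append().
        private final ThreadLocal<Boolean> sending =
                ThreadLocal.withInitial(() -> Boolean.FALSE);
        private Sender sender;
        int dropped = 0; // nested events that were discarded

        void setSender(Sender sender) {
            this.sender = sender;
        }

        void append(String message) {
            if (sending.get()) {
                // This event was raised by the sender itself; forwarding it
                // would recurse until the stack overflows, so drop it.
                dropped++;
                return;
            }
            sending.set(Boolean.TRUE);
            try {
                sender.send(message);
            } finally {
                sending.set(Boolean.FALSE);
            }
        }
    }

    public static void main(String[] args) {
        GuardedAppender appender = new GuardedAppender();
        List<String> delivered = new ArrayList<>();

        // The sender logs through the same appender before delivering,
        // imitating a client whose own log output reaches the root logger.
        appender.setSender(message -> {
            appender.append("Fetching metadata for: " + message);
            delivered.add(message);
        });

        appender.append("Entering application.");
        System.out.println("delivered=" + delivered + " dropped=" + appender.dropped);
        // prints: delivered=[Entering application.] dropped=1
    }
}
```

The suggestion made in the thread is the simpler route: in log4j 1.x properties syntax, a line such as `log4j.logger.kafka=OFF` should silence the loggers under the `kafka` package so the producer's own messages never reach the KAFKA appender. A guard like the one above would only be a fallback if those messages still need to go to other appenders.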