I made a simple log4j Kafka appender. I copied most of the code from the 0.8.0 Producer Example at https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example into the appender's "append" function.
I confirmed that the producer example code works in my environment, but when I use the same logic in a log4j appender, it does not work. The producer tries to fetch metadata repeatedly and I get endless "Utils$.swallowError" errors. I don't know what swallowError does. It looks like the metadata fetch from the broker fails and is retried again and again. The maximum retry count is only 3, so I don't understand why this happens. Is there anything else that has to be done to produce log data into Kafka via a log4j appender?

---------------------------------------------------------------------------------------------------------------------------------
INFO [main] (Logging.scala:67) - Verifying properties
INFO [main] (Logging.scala:67) - Property metadata.broker.list is overridden to kafka01:9092
WARN [main] (Logging.scala:82) - Property zk.connect is not valid
INFO [main] (Logging.scala:67) - Property request.required.acks is overridden to 1
INFO [main] (Logging.scala:67) - Property partitioner.class is overridden to com.samsung.rtdp.SimplePartitioner2
INFO [main] (Logging.scala:67) - Property serializer.class is overridden to kafka.serializer.StringEncoder
INFO [main] (HelloWorld.java:14) - Entering application.
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 0 for 1 topic(s) Set(KafkaAppenderTest)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 1 for 1 topic(s) Set(KafkaAppenderTest)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 2 for 1 topic(s) Set(KafkaAppenderTest)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 3 for 1 topic(s) Set(KafkaAppenderTest)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 4 for 1 topic(s) Set(KafkaAppenderTest)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 5 for 1 topic(s) Set(KafkaAppenderTest)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 6 for 1 topic(s) Set(KafkaAppenderTest)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 7 for 1 topic(s) Set(KafkaAppenderTest)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 8 for 1 topic(s) Set(KafkaAppenderTest)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 9 for 1 topic(s) Set(KafkaAppenderTest)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 10 for 1 topic(s) Set(KafkaAppenderTest)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 11 for 1 topic(s) Set(KafkaAppenderTest)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 12 for 1 topic(s) Set(KafkaAppenderTest)
. . .
java.lang.StackOverflowError
    at java.lang.StringCoding.deref(StringCoding.java:64)
    at java.lang.StringCoding.encode(StringCoding.java:275)
    at java.lang.String.getBytes(String.java:954)
    at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
    at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:243)
    at java.io.File.exists(File.java:791)
    at sun.misc.URLClassPath$FileLoader.getResource(URLClassPath.java:1014)
    at sun.misc.URLClassPath.getResource(URLClassPath.java:189)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:209)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
    at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
    at org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
    at org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
    at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
    at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
    at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
    at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
    at org.apache.log4j.Category.callAppenders(Category.java:206)
    at org.apache.log4j.Category.forcedLog(Category.java:391)
    at org.apache.log4j.Category.error(Category.java:322)
    at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
    at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
    at kafka.utils.Utils$.swallow(Utils.scala:189)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:46)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
    at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
    at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
    at org.apache.log4j.Category.callAppenders(Category.java:206)
    at org.apache.log4j.Category.forcedLog(Category.java:391)
    at org.apache.log4j.Category.info(Category.java:666)
    at kafka.utils.Logging$class.info(Logging.scala:67)
    at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
    at kafka.utils.Utils$.swallow(Utils.scala:187)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:46)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
    . . .
----------------------------------------------------------------------------------------------------------------------------------

Here is my code. Since it came from producer example code, it is quite straightforward.

-------------------------------------------------------------------------------------------------------------------------
package com.samsung.rtdp;

import java.io.IOException;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.ErrorCode;
import org.apache.log4j.spi.LoggingEvent;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.Partitioner;
import kafka.producer.ProducerConfig;
import kafka.utils.VerifiableProperties;

public class KafkaAppender extends AppenderSkeleton {

    private String brokerList;
    private String serializer;
    private String partitioner;
    private String topic;
    private String DEFAULT_REQUIRED_REQUEST_NUACKS="1";
    private Properties props;
    private ProducerConfig config;
    private Producer<String, String> producer;

    public void setBrokerList(String brokerList) { this.brokerList = brokerList; }
    public String getBrokerList() { return this.brokerList; }

    public void setSerializerClass(String serializer) { this.serializer = serializer; }
    public String getSerializer() { return this.serializer; }

    public void setPartitionerClass(String partitioner) { this.partitioner = partitioner; }
    public String getPartitioner() { return this.partitioner; }

    public void setTopic(String topic) { this.topic = topic; }
    public String getTopic() { return this.topic; }

    public void printParameters(){
        System.out.println("BrokerList : " + brokerList);
        System.out.println("Serializer Class : " + serializer);
        System.out.println("Partitioner Class : " + partitioner);
        System.out.println("Topic : " + topic);
    }

    public void activateOptions() {
        // printParameters();
        props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", serializer);
        props.put("partitioner.class", partitioner);
        props.put("request.required.acks", DEFAULT_REQUIRED_REQUEST_NUACKS);
        props.put("zk.connect", "kafka01:2181");

        config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
    }

    public void close() {
        // TODO Auto-generated method stub
        producer.close();
    }

    public boolean requiresLayout() {
        // TODO Auto-generated method stub
        return true;
    }

    @Override
    protected void append(LoggingEvent event) {
        // TODO Auto-generated method stub
        // printParameters();
        if( this.layout == null ) {
            errorHandler.error("No layout for appender " + name , null, ErrorCode.MISSING_LAYOUT );
            return;
        }

        String msg = this.layout.format(event);
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("KafkaAppenderTest", msg, msg);
        producer.send(data);
    }
}
-------------------------------------------------------------------------------------------------------------------------------------

And this is my log4j.properties

-------------------------------------------------------------------------------------------------------------------------------------
log4j.rootLogger=INFO, stdout, KAFKA

# set the logger for your package to be the KAFKA appender
#log4j.logger.com.samsung.rtdp=INFO, KAFKA

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

# Pattern to output the caller's file name and line number.
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n

log4j.appender.KAFKA=com.samsung.rtdp.KafkaAppender
log4j.appender.KAFKA.BrokerList=kafka01:9092
log4j.appender.KAFKA.SerializerClass=kafka.serializer.StringEncoder
log4j.appender.KAFKA.PartitionerClass=com.samsung.rtdp.SimplePartitioner2
log4j.appender.KAFKA.Topic=test
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
--------------------------------------------------------------------------------------------------------------------------------------
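
One thing that stands out in the stack trace is that KafkaAppender.append appears more than once in the same call chain: Producer.send logs through kafka.utils.Logging, the root logger (which has the KAFKA appender attached) passes that event back to the appender, and append calls Producer.send again. If that reading is right, one possible way to break the cycle is a per-thread re-entrancy guard inside the appender. The following is only a minimal sketch of that idea in plain Java. MiniLogger, ChattyProducer, and GuardedAppender are hypothetical stand-ins for illustration, not log4j or Kafka classes.

```java
import java.util.ArrayList;
import java.util.List;

class ReentrancyGuardSketch {

    /** Hypothetical stand-in for a logger that hands every event to its appenders. */
    static class MiniLogger {
        final List<GuardedAppender> appenders = new ArrayList<>();

        void info(String msg) {
            for (GuardedAppender a : appenders) {
                a.append(msg);
            }
        }
    }

    /** Hypothetical stand-in for a producer that logs while it sends. */
    static class ChattyProducer {
        final MiniLogger log;
        final List<String> sent = new ArrayList<>();

        ChattyProducer(MiniLogger log) { this.log = log; }

        void send(String msg) {
            // This log call re-enters the appender on the same thread.
            log.info("Fetching metadata before sending: " + msg);
            sent.add(msg);
        }
    }

    /** Appender that refuses to handle events raised while it is already appending. */
    static class GuardedAppender {
        private final ThreadLocal<Boolean> inAppend = ThreadLocal.withInitial(() -> false);
        ChattyProducer producer;
        int dropped = 0;

        void append(String msg) {
            if (inAppend.get()) {
                // Nested call from the producer's own logging: drop it instead of recursing.
                dropped++;
                return;
            }
            inAppend.set(true);
            try {
                producer.send(msg);
            } finally {
                inAppend.set(false);
            }
        }
    }

    /** Wires the three pieces into the same loop as in the trace and logs one event. */
    static GuardedAppender demo() {
        MiniLogger logger = new MiniLogger();
        GuardedAppender appender = new GuardedAppender();
        appender.producer = new ChattyProducer(logger);
        logger.appenders.add(appender);
        logger.info("Entering application.");
        return appender;
    }

    public static void main(String[] args) {
        GuardedAppender a = demo();
        System.out.println("sent=" + a.producer.sent + " dropped=" + a.dropped);
    }
}
```

In this sketch the nested log event is counted and dropped rather than forwarded. Whether dropping is acceptable for the real appender, or whether Kafka's own loggers should instead be kept away from the KAFKA appender in log4j.properties, is a separate design choice.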