That sounds like either a protobuf dependency compatibility issue between what is on the Kafka Connect classpath and the Hadoop cluster you are trying to write to (e.g. you're on a newer version of protobuf than your cluster, or vice versa), or a wire incompatibility in the RPC protocol between the Hadoop client version you're writing with and the version your cluster is running.
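If it's the first case, a quick check is to compare the protobuf jars each side actually loads. Something along these lines should work; the paths below are just common default install locations, not something I know about your DC/OS setup, so treat them as assumptions and adjust for your layout:

    # On the Connect worker: protobuf jars visible to the HDFS connector
    find /usr/share/java -name 'protobuf-java-*.jar' 2>/dev/null

    # On a Hadoop node: protobuf jars the cluster ships with
    find "$HADOOP_HOME" -name 'protobuf-java-*.jar' 2>/dev/null

    # Also worth comparing: the Hadoop client version bundled with the
    # connector vs. what the cluster itself reports
    hadoop version

If the version numbers in those jar names disagree, that's your likely culprit.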
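For the second case, that particular "end-group tag did not match expected tag" error can also show up when the client is talking to something that isn't the NameNode RPC endpoint at all (a web UI or proxy port will answer with non-protobuf bytes). Since DC/OS remaps ports, it may be worth confirming that the port in your hdfs.url is the RPC port the cluster itself advertises, e.g. on a cluster node:

    # The connector's hdfs.url should match what the cluster advertises here
    hdfs getconf -confKey fs.defaultFS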
Hopefully that gives you something to look at.

On Thu, Aug 3, 2017 at 7:29 AM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> I installed the Confluent Connect package on DC/OS. It started a worker on
> the cluster without any problem. I am using version 3.2.2 (compatible with
> 0.10.2.1).
>
> Now I have a Kafka Streams application that generates Avro into a Kafka
> topic named avro-topic .. and I configure the HdfsSinkConnector as follows
> ..
>
> > curl -X POST -H "Content-Type: application/json" --data '{"name":
> > "ks-hdfs-sink", "config": {"connector.class":"HdfsSinkConnector",
> > "tasks.max":"1", "hdfs.url":"hdfs://10.8.0.18:18039",
> > "topics":"avro-topic", "flush.size":"1000" }}'
> > http://10.8.0.18:4904/connectors
>
> When Avro is produced in the topic, I get the following exceptions in the
> log, which shut down the HdfsSinkConnector ..
>
> [2017-08-03 12:13:06,031] INFO Couldn't start HdfsSinkConnector: (io.confluent.connect.hdfs.HdfsSinkTask)
> org.apache.kafka.connect.errors.ConnectException: java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "7923902e26b4/172.17.0.3"; destination host is: "10.8.0.18":18039;
>   at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:202)
>   at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:76)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:231)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
>   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "7923902e26b4/172.17.0.3"; destination host is: "10.8.0.18":18039;
>   at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:776)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1479)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>   at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>   at com.sun.proxy.$Proxy49.getFileInfo(Unknown Source)
>   at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>   at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy50.getFileInfo(Unknown Source)
>   at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
>   at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
>   at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
>   at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>   at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
>   at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
>   at io.confluent.connect.hdfs.storage.HdfsStorage.exists(HdfsStorage.java:66)
>   at io.confluent.connect.hdfs.DataWriter.createDir(DataWriter.java:368)
>   at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:170)
>   ... 10 more
> Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
>   at com.google.protobuf.InvalidProtocolBufferException.invalidEndTag(InvalidProtocolBufferException.java:94)
>   at com.google.protobuf.CodedInputStream.checkLastTagWas(CodedInputStream.java:124)
>   at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:202)
>   at com.google.protobuf.AbstractParser.parsePartialDelimitedFrom(AbstractParser.java:241)
>   at com.google.protobuf.AbstractParser.parseDelimitedFrom(AbstractParser.java:253)
>   at com.google.protobuf.AbstractParser.parseDelimitedFrom(AbstractParser.java:259)
>   at com.google.protobuf.AbstractParser.parseDelimitedFrom(AbstractParser.java:49)
>   at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.parseDelimitedFrom(RpcHeaderProtos.java:3167)
>   at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1086)
>   at org.apache.hadoop.ipc.Client$Connection.run(Client.java:979)
>
> It also throws another exception while closing ..
>
> [2017-08-03 12:13:06,337] ERROR Task ks-hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
> java.lang.NullPointerException
>   at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:121)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:317)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:480)
>   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
>   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
> any pointer / help ?
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg