Eugene Burd created KAFKA-6139:
----------------------------------

             Summary: error when loading plugins
                 Key: KAFKA-6139
                 URL: https://issues.apache.org/jira/browse/KAFKA-6139
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 0.11.0.0
            Reporter: Eugene Burd


I am trying to run the BigQuery connector for Kafka Connect 
(https://github.com/wepay/kafka-connect-bigquery).  I am running it in a 
Docker container: I dropped the connector's jar files into the container 
and set plugin.path in the Kafka Connect config to point to the 
directory containing the jars.  

Upon startup, Kafka Connect scans the plugin folder and evaluates every jar 
file it finds.  When it reaches the connector jar (in this case 
kcbq-connector-1.0.0-SNAPSHOT.jar), it checks whether the jar contains a 
class that extends Connector and then proceeds to create a new instance of 
that class.  The code snippet from DelegatingClassLoader.java is below.  The 
problem is that the connector class relies on other jar files that are in the 
plugin folder but not on the classpath.  This causes a NoClassDefFoundError 
(stack trace below). 
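
To illustrate the dependency problem described above, here is a minimal sketch 
of a class loader that covers every jar in one folder, so that a class in the 
connector jar could resolve classes from its sibling jars.  The names 
PluginDirJars, jarUrls and loaderFor are hypothetical and are not part of 
Kafka Connect; this is an illustration under that assumption, not a 
description of how DelegatingClassLoader builds its loaders.

```java
import java.io.File;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not Kafka Connect code.
final class PluginDirJars {

    // Returns one URL per *.jar file directly inside dir, sorted by path.
    static URL[] jarUrls(File dir) {
        File[] files = dir.listFiles((d, name) -> name.endsWith(".jar"));
        if (files == null) {
            return new URL[0]; // dir is missing or is not a directory
        }
        Arrays.sort(files);
        List<URL> urls = new ArrayList<>();
        for (File f : files) {
            if (!f.isFile()) {
                continue; // skip directories that happen to end in .jar
            }
            try {
                urls.add(f.toURI().toURL());
            } catch (MalformedURLException e) {
                throw new UncheckedIOException(e);
            }
        }
        return urls.toArray(new URL[0]);
    }

    // Builds a loader that searches every jar in dir, then falls back to parent
    // according to the standard parent-first delegation of URLClassLoader.
    static URLClassLoader loaderFor(File dir, ClassLoader parent) {
        return new URLClassLoader(jarUrls(dir), parent);
    }
}
```

With a loader like this, a lookup of com.google.cloud.bigquery.BigQuery would 
search all jars in the folder rather than only the connector jar.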

I suspect that when the connector is actually executed, it runs in an 
isolated context whose classpath is set to the plugin folder, so I think the 
connector itself would work.  But scanning for the connector fails before 
ever getting there.

I understand that the scan needs the connector's version, which is why this 
code runs, but I think the instance created for the version check needs to be 
created in a sandbox that includes the plugin connector's classpath. 
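
The sandbox idea above can be sketched roughly as follows: install the 
plugin's class loader as the thread context class loader while the connector 
is instantiated, and restore the previous loader afterwards.  ClassLoaderScope, 
with and instantiate are hypothetical names chosen for this sketch; it is not 
Kafka Connect code, and turning linkage errors into an exception the caller 
can handle (instead of letting them end the scan) is an assumption about the 
desired behavior.

```java
import java.util.function.Supplier;

// Hypothetical helper for illustration only; not Kafka Connect code.
final class ClassLoaderScope {

    // Runs action with loader as the thread context class loader and
    // restores the previous loader afterwards, even if action throws.
    static <T> T with(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    // Loads className through loader and calls its public no-arg constructor.
    // Failures, including NoClassDefFoundError (a LinkageError), are wrapped
    // so that a caller scanning many plugins could skip the broken one.
    static Object instantiate(String className, ClassLoader loader) {
        return with(loader, () -> {
            try {
                Class<?> type = Class.forName(className, true, loader);
                return type.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException | LinkageError e) {
                throw new IllegalStateException("Cannot instantiate " + className, e);
            }
        });
    }
}
```

A scan could then call ClassLoaderScope.instantiate(plugin.getName(), loader) 
and cast the result to Connector before reading version(), assuming loader 
can see all of the connector's dependencies.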


[2017-10-28 06:04:08,961] INFO Loading plugin from: /usr/share/java/kafka-connect-bigquery/kcbq-connector-1.0.0-SNAPSHOT.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:176)
Exception in thread "main" java.lang.NoClassDefFoundError: com/google/cloud/bigquery/BigQuery
        at java.lang.Class.getDeclaredConstructors0(Native Method)
        at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
        at java.lang.Class.getConstructor0(Class.java:3075)
        at java.lang.Class.newInstance(Class.java:412)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:242)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:223)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:190)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:150)
        at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
        at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:63)
Caused by: java.lang.ClassNotFoundException: com.google.cloud.bigquery.BigQuery
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:62)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 11 more

-------------------------

private <T> Collection<PluginDesc<T>> getPluginDesc(
        Reflections reflections,
        Class<T> klass,
        ClassLoader loader
) throws InstantiationException, IllegalAccessException {
    Set<Class<? extends T>> plugins = reflections.getSubTypesOf(klass);

    Collection<PluginDesc<T>> result = new ArrayList<>();
    for (Class<? extends T> plugin : plugins) {
        if (PluginUtils.isConcrete(plugin)) {
            // Temporary workaround until all the plugins are versioned.
            if (Connector.class.isAssignableFrom(plugin)) {
                result.add(
                        new PluginDesc<>(
                                plugin,
                                ((Connector) plugin.newInstance()).version(),
                                loader
                        )
                );
            } else {
                result.add(new PluginDesc<>(plugin, "undefined", loader));
            }
        }
    }
    return result;
}




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
