Hi Flink-Devs,

I have created an outputformat for elasticsearch, which connnects through
the transport client.
While running the job from eclipse-IDE, it works fine.
But, while running the job from command line or Flink web interface, i am
getting different error.
if i deploy the jar for first time, i am getting: 
java.lang.Exception: Configuring the OutputFormat
(de.fraunhofer.fokus.odp.transformer.flink.iee2rdf.ESOutPutFormat@1a20270e)
failed: JRE_IS_64BIT
        at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:80)
~[flink-runtime-0.8.1.jar:0.8.1]
        at
org.apache.flink.runtime.jobmanager.JobManager.submitJob(JobManager.java:385)
~[flink-runtime-0.8.1.jar:0.8.1]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[na:1.8.0_45]
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
~[na:1.8.0_45]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
~[na:1.8.0_45]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_45]
        at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
[flink-runtime-0.8.1.jar:0.8.1]
        at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
[flink-runtime-0.8.1.jar:0.8.1]
Caused by: java.lang.NoSuchFieldError: JRE_IS_64BIT
        at
org.apache.lucene.util.RamUsageEstimator.<clinit>(RamUsageEstimator.java:145)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at org.apache.lucene.util.ArrayUtil.<clinit>(ArrayUtil.java:32)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at org.apache.lucene.util.BytesRefBuilder.grow(BytesRefBuilder.java:65)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.apache.lucene.util.BytesRefBuilder.copyChars(BytesRefBuilder.java:146)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.apache.lucene.util.BytesRefBuilder.copyChars(BytesRefBuilder.java:138)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at org.elasticsearch.common.Strings.toUTF8Bytes(Strings.java:1018)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at org.elasticsearch.common.Strings.toUTF8Bytes(Strings.java:1014)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.search.facet.filter.InternalFilterFacet.<clinit>(InternalFilterFacet.java:40)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.search.facet.TransportFacetModule.configure(TransportFacetModule.java:39)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.common.inject.AbstractModule.configure(AbstractModule.java:60)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.common.inject.spi.Elements$RecordingBinder.install(Elements.java:204)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.common.inject.spi.Elements.getElements(Elements.java:85)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.common.inject.InjectorShell$Builder.build(InjectorShell.java:130)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.common.inject.InjectorBuilder.build(InjectorBuilder.java:99)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at org.elasticsearch.common.inject.Guice.createInjector(Guice.java:93)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at org.elasticsearch.common.inject.Guice.createInjector(Guice.java:70)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.common.inject.ModulesBuilder.createInjector(ModulesBuilder.java:59)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:195)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:125)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
de.fraunhofer.fokus.odp.transformer.flink.iee2rdf.ESOutPutFormat.configure(ESOutPutFormat.java:87)
~[de.fraunhofer.fokus.odp.transformer.flink.iee2rdf_1.0.0.201511012145.jar:na]
        at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:77)
~[flink-runtime-0.8.1.jar:0.8.1]
        ... 7 common frames omitted


if i run the same jar again, it shows:

java.lang.Exception: Configuring the OutputFormat
(de.fraunhofer.fokus.odp.transformer.flink.iee2rdf.ESOutPutFormat@1a20270e)
failed: Could not initialize class
org.elasticsearch.search.facet.filter.InternalFilterFacet
        at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:80)
~[flink-runtime-0.8.1.jar:0.8.1]
        at
org.apache.flink.runtime.jobmanager.JobManager.submitJob(JobManager.java:385)
~[flink-runtime-0.8.1.jar:0.8.1]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[na:1.8.0_45]
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
~[na:1.8.0_45]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
~[na:1.8.0_45]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_45]
        at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
[flink-runtime-0.8.1.jar:0.8.1]
        at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
[flink-runtime-0.8.1.jar:0.8.1]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class
org.elasticsearch.search.facet.filter.InternalFilterFacet
        at
org.elasticsearch.search.facet.TransportFacetModule.configure(TransportFacetModule.java:39)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.common.inject.AbstractModule.configure(AbstractModule.java:60)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.common.inject.spi.Elements$RecordingBinder.install(Elements.java:204)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.common.inject.spi.Elements.getElements(Elements.java:85)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.common.inject.InjectorShell$Builder.build(InjectorShell.java:130)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.common.inject.InjectorBuilder.build(InjectorBuilder.java:99)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at org.elasticsearch.common.inject.Guice.createInjector(Guice.java:93)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at org.elasticsearch.common.inject.Guice.createInjector(Guice.java:70)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.common.inject.ModulesBuilder.createInjector(ModulesBuilder.java:59)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:195)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:125)
~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
        at
de.fraunhofer.fokus.odp.transformer.flink.iee2rdf.ESOutPutFormat.configure(ESOutPutFormat.java:87)
~[de.fraunhofer.fokus.odp.transformer.flink.iee2rdf_1.0.0.201511012145.jar:na]
        at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:77)
~[flink-runtime-0.8.1.jar:0.8.1]
        ... 7 common frames omitted



My ESOutPutFormat looks like the below.

 
        /*
         * (non-Javadoc)
         * @see org.apache.flink.api.common.io.OutputFormat#close()
         */
        @Override
        public void close() throws IOException {
                // TODO Auto-generated method stub
                try {
                        client.close();
                        
                } catch (Exception e) {
                        e.printStackTrace();
                }
                System.out.println("DONE ES CLuster updated");;
        }
        
        /*
         * (non-Javadoc)
         * @see
org.apache.flink.api.common.io.OutputFormat#configure(org.apache.flink.configuration.Configuration)
         */
        @Override
        public void configure(Configuration arg0) {
                
                
                try {
                        
                        Settings settings =
ImmutableSettings.settingsBuilder().put("cluster.name",
"elasticsearch").build();
                        
                        @SuppressWarnings("resource")
                        TransportClient transportClient = new 
TransportClient(settings);//Error
OCCURED HERE
                        transportClient = ((TransportClient)
transportClient).addTransportAddress(new
InetSocketTransportAddress("localhost", 9300));
                        
                        this.client = (Client) transportClient;
                        
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
        
        /*
         * (non-Javadoc)
         * @see org.apache.flink.api.common.io.OutputFormat#open(int, int)
         */
        @Override
        public void open(int arg0, int arg1) throws IOException {
                try {
                        
                } catch (Exception e) {
                        // TODO: handle exception
                }
        }
        
        /*
         * (non-Javadoc)
         * @see
org.apache.flink.api.common.io.OutputFormat#writeRecord(java.lang.Object)
         */
        @Override
        public void writeRecord(Tuple2<String, String> input) throws 
IOException {
                System.out.println("  ES CLuster Indexing processing...");;
                try {
                        //Writing records
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }



Thanks and Regards,
Santosh



--
View this message in context: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/job-failed-while-initiating-Transport-client-for-Elasticsearch-tp8833.html
Sent from the Apache Flink Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to