BTW, my Elasticsearch version is 2.3.3, not 1.7.1 as in the JIRA issue FLINK-7133, and 
I found that 2.3.3 does not depend on asm. My Flink version is 1.3.1. 


flink-connector-elasticsearch-base_2.10 is at version 1.3.1, and
flink-connector-elasticsearch2_2.10 is also at version 1.3.1.






At 2017-08-13 21:54:06, "mingleizhang" <18717838...@163.com> wrote:

Hello, Flink experts and friends!


This is my first time writing a Flink application at my company. I hit the 
following error when I used Elasticsearch as my sink. I searched for a solution 
and found the JIRA issue https://issues.apache.org/jira/browse/FLINK-7133 . 
I then added the change from its PR to my project, as shown below, but when I ran 
the Flink program again the error was still there. Why? When I use a filesystem 
sink, no error occurs, but as soon as I switch back to Elasticsearch the error 
returns. Could you help me, please? 


I think the two log lines below are not relevant to the error, but I include 
them here for reference. The class they mention is just a PB (protobuf) object. 


21:41:09,397 INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class com.vip.data.cleaning.logic.mars.activity.info.ActivityInfoProtos$ActivityInfo does not contain a setter for field unknownFields
21:41:09,400 INFO  org.apache.flink.api.java.typeutils.TypeExtractor - class com.vip.data.cleaning.logic.mars.activity.info.ActivityInfoProtos$ActivityInfo is not a valid POJO type because not all fields are valid POJO fields.


Exception in thread "main" java.lang.IllegalArgumentException
    at org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown Source)
    at org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown Source)
    at org.apache.flink.api.scala.InnerClosureFinder.<init>(ClosureCleaner.scala:279)
    at org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:670)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:600)
    at com.vip.data.cleaning.validation.mars.activity.info.WhileListFilter$.main(WhileListFilter.scala:79)
    at com.vip.data.cleaning.validation.mars.activity.info.WhileListFilter.main(WhileListFilter.scala)


The change from the PR, as I added it:
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>${elasticsearch.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.ow2.asm</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>
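
One way to check whether an exclusion like this actually changes what ends up on the classpath might be to print, at runtime, which jar each ASM class is loaded from. The following is only a minimal sketch: the class `WhichJar` and its method names are hypothetical, the first class name is copied from the stack trace above, and the second is the standard unshaded ASM package name. It would need to be run with the same classpath as the Flink job.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper; not part of Flink or Elasticsearch.
public class WhichJar {

    // Returns the jar (or directory) a loaded class came from.
    static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        // JDK bootstrap classes have no code source.
        return src == null ? "(bootstrap or unknown)" : src.getLocation().toString();
    }

    // Looks a class up by name without initializing it.
    static String locationOfName(String className) {
        try {
            Class<?> cls = Class.forName(className, false, WhichJar.class.getClassLoader());
            return locationOf(cls);
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        String[] names = {
            // Shaded ASM class named in the stack trace.
            "org.apache.flink.shaded.org.objectweb.asm.ClassVisitor",
            // Unshaded ASM class, present only if some dependency pulls in ASM.
            "org.objectweb.asm.ClassVisitor"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locationOfName(name));
        }
    }
}
```

If the shaded class turns out to come from a jar of a different Flink version than the rest of the job, that would point to mixed Flink versions on the classpath rather than to the Elasticsearch dependency.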


Regards
mingleizhang
rice.zh...@vipshop.com











 
