Hi,
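Could it be that createIndexRequest is not static? In Scala, you can declare the method in an object instead.
If a lambda calls a non-static method and the enclosing class is not serializable, the lambda may fail to serialize.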
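
To illustrate, here is a minimal, self-contained sketch that uses only java.io and no Flink classes. The names Emitter, Job, Requests and SerializationDemo are hypothetical and made up for this example; Emitter merely stands in for a serializable single-method interface such as the Elasticsearch emitter. It assumes Scala 2.12 or later, where a lambda converted to a serializable interface can be serialized as long as everything it captures can be.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a serializable single-method emitter interface.
trait Emitter extends java.io.Serializable {
  def emit(element: String): String
}

// Methods on an object behave like static methods: calling them needs no `this`.
object Requests {
  def createIndexRequest(element: String): String = s"index:$element"
}

// Deliberately NOT Serializable, like a typical job class.
class Job {
  def createIndexRequest(element: String): String = s"index:$element"

  // Calls an instance method, so the lambda captures `this` (a Job).
  def capturingEmitter: Emitter =
    (element: String) => createIndexRequest(element)

  // Calls a method on an object, so nothing from Job is captured.
  def objectEmitter: Emitter =
    (element: String) => Requests.createIndexRequest(element)
}

object SerializationDemo {
  // Tries Java serialization and reports whether it succeeded.
  def isSerializable(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(value) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val job = new Job
    println(s"capturing this: ${isSerializable(job.capturingEmitter)}")
    println(s"object method:  ${isSerializable(job.objectEmitter)}")
  }
}
```

Under these assumptions, the first check should fail because serializing the lambda also tries to serialize the captured Job, while the second should succeed. That is the reason for moving createIndexRequest into an object.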
Best,
Jiabao
On 2023/12/12 07:53:53 李世钰 wrote:
> val result: ElasticsearchSink[String] = new Elasticsearch7SinkBuilder[String]
>   // This instructs the sink to emit after every element, otherwise they would be buffered
>   .setBulkFlushMaxActions(1)
>   .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
>   .setEmitter(
>     (element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
>       indexer.add(createIndexRequest(element)))
>   .build()
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: The elasticsearch emitter must be serializable.
>
> Caused by: java.lang.IllegalStateException: The elasticsearch emitter must be serializable.
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>     at org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBuilderBase.setEmitter(ElasticsearchSinkBuilderBase.java:77)
>     at org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder.setEmitter(Elasticsearch7SinkBuilder.java:63)