各位大佬好!
我在使用Flink ES连接器的时候,有时候报以下错误:
Caused by: java.io.IOException
breakpoint : 远程主机强迫关闭了一个现有的连接
初步判断,应该是没有维持住长连接保活,所以如果一段时间不写入数据,连接就断了。
请问各位大佬,ElasticSearch Connector 有什么参数可以维持长连接吗?
ElasticSearch Connector 代码如下:
jsonStringStream
.sinkTo(
new Elasticsearch7SinkBuilder<String>()
// Instructs the sink to emit after every Nth buffered
element
.setBulkFlushMaxActions(1)
.setHosts(
new HttpHost(
Conn.getInstance().getProp("elasticsearch.hosts"),
Integer.parseInt(
Conn.getInstance().getProp("elasticsearch.port")
),
Conn.getInstance().getProp("elasticsearch.scheme")
)
)
.setEmitter(
(page, context, indexer) ->
indexer.add(
new
IndexRequest(Conn.getInstance().getProp("elasticsearch.index.page"))
.source(page,
XContentType.JSON)
)
)
.setBulkFlushBackoffStrategy(FlushBackoffType.EXPONENTIAL, 5, 1000)
.build());
报错如下:
[cid:71c0dada-02b0-4a0d-b16f-97bb0d65167f]
多谢指教!