after i install x-pack in my elasticsearch cluster and the elasticsearch cluster with basicauth
the elasticsearch sink can't connect to elastic cluster

code like:

DataStream<Tuple2<Boolean, Row>> esSink27 = tableEnv13.toRetractStream(esTable26, Row.class).filter( tuple -> tuple.f0);
//generate user config map
java.util.Map<String, String> userConfigMap22 = com.google.common.collect.Maps.newHashMap();
userConfigMap22.put("cluster.name", "test-magina");
userConfigMap22.put("bulk.flush.max.actions", "1");
//userConfigMap22.put("shield.user", "elastic:magina1001password");

//generate transports list
Splitter commaSplitter24 = Splitter.on(",");
Splitter colonSplitter25 = Splitter.on(":");
List<InetSocketAddress> transportsList23 = Lists.newArrayList();
for (String transport : commaSplitter24.split("101.206.91.118:9300")) {
    List<String> ipAndPort = colonSplitter25.splitToList(transport);
    transportsList23.add(new InetSocketAddress(InetAddress.getByName(ipAndPort.get(0)), Integer.parseInt(ipAndPort.get(1))));
}
esSink27.addSink(new ElasticsearchSink<Tuple2<Boolean, Row>>(userConfigMap22, transportsList23, new MaginaES5SinkFunction(esTable26.getSchema().getColumnNames(), "userid", "test-au", "test-au", "action,num"), new RetryRejectedExecutionFailureHandler())).name("elasticsearch_4068").setParallelism(2);

  • Any help will be greatly appreciated


Reply via email to