I have created a working connector for Elasticsearch 2.0, based on the existing elasticsearch-flink connector. I am using it right now, but I would like an official connector from Flink.
ElasticsearchSink.java

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

/**
 * Sink that turns every incoming element into an {@link IndexRequest} and hands it to an
 * Elasticsearch {@link BulkProcessor} backed by a {@link TransportClient}.
 *
 * <p>The user config map is forwarded as-is to the Elasticsearch client settings. In addition the
 * sink itself reads the following keys from it:
 * <ul>
 *   <li>{@code esHost} (required): {@code ;}-separated list of {@code host[:port]} entries; the
 *       port defaults to 9300 when omitted</li>
 *   <li>{@link #CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS}, {@link #CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB} and
 *       {@link #CONFIG_KEY_BULK_FLUSH_INTERVAL_MS} (optional): bulk flush tuning</li>
 * </ul>
 *
 * <p>Failures reported by the bulk listener are logged when they happen and are rethrown as a
 * {@link RuntimeException} from {@link #close()}; {@link #invoke(Object)} does not check them.
 *
 * @param <T> type of the elements handled by this sink
 */
public class ElasticsearchSink<T> extends RichSinkFunction<T> {

    /** Config key whose integer value is passed to {@code BulkProcessor.Builder#setBulkActions}. */
    public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";

    /** Config key whose integer value (in MB) is passed to {@code BulkProcessor.Builder#setBulkSize}. */
    public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";

    /** Config key whose integer value (in ms) is passed to {@code BulkProcessor.Builder#setFlushInterval}. */
    public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";

    private static final long serialVersionUID = 1L;

    /** Config key holding the {@code ;}-separated {@code host[:port]} list of transport addresses. */
    private static final String CONFIG_KEY_HOSTS = "esHost";

    /** Port used for a host entry that does not specify one. */
    private static final int DEFAULT_PORT = 9300;

    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);

    /**
     * The user specified config map that we forward to Elasticsearch when we create the Client.
     */
    private final Map<String, String> userConfig;

    /**
     * The builder that is used to construct an {@link IndexRequest} from the incoming element.
     */
    private final IndexRequestBuilder<T> indexRequestBuilder;

    /**
     * The TransportClient created in {@link #open(Configuration)}.
     */
    private transient Client client;

    /**
     * Bulk processor that was created using the client.
     */
    private transient BulkProcessor bulkProcessor;

    /**
     * This is set from inside the BulkProcessor listener if there were failures in processing.
     */
    private final AtomicBoolean hasFailure = new AtomicBoolean(false);

    /**
     * This is set from inside the BulkProcessor listener to the first failure that was observed.
     */
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<Throwable>();

    /**
     * Creates a new sink.
     *
     * @param userConfig          settings forwarded to the Elasticsearch client; also carries the
     *                            {@code esHost} and bulk flush keys read by this sink
     * @param indexRequestBuilder builds the {@link IndexRequest} for each incoming element
     */
    public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) {
        this.userConfig = userConfig;
        this.indexRequestBuilder = indexRequestBuilder;
    }

    /**
     * Creates the transport client, registers all configured transport addresses and builds the
     * bulk processor.
     *
     * @throws IllegalArgumentException if the {@code esHost} key is missing from the user config
     * @throws RuntimeException         if the client is not connected to any node afterwards
     */
    @Override
    public void open(Configuration configuration) {
        ParameterTool params = ParameterTool.fromMap(userConfig);

        Settings settings = Settings.settingsBuilder()
                .put(userConfig)
                .build();

        TransportClient transportClient = TransportClient.builder().settings(settings).build();
        try {
            addTransportAddresses(transportClient, params.get(CONFIG_KEY_HOSTS));

            List<DiscoveryNode> nodes = transportClient.connectedNodes();
            if (nodes.isEmpty()) {
                throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
            }
            LOG.info("Connected to nodes: {}", nodes);
        } catch (RuntimeException e) {
            // The client is not yet stored in a field, so close() could never release it.
            transportClient.close();
            throw e;
        }

        client = transportClient;
        bulkProcessor = buildBulkProcessor(params);
    }

    /**
     * Builds an index request for the element and adds it to the bulk processor.
     */
    @Override
    public void invoke(T element) {
        IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext());

        if (LOG.isDebugEnabled()) {
            LOG.debug("Emitting IndexRequest: {}", indexRequest);
        }

        bulkProcessor.add(indexRequest);
    }

    /**
     * Closes the bulk processor and the client, then rethrows any failure that the bulk listener
     * recorded while the sink was running.
     *
     * @throws RuntimeException if the bulk listener recorded a failure
     */
    @Override
    public void close() {
        try {
            if (bulkProcessor != null) {
                bulkProcessor.close();
                bulkProcessor = null;
            }
        } finally {
            // Release the client even when closing the bulk processor fails.
            if (client != null) {
                client.close();
                client = null;
            }
        }

        if (hasFailure.get()) {
            Throwable cause = failureThrowable.get();
            if (cause != null) {
                throw new RuntimeException("An error occured in ElasticsearchSink.", cause);
            } else {
                throw new RuntimeException("An error occured in ElasticsearchSink.");
            }
        }
    }

    /** Parses the {@code ;}-separated {@code host[:port]} list and registers each entry on the client. */
    private static void addTransportAddresses(TransportClient transportClient, String hosts) {
        if (hosts == null) {
            throw new IllegalArgumentException(
                    "Missing required configuration key '" + CONFIG_KEY_HOSTS
                            + "' (expected a ';'-separated list of host[:port] entries).");
        }

        for (String server : hosts.split(";")) {
            String entry = server.trim();
            if (entry.isEmpty()) {
                // InetAddress.getByName("") resolves to the loopback address; do not add it by accident.
                continue;
            }

            String[] components = entry.split(":");
            String host = components[0];
            int port = components.length > 1 ? Integer.parseInt(components[1]) : DEFAULT_PORT;

            try {
                transportClient.addTransportAddress(
                        new InetSocketTransportAddress(InetAddress.getByName(host), port));
            } catch (UnknownHostException e) {
                // Best effort: the remaining hosts may still be reachable. The caller fails
                // afterwards if no node is connected at all.
                LOG.warn("Skipping Elasticsearch host '" + host + "' because it could not be resolved.", e);
            }
        }
    }

    /** Builds the bulk processor on top of {@link #client}, applying the optional flush settings. */
    private BulkProcessor buildBulkProcessor(ParameterTool params) {
        BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(client, createBulkListener());

        // This makes flush() blocking
        bulkProcessorBuilder.setConcurrentRequests(0);

        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
            bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
        }

        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
            bulkProcessorBuilder.setBulkSize(new ByteSizeValue(
                    params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
        }

        if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
            bulkProcessorBuilder.setFlushInterval(
                    TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
        }

        return bulkProcessorBuilder.build();
    }

    /** Creates the listener that logs bulk failures and records them for {@link #close()}. */
    private BulkProcessor.Listener createBulkListener() {
        return new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // Nothing to do before a bulk request is executed.
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    for (BulkItemResponse itemResp : response.getItems()) {
                        if (itemResp.isFailed()) {
                            LOG.error("Failed to index document in Elasticsearch: {}", itemResp.getFailureMessage());
                            // Only the first failure is kept as the cause reported by close().
                            failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
                        }
                    }
                    hasFailure.set(true);
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // Pass the throwable itself so that the stack trace ends up in the log.
                LOG.error("Bulk request to Elasticsearch failed: " + failure.getMessage(), failure);
                failureThrowable.compareAndSet(null, failure);
                hasFailure.set(true);
            }
        };
    }
}

In my Main Class:

Map<String, String> config = Maps.newHashMap();
//Elasticsearch Parameters
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, parameter.get("elasticsearch.bulk.flush.max.actions","1")); config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, parameter.get("elasticsearch.bulk.flush.interval.ms","2")); config.put("cluster.name", parameter.get("elasticsearch.cluster.name")); config.put("esHost", parameter.get("elasticsearch.server", "localhost:9300")); DataStreamSink<String> elastic = messageStream.rebalance().addSink(new ElasticsearchSink<>(config, (IndexRequestBuilder<String>) (element, runtimeContext) -> { String[] line = element.toLowerCase().split(" +(?=(?:([^\"]*\"){2})*[^\"]*$)"); String measureAndTags = line[0]; String[] kvSplit = line[1].split("="); String fieldName = kvSplit[0]; String fieldValue = kvSplit[1]; Map<String, String> tags = new HashMap<>(); String measure = parseMeasureAndTags(measureAndTags, tags); long time = (long) (Double.valueOf(line[2]) / 1000000); Map<String, Object> test = new HashMap<>(); DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ"); dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); test.put(fieldName, setValue(fieldValue)); test.put("tags", tags); test.put("measurement", measure); test.put("@timestamp", dateFormat.format(new Date(time))); return Requests.indexRequest() .index("metrics") .type("test") .source(new Gson().toJson(test).toLowerCase()); })); -Madhu On Fri, Dec 4, 2015 at 9:18 AM, Maximilian Michels <m...@apache.org> wrote: > Hi Madhu, > > Not yet. The API has changed slightly. We'll add one very soon. In the > meantime I've created an issue to keep track of the status: > > https://issues.apache.org/jira/browse/FLINK-3115 > > Thanks, > Max > > On Thu, Dec 3, 2015 at 10:50 PM, Madhukar Thota > <madhukar.th...@gmail.com> wrote: > > is current elasticsearch-flink connector support elasticsearch 2.x > version? > > > > -Madhu >