> Wouldn't it be better to have both connectors for ES? One for 1.x and
> another for 2.x?
IMHO that's the way to go. Thanks Madhukar!

Cheers,
Max

On Sat, Dec 5, 2015 at 6:49 AM, Deepak Sharma <deepakmc...@gmail.com> wrote:

Hi Madhu,

Would you be able to share the use case for Elasticsearch with Flink here?

Thanks,
Deepak
www.bigdatabig.com
www.keosha.net

On Sat, Dec 5, 2015 at 1:25 AM, Madhukar Thota <madhukar.th...@gmail.com> wrote:

Sure, I can submit the pull request.

On Fri, Dec 4, 2015 at 12:37 PM, Maximilian Michels <m...@apache.org> wrote:

Hi Madhu,

Great. Do you want to contribute it back via a GitHub pull request? If not,
that's also fine. We will try to look into the 2.0 connector next week.

Best,
Max

On Fri, Dec 4, 2015 at 4:16 PM, Madhukar Thota <madhukar.th...@gmail.com> wrote:

I have created a working connector for Elasticsearch 2.0, based on the
elasticsearch-flink connector. I am using it right now, but I would like an
official connector from Flink.

ElasticsearchSink.java:

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

public class ElasticsearchSink<T> extends RichSinkFunction<T> {

    public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
    public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
    public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";

    private static final long serialVersionUID = 1L;
    private static final int DEFAULT_PORT = 9300;
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);

    /**
     * The user-specified config map that we forward to Elasticsearch when
     * we create the Client.
     */
    private final Map<String, String> userConfig;

    /**
     * The builder that is used to construct an {@link IndexRequest} from
     * the incoming element.
     */
    private final IndexRequestBuilder<T> indexRequestBuilder;

    /**
     * The Client that was either retrieved from a Node or is a TransportClient.
     */
    private transient Client client;

    /**
     * Bulk processor that was created using the client.
     */
    private transient BulkProcessor bulkProcessor;

    /**
     * This is set from inside the BulkProcessor listener if there were
     * failures in processing.
     */
    private final AtomicBoolean hasFailure = new AtomicBoolean(false);

    /**
     * This is set from inside the BulkProcessor listener if a Throwable was
     * thrown during processing.
     */
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<Throwable>();

    public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) {
        this.userConfig = userConfig;
        this.indexRequestBuilder = indexRequestBuilder;
    }

    @Override
    public void open(Configuration configuration) {
        ParameterTool params = ParameterTool.fromMap(userConfig);
        Settings settings = Settings.settingsBuilder()
                .put(userConfig)
                .build();

        TransportClient transportClient = TransportClient.builder().settings(settings).build();
        // "esHost" is a ';'-separated list of host[:port] entries.
        for (String server : params.get("esHost").split(";")) {
            String[] components = server.trim().split(":");
            String host = components[0];
            int port = DEFAULT_PORT;
            if (components.length > 1) {
                port = Integer.parseInt(components[1]);
            }

            try {
                transportClient = transportClient.addTransportAddress(
                        new InetSocketTransportAddress(InetAddress.getByName(host), port));
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
        }

        List<DiscoveryNode> nodes = transportClient.connectedNodes();
        if (nodes.isEmpty()) {
            throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("Connected to nodes: " + nodes.toString());
        }
        client = transportClient;

        BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {
                    public void beforeBulk(long executionId, BulkRequest request) {
                    }

                    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                        if (response.hasFailures()) {
                            for (BulkItemResponse itemResp : response.getItems()) {
                                if (itemResp.isFailed()) {
                                    LOG.error("Failed to index document in Elasticsearch: "
                                            + itemResp.getFailureMessage());
                                    failureThrowable.compareAndSet(null,
                                            new RuntimeException(itemResp.getFailureMessage()));
                                }
                            }
                            hasFailure.set(true);
                        }
                    }

                    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                        LOG.error(failure.getMessage());
                        failureThrowable.compareAndSet(null, failure);
                        hasFailure.set(true);
                    }
                });

        // This makes flush() blocking.
        bulkProcessorBuilder.setConcurrentRequests(0);

        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
            bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
        }

        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
            bulkProcessorBuilder.setBulkSize(new ByteSizeValue(
                    params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
        }

        if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
            bulkProcessorBuilder.setFlushInterval(
                    TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
        }

        bulkProcessor = bulkProcessorBuilder.build();
    }

    @Override
    public void invoke(T element) {
        IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext());

        if (LOG.isDebugEnabled()) {
            LOG.debug("Emitting IndexRequest: {}", indexRequest);
        }

        bulkProcessor.add(indexRequest);
    }

    @Override
    public void close() {
        if (bulkProcessor != null) {
            bulkProcessor.close();
            bulkProcessor = null;
        }

        if (client != null) {
            client.close();
        }

        if (hasFailure.get()) {
            Throwable cause = failureThrowable.get();
            if (cause != null) {
                throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
            } else {
                throw new RuntimeException("An error occurred in ElasticsearchSink.");
            }
        }
    }
}
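The IndexRequestBuilder<T> referenced in the listing above is not included
in it; presumably it mirrors the interface of the same name in the existing
1.x connector. A minimal sketch under that assumption (verify the package
and signature against the connector you copied the sink from):

import java.io.Serializable;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.elasticsearch.action.index.IndexRequest;

/**
 * Builds an IndexRequest from an incoming element. Assumed to have the
 * same shape as the 1.x connector's IndexRequestBuilder.
 */
public interface IndexRequestBuilder<T> extends Function, Serializable {
    IndexRequest createIndexRequest(T element, RuntimeContext ctx);
}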
In my main class:

Map<String, String> config = Maps.newHashMap();

// Elasticsearch parameters
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS,
        parameter.get("elasticsearch.bulk.flush.max.actions", "1"));
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS,
        parameter.get("elasticsearch.bulk.flush.interval.ms", "2"));
config.put("cluster.name", parameter.get("elasticsearch.cluster.name"));
config.put("esHost", parameter.get("elasticsearch.server", "localhost:9300"));

DataStreamSink<String> elastic = messageStream.rebalance().addSink(
        new ElasticsearchSink<>(config, (IndexRequestBuilder<String>) (element, runtimeContext) -> {
            // Split on runs of spaces that are not inside double quotes.
            String[] line = element.toLowerCase().split(" +(?=(?:([^\"]*\"){2})*[^\"]*$)");
            String measureAndTags = line[0];
            String[] kvSplit = line[1].split("=");
            String fieldName = kvSplit[0];
            String fieldValue = kvSplit[1];
            Map<String, String> tags = new HashMap<>();
            String measure = parseMeasureAndTags(measureAndTags, tags);
            long time = (long) (Double.valueOf(line[2]) / 1000000);

            Map<String, Object> test = new HashMap<>();
            DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
            dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

            test.put(fieldName, setValue(fieldValue));
            test.put("tags", tags);
            test.put("measurement", measure);
            test.put("@timestamp", dateFormat.format(new Date(time)));

            return Requests.indexRequest()
                    .index("metrics")
                    .type("test")
                    .source(new Gson().toJson(test).toLowerCase());
        }));

-Madhu
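Note that parseMeasureAndTags and setValue in the example above are helpers
from Madhukar's job and are not shown. For anyone who just wants to
smoke-test the sink, here is a minimal, self-contained sketch that indexes
each string as a one-field JSON document; the class name, cluster name, and
index/type names are placeholders:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.elasticsearch.client.Requests;

public class EsSinkSmokeTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Map<String, String> config = new HashMap<>();
        config.put("cluster.name", "elasticsearch");  // assumption: default cluster name
        config.put("esHost", "localhost:9300");       // ';'-separated host[:port] list
        // Flush after every element so test documents show up immediately.
        config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");

        DataStream<String> messages = env.fromElements("hello", "world");

        messages.addSink(new ElasticsearchSink<>(config,
                (IndexRequestBuilder<String>) (element, runtimeContext) -> {
                    Map<String, Object> json = new HashMap<>();
                    json.put("message", element);
                    return Requests.indexRequest()
                            .index("test-index")  // placeholder index name
                            .type("test-type")    // placeholder type name
                            .source(json);
                }));

        env.execute("Elasticsearch 2.0 sink smoke test");
    }
}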
On Fri, Dec 4, 2015 at 9:18 AM, Maximilian Michels <m...@apache.org> wrote:

Hi Madhu,

Not yet. The API has changed slightly. We'll add one very soon. In the
meantime, I've created an issue to keep track of the status:

https://issues.apache.org/jira/browse/FLINK-3115

Thanks,
Max
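Regarding the API changes Max mentions: the breaking 1.x-to-2.0 differences
that Madhukar's sink works around are mostly in client construction. A
side-by-side sketch (the 1.x calls are reconstructed from memory, so verify
against the javadocs; the class name is just for illustration):

import java.net.InetAddress;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class Es2ClientSketch {
    public static TransportClient connect(String host, int port, String clusterName) throws Exception {
        // 1.x: ImmutableSettings.settingsBuilder()...
        // 2.0: ImmutableSettings is gone; use Settings.settingsBuilder().
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", clusterName)
                .build();

        // 1.x: new TransportClient(settings)
        // 2.0: the public constructor is gone; use the builder.
        TransportClient client = TransportClient.builder().settings(settings).build();

        // 1.x: new InetSocketTransportAddress(host, port) accepted a hostname string.
        // 2.0: it requires a resolved InetAddress.
        client.addTransportAddress(
                new InetSocketTransportAddress(InetAddress.getByName(host), port));
        return client;
    }
}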
On Thu, Dec 3, 2015 at 10:50 PM, Madhukar Thota <madhukar.th...@gmail.com> wrote:

Does the current elasticsearch-flink connector support Elasticsearch 2.x?

-Madhu