Wouldn't it be better to have both connectors for ES: one for 1.x and another for 2.x?

On 4 Dec 2015 20:55, "Madhukar Thota" <madhukar.th...@gmail.com> wrote:
Sure. I can submit the pull request.

On Fri, Dec 4, 2015 at 12:37 PM, Maximilian Michels <m...@apache.org> wrote:

Hi Madhu,

Great. Do you want to contribute it back via a GitHub pull request? If not, that's also fine. We will try to look into the 2.0 connector next week.

Best,
Max

On Fri, Dec 4, 2015 at 4:16 PM, Madhukar Thota <madhukar.th...@gmail.com> wrote:

I have created a working connector for Elasticsearch 2.0 based on the elasticsearch-flink connector. I am using it right now, but I want an official connector from Flink.

ElasticsearchSink.java

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

public class ElasticsearchSink<T> extends RichSinkFunction<T> {

    public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
    public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
    public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";

    private static final long serialVersionUID = 1L;
    private static final int DEFAULT_PORT = 9300;
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);

    /**
     * The user-specified config map that we forward to Elasticsearch when we create the Client.
     */
    private final Map<String, String> userConfig;

    /**
     * The builder that is used to construct an {@link IndexRequest} from the incoming element.
     */
    private final IndexRequestBuilder<T> indexRequestBuilder;

    /**
     * The Client that was either retrieved from a Node or is a TransportClient.
     */
    private transient Client client;

    /**
     * Bulk processor that was created using the client.
     */
    private transient BulkProcessor bulkProcessor;

    /**
     * This is set from inside the BulkProcessor listener if there were failures in processing.
     */
    private final AtomicBoolean hasFailure = new AtomicBoolean(false);

    /**
     * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
     */
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<Throwable>();

    public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) {
        this.userConfig = userConfig;
        this.indexRequestBuilder = indexRequestBuilder;
    }

    @Override
    public void open(Configuration configuration) {
        ParameterTool params = ParameterTool.fromMap(userConfig);
        Settings settings = Settings.settingsBuilder()
                .put(userConfig)
                .build();

        TransportClient transportClient = TransportClient.builder().settings(settings).build();
        for (String server : params.get("esHost").split(";")) {
            String[] components = server.trim().split(":");
            String host = components[0];
            int port = DEFAULT_PORT;
            if (components.length > 1) {
                port = Integer.parseInt(components[1]);
            }

            try {
                transportClient = transportClient.addTransportAddress(
                        new InetSocketTransportAddress(InetAddress.getByName(host), port));
            } catch (UnknownHostException e) {
                LOG.error("Unknown Elasticsearch host: " + host, e);
            }
        }

        List<DiscoveryNode> nodes = transportClient.connectedNodes();
        if (nodes.isEmpty()) {
            throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Connected to nodes: " + nodes.toString());
            }
        }
        client = transportClient;

        BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {
                    }

                    @Override
                    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                        if (response.hasFailures()) {
                            for (BulkItemResponse itemResp : response.getItems()) {
                                if (itemResp.isFailed()) {
                                    LOG.error("Failed to index document in Elasticsearch: "
                                            + itemResp.getFailureMessage());
                                    failureThrowable.compareAndSet(null,
                                            new RuntimeException(itemResp.getFailureMessage()));
                                }
                            }
                            hasFailure.set(true);
                        }
                    }

                    @Override
                    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                        LOG.error(failure.getMessage());
                        failureThrowable.compareAndSet(null, failure);
                        hasFailure.set(true);
                    }
                });

        // This makes flush() blocking
        bulkProcessorBuilder.setConcurrentRequests(0);

        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
            bulkProcessorBuilder.setBulkActions(params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS));
        }

        if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
            bulkProcessorBuilder.setBulkSize(new ByteSizeValue(
                    params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB), ByteSizeUnit.MB));
        }

        if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
            bulkProcessorBuilder.setFlushInterval(
                    TimeValue.timeValueMillis(params.getInt(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)));
        }

        bulkProcessor = bulkProcessorBuilder.build();
    }

    @Override
    public void invoke(T element) {
        IndexRequest indexRequest = indexRequestBuilder.createIndexRequest(element, getRuntimeContext());

        if (LOG.isDebugEnabled()) {
            LOG.debug("Emitting IndexRequest: {}", indexRequest);
        }

        bulkProcessor.add(indexRequest);
    }

    @Override
    public void close() {
        if (bulkProcessor != null) {
            bulkProcessor.close();
            bulkProcessor = null;
        }

        if (client != null) {
            client.close();
        }

        if (hasFailure.get()) {
            Throwable cause = failureThrowable.get();
            if (cause != null) {
                throw new RuntimeException("An error occurred in ElasticsearchSink.", cause);
            } else {
                throw new RuntimeException("An error occurred in ElasticsearchSink.");
            }
        }
    }
}
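(For reference, since the sink above compiles against it: IndexRequestBuilder is the serializable factory interface of the existing elasticsearch-flink connector, which this 2.0 port reuses unchanged. Its shape follows from the call sites above, a single method taking the element and the RuntimeContext; roughly:)

import java.io.Serializable;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.elasticsearch.action.index.IndexRequest;

public interface IndexRequestBuilder<T> extends Function, Serializable {

    // Called once per incoming element; the returned IndexRequest is
    // handed to the sink's BulkProcessor.
    IndexRequest createIndexRequest(T element, RuntimeContext ctx);
}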
In my Main Class:

Map<String, String> config = Maps.newHashMap();

// Elasticsearch parameters
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS,
        parameter.get("elasticsearch.bulk.flush.max.actions", "1"));
config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_INTERVAL_MS,
        parameter.get("elasticsearch.bulk.flush.interval.ms", "2"));
config.put("cluster.name", parameter.get("elasticsearch.cluster.name"));
config.put("esHost", parameter.get("elasticsearch.server", "localhost:9300"));

DataStreamSink<String> elastic = messageStream.rebalance().addSink(
        new ElasticsearchSink<>(config, (IndexRequestBuilder<String>) (element, runtimeContext) -> {
    String[] line = element.toLowerCase().split(" +(?=(?:([^\"]*\"){2})*[^\"]*$)");
    String measureAndTags = line[0];
    String[] kvSplit = line[1].split("=");
    String fieldName = kvSplit[0];
    String fieldValue = kvSplit[1];
    Map<String, String> tags = new HashMap<>();
    String measure = parseMeasureAndTags(measureAndTags, tags);
    long time = (long) (Double.valueOf(line[2]) / 1000000);

    Map<String, Object> test = new HashMap<>();
    DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

    test.put(fieldName, setValue(fieldValue));
    test.put("tags", tags);
    test.put("measurement", measure);
    test.put("@timestamp", dateFormat.format(new Date(time)));

    return Requests.indexRequest()
            .index("metrics")
            .type("test")
            .source(new Gson().toJson(test).toLowerCase());
}));

-Madhu
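(Note: the lambda above calls two helpers, parseMeasureAndTags(...) and setValue(...), that were not included in the mail. Assuming the input follows an InfluxDB-style line format, "measurement,tag1=v1,tag2=v2 field=value timestamp", a minimal sketch of what they might look like; Madhu's actual implementations may differ:)

private static String parseMeasureAndTags(String measureAndTags, Map<String, String> tags) {
    // Hypothetical implementation: "cpu,host=a,region=b" yields the
    // measurement "cpu" and the tags {host=a, region=b}.
    String[] parts = measureAndTags.split(",");
    for (int i = 1; i < parts.length; i++) {
        String[] kv = parts[i].split("=", 2);
        if (kv.length == 2) {
            tags.put(kv[0], kv[1]);
        }
    }
    return parts[0];
}

private static Object setValue(String fieldValue) {
    // Hypothetical implementation: index numeric field values as numbers,
    // everything else as a de-quoted string.
    try {
        return Double.valueOf(fieldValue);
    } catch (NumberFormatException e) {
        return fieldValue.replace("\"", "");
    }
}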
On Fri, Dec 4, 2015 at 9:18 AM, Maximilian Michels <m...@apache.org> wrote:

Hi Madhu,

Not yet. The API has changed slightly. We'll add one very soon. In the meantime, I've created an issue to keep track of the status:

https://issues.apache.org/jira/browse/FLINK-3115

Thanks,
Max

On Thu, Dec 3, 2015 at 10:50 PM, Madhukar Thota <madhukar.th...@gmail.com> wrote:

Does the current elasticsearch-flink connector support the Elasticsearch 2.x version?

-Madhu
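(For context on the API change Max mentions: the client bootstrap is one of the incompatible spots between the two major versions, which is why the 1.x connector cannot simply be recompiled against 2.0. A rough sketch of the difference, per each version's client docs; the cluster name "my-cluster" is a placeholder:)

// Elasticsearch 1.x, as used by the current flink-connector-elasticsearch:
Settings oldSettings = ImmutableSettings.settingsBuilder()
        .put("cluster.name", "my-cluster")
        .build();
Client oldClient = new TransportClient(oldSettings)
        .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));

// Elasticsearch 2.x, as used in Madhu's sink above: the client is created
// via a builder, and transport addresses now take an InetAddress.
Settings newSettings = Settings.settingsBuilder()
        .put("cluster.name", "my-cluster")
        .build();
Client newClient = TransportClient.builder().settings(newSettings).build()
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));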