Github user fpompermaier commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2790#discussion_r88566350
  
    --- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/helper/ElasticSearchHelper.java
 ---
    @@ -0,0 +1,185 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
    + * agreements. See the NOTICE file distributed with this work for 
additional information regarding
    + * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance with the 
License. You may obtain a
    + * copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software 
distributed under the License
    + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 
ANY KIND, either express
    + * or implied. See the License for the specific language governing 
permissions and limitations under
    + * the License.
    + */
    +package org.apache.flink.streaming.connectors.elasticsearch2.helper;
    +
    +import com.google.common.collect.ImmutableList;
    +
    +import 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
    +import 
org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
    +import 
org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
    +import org.elasticsearch.action.search.SearchResponse;
    +import org.elasticsearch.client.Client;
    +import org.elasticsearch.client.transport.TransportClient;
    +import org.elasticsearch.cluster.node.DiscoveryNode;
    +import org.elasticsearch.common.settings.Settings;
    +import org.elasticsearch.common.transport.InetSocketTransportAddress;
    +import org.elasticsearch.common.transport.TransportAddress;
    +import org.elasticsearch.index.IndexNotFoundException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * 
    + * This class manages the creation of index templates and index mapping on 
elasticsearch.
    + * 
    + * <p>
    + * Example:
    + *
    + * <pre>{@code
    + *                         ElasticSearchHelper esHelper = new 
ElasticSearchHelper(config, transports);
    + *
    + *                         //Create an Index Template given a name and the 
json structure
    + *                                 esHelper.initTemplate(templateName, 
templateRequest);
    + * 
    + *                                 //Create an Index Mapping given the 
Index Name, DocType and the json structure
    + *                                 esHelper.initIndexMapping(indexName, 
docType, mappingsRequest);
    + *
    + * }</pre>
    + * 
    + * <p>
    + * The {@link Map} passed to the constructor is forwarded to Elasticsearch 
when
    + * creating {@link TransportClient}. The config keys can be found in the
    + * Elasticsearch documentation. An important setting is {@code 
cluster.name},
    + * this should be set to the name of the cluster that the sink should emit 
to.
    + *
    + */
    +public class ElasticSearchHelper {
    +
    +   private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchSink.class);
    +
    +   private Client client;
    +   
    +   private final static int DEFAULT_INDEX_SHARDS = 2;
    +   private final static int DEFAULT_INDEX_REPLICAS = 0;
    +
    +   /**
    +    * Creates a new ElasticSearchHelper that connects to the cluster using 
a TransportClient.
    +    *
    +    * @param userConfig The map of user settings that are passed when 
constructing the TransportClients
    +    * @param transportAddresses The Elasticsearch Nodes to which to 
connect using a {@code TransportClient}
    +    */
    +   public ElasticSearchHelper(Map<String, String> userConfig, 
List<InetSocketAddress> transportAddresses) {
    +           client = buildElasticsearchClient(userConfig, 
transportAddresses);
    +   }
    +
    +   /**
    +    * Build a TransportClient to connect to the cluster.
    +    * 
    +    * @param userConfig The map of user settings that are passed when 
constructing the TransportClients
    +    * @param transportAddresses The Elasticsearch Nodes to which to 
connect using a {@code TransportClient}
    +    * @return Initialized TransportClient
    +    */
    +   public static Client buildElasticsearchClient(Map<String, String> 
userConfig,
    +                   List<InetSocketAddress> transportAddresses) {
    +           List<TransportAddress> transportNodes;
    +           transportNodes = new ArrayList<>(transportAddresses.size());
    +           for (InetSocketAddress address : transportAddresses) {
    +                   transportNodes.add(new 
InetSocketTransportAddress(address));
    +           }
    +
    +           Settings settings = 
Settings.settingsBuilder().put(userConfig).build();
    +
    +           TransportClient transportClient = 
TransportClient.builder().settings(settings).build();
    +           for (TransportAddress transport : transportNodes) {
    +                   transportClient.addTransportAddress(transport);
    +           }
    +
    +           // verify that we actually are connected to a cluster
    +           ImmutableList<DiscoveryNode> nodes = 
ImmutableList.copyOf(transportClient.connectedNodes());
    +           if (nodes.isEmpty()) {
    +                   throw new RuntimeException("Client is not connected to 
any Elasticsearch nodes!");
    +           }
    +           return transportClient;
    +   }
    +
    +   /**
    +    * Create a new index template.
    +    * 
    +    * @param templateName Name of the template to create
    +    * @param templateReq Json defining the index template
    +    */
    +   public void initTemplate(String templateName, String templateReq) 
throws Exception {
    +           // Check if the template is set
    +           if (templateReq != null && !templateReq.equals("")) {
    +                   // Json deserialization
    +                   PutIndexTemplateRequest request = new 
PutIndexTemplateRequest(templateName).source(templateReq);
    +                   // Sending the request to elastic search
    +                   sendIndexTemplateRequest(request);
    +           }
    +   }
    +
    +   /**
    +    * Send the index template request to elasticsearch.
    +    * 
    +    * @param indexTemplateRequest A valid index template request
    +    */
    +   public void sendIndexTemplateRequest(PutIndexTemplateRequest 
indexTemplateRequest) throws Exception {
    +           // Check if the template is set
    +           if (indexTemplateRequest != null) {
    +                   // Sending the request to elastic search
    +                   
client.admin().indices().putTemplate(indexTemplateRequest).get();
    +           }
    +   }
    +
    +   /**
    +    * Create a new mapping for a document type for an index.
    +    * 
    +    * @param indexName Index name where add the mapping
    +    * @param docType Document type of the mapping
    +    * @param mappingReq Json defining the index mapping
    +    */
    +   public void initIndexMapping(String indexName, String docType, String 
mappingReq) throws Exception {
    +           PutMappingRequest request=new 
PutMappingRequest(indexName).source(mappingReq).type(docType);
    +           
    +           // Put the mapping to the index
    +           sendIndexMappingRequest(request);
    +           LOG.debug("Updating mappings...");
    +   }
    +   
    +   /**
    +    * Send the index mapping request to elasticsearch.
    +    * 
    +    * @param mappingRequest A valid index mapping request
    +    */
    +   public void sendIndexMappingRequest(PutMappingRequest mappingRequest) 
throws Exception {
    +           // Check if the template is set
    +           if (mappingRequest != null) {
    +                   try {
    +                           // Check if the index exists
    +                           SearchResponse response = 
client.prepareSearch(mappingRequest.indices())
    +                                           
.setTypes(mappingRequest.type()).get();
    +                           if (response != null) {
    +                                   LOG.debug("Index found, no need to 
create it...");
    +                           }
    +                   } catch (IndexNotFoundException infe) {
    +                           for (String indexName:mappingRequest.indices()){
    +                                   // If the index does not exist, create 
it
    +                                   
client.admin().indices().prepareCreate(indexName)
    +                                                   
.setSettings(Settings.builder().put("index.number_of_shards", 
DEFAULT_INDEX_SHARDS)
    +                                                   
.put("index.number_of_replicas", DEFAULT_INDEX_REPLICAS)).execute().actionGet();
    --- End diff --
    
    Use elasticsearch constant for "index.number_of_replicas" => IndexMetaData. 
SETTING_NUMBER_OF_REPLICAS


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to