Hi,

I am running Flink 1.4 on a single node. My job has two Kafka consumers reading 
from separate topics. After fetching the data, the job writes it to two 
separate Elasticsearch sinks, so the pipeline looks like this:

KafkaTopic1 -> Kafkaconsumer1 -> create output record -> Elasticsearchsink1
KafkaTopic2 -> Kafkaconsumer2 -> create output record -> Elasticsearchsink2

Both streams and their processing are completely unrelated. The first sink 
works as expected and writes output for every input record. The second sink 
writes to Elasticsearch only once and then stops writing, even though more 
data keeps being fed into Kafka. Sometimes it does not write even once. We 
tested this in two other jobs and the same issue occurs in all of them.

I have attached sample code that I created to illustrate the issue. We are 
using Elasticsearch version 5.6.4, hence the dependency used is 
'flink-connector-elasticsearch5_2.11'.
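
In case it helps, the relevant connector entries in our pom.xml look roughly 
like this (assuming the Flink 1.4.0 artifacts for Scala 2.11; the Kafka 
connector entry is shown only for context):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.4.0</version>
</dependency>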

Regards,
Teena




public class ElasticSearchTest1 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set Elasticsearch connection details
        Map<String, String> config = new HashMap<>();
        config.put("bulk.flush.max.actions", "1");
        config.put("cluster.name", "<cluster name>");
        List<InetSocketAddress> transports = new ArrayList<>();
        transports.add(new InetSocketAddress(InetAddress.getByName("<host ip>"), 9300));

        // set properties for Kafka streaming
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "<host ip>" + ":9092");
        properties.setProperty("group.id", "testGroup");
        properties.setProperty("auto.offset.reset", "latest");

        // create consumer for log records
        FlinkKafkaConsumer011<ObjectNode> inputConsumer1 =
                new FlinkKafkaConsumer011<>("elastic_test1", new JSONDeserializationSchema(), properties);

        DataStream<RecordOne> firstStream = env
                .addSource(inputConsumer1)
                .flatMap(new CreateRecordOne());

        firstStream.addSink(new ElasticsearchSink<RecordOne>(config,
                transports,
                new ElasticSearchOutputRecord("elastic_test_index1", "elastic_test_index1")));

        FlinkKafkaConsumer011<ObjectNode> inputConsumer2 =
                new FlinkKafkaConsumer011<>("elastic_test2", new JSONDeserializationSchema(), properties);

        DataStream<RecordTwo> secondStream = env
                .addSource(inputConsumer2)
                .flatMap(new CreateRecordTwo());

        secondStream.addSink(new ElasticsearchSink<RecordTwo>(config,
                transports,
                new ElasticSearchOutputRecord2("elastic_test_index2", "elastic_test_index2")));

        env.execute("Elastic Search Test");
    }
}

public class ElasticSearchOutputRecord implements ElasticsearchSinkFunction<RecordOne> {

    String index;
    String type;

    // initialize the sink function with the target index and mapping type
    public ElasticSearchOutputRecord(String index, String type) {
        this.index = index;
        this.type = type;
    }

    // construct index request
    @Override
    public void process(RecordOne record, RuntimeContext ctx, RequestIndexer indexer) {

        // construct JSON document to index
        Map<String, String> json = new HashMap<>();
        json.put("item_one", record.item1);
        json.put("item_two", record.item2);

        IndexRequest rqst = Requests.indexRequest()
                .index(index)   // index name
                .type(type)     // mapping name
                .source(json);

        indexer.add(rqst);
    }
}

public class ElasticSearchOutputRecord2 implements ElasticsearchSinkFunction<RecordTwo> {

    String index;
    String type;

    // initialize the sink function with the target index and mapping type
    public ElasticSearchOutputRecord2(String index, String type) {
        this.index = index;
        this.type = type;
    }

    // construct index request
    @Override
    public void process(RecordTwo record, RuntimeContext ctx, RequestIndexer indexer) {

        // construct JSON document to index
        Map<String, String> json = new HashMap<>();
        json.put("item_three", record.item3);
        json.put("item_four", record.item4);

        IndexRequest rqst = Requests.indexRequest()
                .index(index)   // index name
                .type(type)     // mapping name
                .source(json);

        indexer.add(rqst);
    }
}

public class CreateRecordOne implements FlatMapFunction<ObjectNode, RecordOne> {

    static final Logger log = LoggerFactory.getLogger(CreateRecordOne.class);

    @Override
    public void flatMap(ObjectNode value, Collector<RecordOne> out) throws Exception {
        try {
            out.collect(new RecordOne(value.get("item1").asText(), value.get("item2").asText()));
        } catch (Exception e) {
            log.error("error while creating RecordOne", e);
        }
    }
}

public class CreateRecordTwo implements FlatMapFunction<ObjectNode, RecordTwo> {

    static final Logger log = LoggerFactory.getLogger(CreateRecordTwo.class);

    @Override
    public void flatMap(ObjectNode value, Collector<RecordTwo> out) throws Exception {
        try {
            out.collect(new RecordTwo(value.get("item1").asText(), value.get("item2").asText()));
        } catch (Exception e) {
            log.error("error while creating RecordTwo", e);
        }
    }
}

public class RecordOne {

    public String item1;
    public String item2;

    public RecordOne() {}

    public RecordOne(String item1, String item2) {
        this.item1 = item1;
        this.item2 = item2;
    }
}

public class RecordTwo {

    public String item3;
    public String item4;

    public RecordTwo() {}

    public RecordTwo(String item3, String item4) {
        this.item3 = item3;
        this.item4 = item4;
    }
}
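
In case it is useful for reproducing the issue, a minimal producer sketch like 
the following can be used to feed test records into the two topics. It uses the 
plain Kafka producer API (org.apache.kafka.clients.producer); the host and the 
payload values are placeholders, and both flat map functions above expect the 
fields "item1" and "item2" in the JSON message.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestDataProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "<host ip>:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // send one JSON record to each topic; while the Flink job is running it
        // should then index one document into each of the two Elasticsearch indexes
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("elastic_test1", "{\"item1\":\"a\",\"item2\":\"b\"}"));
            producer.send(new ProducerRecord<>("elastic_test2", "{\"item1\":\"c\",\"item2\":\"d\"}"));
            producer.flush();
        }
    }
}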
