Flow: Producer -> Kafka (Avro) -> Flink Kafka connector with Avro deserializer -> Flink -> Elasticsearch (ES). Versions: Kafka - latest version; Flink: 1.4.2; ES: 5.5.2
/**
 * Wires one Kafka (Avro) -> Elasticsearch pipeline per enabled client and runs them
 * all as a single Flink streaming job.
 *
 * <p>For every client returned by {@code clientService.getAllEnableTenant()} a
 * {@link FlinkKafkaConsumer011} reading the client's topic is attached to an
 * {@link ElasticsearchSink} that indexes each record into the client's index.
 */
@Service
public class FlinkStream {

    /** Elasticsearch transport port used for every configured host. */
    private static final int ES_TRANSPORT_PORT = 9300;

    // Fully-qualified on purpose: avoids requiring a new import in the enclosing file.
    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(FlinkStream.class.getName());

    @Autowired
    private ClientService clientService;

    @Autowired
    private AppConfig appConfig;

    /**
     * Builds the streaming topology for all enabled clients and starts the job.
     *
     * <p>All pipelines are registered on ONE {@link StreamExecutionEnvironment} and
     * {@code execute()} is called exactly once. {@code execute()} blocks while the job
     * is running, and a Kafka source never finishes, so calling it inside the
     * per-client loop (as was done previously) meant that only the first client's
     * pipeline was ever started.
     *
     * <p>Note that this method therefore blocks the calling thread for as long as the
     * job runs. Failures of the job are logged, not rethrown.
     */
    @PostConstruct
    public void init() {
        List<Client> clientList = clientService.getAllEnableTenant();
        if (clientList == null || clientList.isEmpty()) {
            // Executing a topology without any operators fails, so there is nothing to start.
            LOG.warning("No enabled clients found; Flink job not started");
            return;
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        for (Client client : clientList) {
            ConfluentAvroDeserializationSchema schema =
                    new ConfluentAvroDeserializationSchema(appConfig.getKafkaSchemaRegistry());
            Properties properties = buildKafkaConsumerProperties(client.getTopic());
            FlinkKafkaConsumer011<String> flinkKafkaConsumer =
                    new FlinkKafkaConsumer011<String>(client.getTopic(), schema, properties);

            DataStream<String> kafkaStream = env.addSource(flinkKafkaConsumer);
            writeTOEs(kafkaStream, client);
        }

        try {
            env.execute();
        } catch (Exception e) {
            // Boundary catch: execute() declares a plain Exception. Best-effort as before,
            // but reported through the logger instead of printStackTrace().
            LOG.log(java.util.logging.Level.SEVERE, "Flink job execution failed", e);
        }
    }

    /**
     * Builds the Kafka consumer configuration for one client.
     *
     * @param clientTopic the client's topic name; it is also used as the consumer group id
     * @return a fresh {@link Properties} instance populated from {@code appConfig}
     */
    public Properties buildKafkaConsumerProperties(String clientTopic) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.getKafkaBootstrapServers());
        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, appConfig.getKafkaFetchMinBytes());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, appConfig.getKafkaAutoCommit());
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, appConfig.getKafkaAutoCommitInterval());
        properties.put("specific.avro.reader", true);
        properties.put("schema.registry.url", appConfig.getKafkaSchemaRegistry());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, appConfig.getKafkaKeyDeserializer());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, appConfig.getKafkaValueDeserializer());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, clientTopic);
        return properties;
    }

    /**
     * Attaches an Elasticsearch sink to the given stream.
     *
     * <p>Each element is indexed as {@code {"data": element}} into the index named by
     * {@code client.getIndexName()}, with type {@code <indexName>-type}.
     *
     * @param dataStream stream of String records to index (raw type kept for
     *                   compatibility with existing callers)
     * @param client     the client whose index name selects the target index
     */
    public void writeTOEs(DataStream dataStream, Client client) {
        Map<String, String> config = new HashMap<>();
        // Flush after every single action, i.e. no bulk buffering.
        config.put("bulk.flush.max.actions", "1");
        config.put("cluster.name", appConfig.getElasticsearchCluster());

        List<InetSocketAddress> transportAddresses = new ArrayList<>();
        for (String tokenizedHost : appConfig.getElasticsearchHost().split(",")) {
            String host = tokenizedHost.trim();
            if (host.isEmpty()) {
                // Tolerate "a,,b" or a trailing comma in the configured host list.
                continue;
            }
            try {
                transportAddresses.add(
                        new InetSocketAddress(InetAddress.getByName(host), ES_TRANSPORT_PORT));
            } catch (UnknownHostException e) {
                // Best-effort: skip hosts that cannot be resolved and keep the rest.
                LOG.log(java.util.logging.Level.WARNING,
                        "Skipping unresolvable Elasticsearch host: " + host, e);
            }
        }

        // The parameter is raw for backward compatibility; the sink function consumes Strings.
        @SuppressWarnings("unchecked")
        DataStream<String> stringStream = dataStream;

        stringStream.addSink(new ElasticsearchSink<>(
                config, transportAddresses, new ClientIndexSinkFunction(client.getIndexName())));
    }

    /**
     * Sink function that indexes every record into one fixed index.
     *
     * <p>Deliberately a static nested class that stores only the index name: Flink
     * serializes sink functions, and an anonymous inner class declared in an instance
     * method would drag the enclosing {@code FlinkStream} bean (and the captured
     * {@code Client}) into the serialized closure.
     */
    private static final class ClientIndexSinkFunction implements ElasticsearchSinkFunction<String> {

        private static final long serialVersionUID = 1L;

        private final String indexName;

        ClientIndexSinkFunction(String indexName) {
            this.indexName = indexName;
        }

        /** Wraps the element as {@code {"data": element}} in an index request. */
        private IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);
            return Requests.indexRequest()
                    .index(indexName)
                    .type(indexName + "-type")
                    .source(json);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }
}

-- Chandresh Pancholi Senior Software Engineer Flipkart.com Email-id:chandresh.panch...@flipkart.com Contact:08951803660