Flow:
Producer -> Kafka (Avro) -> Flink Kafka connector with Avro deserializer -> Flink -> ES

Kafka: latest version
Flink: 1.4.2
ES: 5.5.2
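
The ConfluentAvroDeserializationSchema used below is a custom class that
isn't shown; for context, here is a minimal sketch of what such a schema
could look like, assuming Confluent's KafkaAvroDeserializer and a plain
GenericRecord#toString() for the Avro-to-JSON step (the wiring here is an
illustrative assumption, not the actual class):

import java.util.Collections;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
// older Flink versions use org.apache.flink.streaming.util.serialization instead
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class ConfluentAvroDeserializationSchema implements DeserializationSchema<String> {

    private final String schemaRegistryUrl;
    private transient KafkaAvroDeserializer deserializer;  // not serializable, so built lazily

    public ConfluentAvroDeserializationSchema(String schemaRegistryUrl) {
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public String deserialize(byte[] message) {
        if (deserializer == null) {
            deserializer = new KafkaAvroDeserializer();
            deserializer.configure(
                    Collections.singletonMap("schema.registry.url", schemaRegistryUrl), false);
        }
        // The schema id is embedded in the payload, so the topic argument is unused here
        return deserializer.deserialize(null, message).toString();
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}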

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import javax.annotation.PostConstruct;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class FlinkStream {

    @Autowired
    private ClientService clientService;

    @Autowired
    private AppConfig appConfig;

    @PostConstruct
    public void init() {
        List<Client> clientList = clientService.getAllEnableTenant();
        clientList.forEach(client -> {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            ConfluentAvroDeserializationSchema schema =
                    new ConfluentAvroDeserializationSchema(appConfig.getKafkaSchemaRegistry());
            Properties properties = buildKafkaConsumerProperties(client.getTopic());

            FlinkKafkaConsumer011<String> flinkKafkaConsumer =
                    new FlinkKafkaConsumer011<>(client.getTopic(), schema, properties);

            DataStream<String> kafkaStream = env.addSource(flinkKafkaConsumer);
            writeToEs(kafkaStream, client);
            try {
                // Note: execute() blocks until the job terminates, so for a
                // never-ending streaming job the loop never reaches the next client.
                env.execute();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    public Properties buildKafkaConsumerProperties(String clientTopic) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.getKafkaBootstrapServers());
        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, appConfig.getKafkaFetchMinBytes());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, appConfig.getKafkaAutoCommit());
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, appConfig.getKafkaAutoCommitInterval());
        properties.put("specific.avro.reader", true);
        properties.put("schema.registry.url", appConfig.getKafkaSchemaRegistry());
        // Note: the Flink Kafka connector deserializes with the DeserializationSchema
        // passed to the consumer, so these two settings are overridden internally.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, appConfig.getKafkaKeyDeserializer());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, appConfig.getKafkaValueDeserializer());

        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, clientTopic);

        return properties;
    }

    public void writeToEs(DataStream<String> dataStream, Client client) {
        Map<String, String> config = new HashMap<>();
        config.put("bulk.flush.max.actions", "1");
        config.put("cluster.name", appConfig.getElasticsearchCluster());

        List<InetSocketAddress> transportAddresses = new ArrayList<>();
        for (String tokenizedHost : appConfig.getElasticsearchHost().split(",")) {
            try {
                // 9300 is the Elasticsearch transport port (not the 9200 REST port)
                transportAddresses.add(new InetSocketAddress(InetAddress.getByName(tokenizedHost), 9300));
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
        }

        // client is captured by the sink function, so Client should be Serializable
        dataStream.addSink(new ElasticsearchSink<>(config, transportAddresses,
                new ElasticsearchSinkFunction<String>() {
            public IndexRequest createIndexRequest(String element) {
                Map<String, String> json = new HashMap<>();
                json.put("data", element);

                return Requests.indexRequest()
                        .index(client.getIndexName())
                        .type(client.getIndexName() + "-type")
                        .source(json);
            }

            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        }));
    }
}
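
Since execute() blocks, only the first client's job ever runs from that
loop. One alternative, sketched here under the same assumptions and
reusing the methods above, is to register one source per client on a
single environment and call execute() once:

    @PostConstruct
    public void init() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        for (Client client : clientService.getAllEnableTenant()) {
            ConfluentAvroDeserializationSchema schema =
                    new ConfluentAvroDeserializationSchema(appConfig.getKafkaSchemaRegistry());
            FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
                    client.getTopic(), schema, buildKafkaConsumerProperties(client.getTopic()));
            writeToEs(env.addSource(consumer), client);
        }

        try {
            env.execute("kafka-to-es");  // one blocking call covering all clients
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

This still blocks inside @PostConstruct, which stalls Spring startup;
running it on a separate thread is one way around that.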




-- 
Chandresh Pancholi
Senior Software Engineer
Flipkart.com
Email-id:chandresh.panch...@flipkart.com
Contact:08951803660