Your anonymous ElasticsearchSinkFunction accesses the client variable that is defined outside of the function.
For the function to be serializable, the captured Client object must be serializable as well.

I suggest turning your function into a named class with a constructor that accepts the indexName.

On 27.03.2018 12:15, chandresh pancholi wrote:
Flow
Producer -> Kafka(Avro) -> Flink Connector with Avro deserializer -> Flink -> ES
Kafka - Latest version
Flink : 1.4.2
ES: 5.5.2
@Service public class FlinkStream {

     @Autowired private ClientServiceclientService;

     @Autowired private AppConfigappConfig;

     @PostConstruct public void init() {
         List<Client> clientList =clientService.getAllEnableTenant();
         clientList.stream().forEach(client -> {
             StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

             ConfluentAvroDeserializationSchema schema =new 
ConfluentAvroDeserializationSchema(appConfig.getKafkaSchemaRegistry());
             Properties properties = 
buildKafkaConsumerProperties(client.getTopic());

             FlinkKafkaConsumer011<String> flinkKafkaConsumer =new 
FlinkKafkaConsumer011<String>(client.getTopic(), schema, properties);

             DataStream<String> kafkaStream = env.addSource(flinkKafkaConsumer);
             writeTOEs(kafkaStream, client);
             try {
                 env.execute();
             }catch (Exception e) {
                 e.printStackTrace();
             }
         });


     }

     public Properties buildKafkaConsumerProperties(String clientTopic) {
         Properties properties =new Properties();
         
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,appConfig.getKafkaBootstrapServers()
 );
         
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,appConfig.getKafkaFetchMinBytes()
 );
         
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,appConfig.getKafkaAutoCommit());
         
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,appConfig.getKafkaAutoCommitInterval());
         properties.put("specific.avro.reader",true);
         
properties.put("schema.registry.url",appConfig.getKafkaSchemaRegistry());
         
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,appConfig.getKafkaKeyDeserializer());
         
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,appConfig.getKafkaValueDeserializer());

         properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, clientTopic);

         return properties;
     }

     public void writeTOEs(DataStream dataStream, Client client) {
         HashMap<String, String> config =new HashMap<>();
         config.put("bulk.flush.max.actions","1");
         config.put("cluster.name 
<http://cluster.name>",appConfig.getElasticsearchCluster());

         List<InetSocketAddress> transportAddresses =new ArrayList<>();
         for (String tokenizedHost:appConfig.getElasticsearchHost().split(",")) 
{
             try {
                 transportAddresses.add(new 
InetSocketAddress(InetAddress.getByName(tokenizedHost),9300));
             }catch (UnknownHostException e) {
                 e.printStackTrace();
             }
         }

         dataStream.addSink(new ElasticsearchSink<>(config, transportAddresses,new 
ElasticsearchSinkFunction<String>() {
             public IndexRequest createIndexRequest(String element) {
                 Map<String, String> json =new HashMap<>();
                 json.put("data", element);

                 return Requests.indexRequest()
                         .index(client.getIndexName())
                         .type(client.getIndexName() +"-type")
                         .source(json);
             }

             @Override public void process(String element, RuntimeContext ctx, 
RequestIndexer indexer) {
                 indexer.add(createIndexRequest(element));
             }
         }));

     }
}

*

*
--
Chandresh Pancholi
Senior Software Engineer
Flipkart.com
Email-id:chandresh.panch...@flipkart.com <mailto:email-id%3achandresh.panch...@flipkart.com>
Contact:08951803660


Reply via email to