Your anonymous ElasticsearchSinkFunction accesses the client variable
that is defined outside of the function.
For the function to be serializable, said Client must be as well.
I suggest turning your function into a named class with a constructor
that accepts the indexName and stores it in a field.
On 27.03.2018 12:15, chandresh pancholi wrote:
Flow
Producer -> Kafka(Avro) -> Flink Connector with Avro deserializer ->
Flink -> ES
Kafka - Latest version
Flink : 1.4.2
ES: 5.5.2
@Service public class FlinkStream {
@Autowired private ClientServiceclientService;
@Autowired private AppConfigappConfig;
@PostConstruct public void init() {
List<Client> clientList =clientService.getAllEnableTenant();
clientList.stream().forEach(client -> {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
ConfluentAvroDeserializationSchema schema =new
ConfluentAvroDeserializationSchema(appConfig.getKafkaSchemaRegistry());
Properties properties =
buildKafkaConsumerProperties(client.getTopic());
FlinkKafkaConsumer011<String> flinkKafkaConsumer =new
FlinkKafkaConsumer011<String>(client.getTopic(), schema, properties);
DataStream<String> kafkaStream = env.addSource(flinkKafkaConsumer);
writeTOEs(kafkaStream, client);
try {
env.execute();
}catch (Exception e) {
e.printStackTrace();
}
});
}
public Properties buildKafkaConsumerProperties(String clientTopic) {
Properties properties =new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,appConfig.getKafkaBootstrapServers()
);
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,appConfig.getKafkaFetchMinBytes()
);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,appConfig.getKafkaAutoCommit());
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,appConfig.getKafkaAutoCommitInterval());
properties.put("specific.avro.reader",true);
properties.put("schema.registry.url",appConfig.getKafkaSchemaRegistry());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,appConfig.getKafkaKeyDeserializer());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,appConfig.getKafkaValueDeserializer());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, clientTopic);
return properties;
}
public void writeTOEs(DataStream dataStream, Client client) {
HashMap<String, String> config =new HashMap<>();
config.put("bulk.flush.max.actions","1");
config.put("cluster.name
<http://cluster.name>",appConfig.getElasticsearchCluster());
List<InetSocketAddress> transportAddresses =new ArrayList<>();
for (String tokenizedHost:appConfig.getElasticsearchHost().split(","))
{
try {
transportAddresses.add(new
InetSocketAddress(InetAddress.getByName(tokenizedHost),9300));
}catch (UnknownHostException e) {
e.printStackTrace();
}
}
dataStream.addSink(new ElasticsearchSink<>(config, transportAddresses,new
ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json =new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index(client.getIndexName())
.type(client.getIndexName() +"-type")
.source(json);
}
@Override public void process(String element, RuntimeContext ctx,
RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}));
}
}
*
*
--
Chandresh Pancholi
Senior Software Engineer
Flipkart.com
Email-id:chandresh.panch...@flipkart.com
<mailto:email-id%3achandresh.panch...@flipkart.com>
Contact:08951803660