Hi,

Thank you for the response. I have made the suggested changes, but now I am getting "Caused by: java.lang.NoClassDefFoundError: scala/Product$class". I am running my application on Spring Boot 2.0. Is there a better platform to run Flink code?
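A note on the error itself: scala/Product$class only exists in scala-library builds for Scala 2.11 and earlier (2.12 changed how traits are compiled), while the trace that follows shows Akka being loaded from akka-actor_2.11. One plausible cause, not confirmed anywhere in this thread, is a scala-library 2.12 sitting on the classpath next to _2.11 artifacts. The sketch below is one way to look for such a mix by grouping jar file names by their Scala binary version. ScalaVersionCheck and its methods are hypothetical names made up for illustration, and the file-name patterns are an assumption about how the jars are named.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink, Akka, or Spring Boot.
public class ScalaVersionCheck {

    // Scala binary-version suffix in artifact names, e.g. akka-actor_2.11-2.4.20.jar
    private static final Pattern SUFFIX = Pattern.compile("_(2\\.\\d+)-");
    // The Scala standard library jar itself, e.g. scala-library-2.12.4.jar
    private static final Pattern LIBRARY = Pattern.compile("scala-library-(2\\.\\d+)\\.");

    /** Groups jar names by the Scala binary version found in the file name. */
    public static Map<String, List<String>> groupByScalaVersion(List<String> jarNames) {
        Map<String, List<String>> byVersion = new TreeMap<>();
        for (String jar : jarNames) {
            Matcher m = LIBRARY.matcher(jar);
            if (!m.find()) {
                m = SUFFIX.matcher(jar);
                if (!m.find()) {
                    continue; // no Scala version in this name, e.g. a plain Java jar
                }
            }
            byVersion.computeIfAbsent(m.group(1), v -> new ArrayList<>()).add(jar);
        }
        return byVersion;
    }

    /** True if the jar names refer to more than one Scala binary version. */
    public static boolean hasMismatch(List<String> jarNames) {
        return groupByScalaVersion(jarNames).size() > 1;
    }

    public static void main(String[] args) {
        // Inspect the file names on the JVM classpath of this process.
        List<String> names = new ArrayList<>();
        for (String entry : System.getProperty("java.class.path").split(File.pathSeparator)) {
            names.add(new File(entry).getName());
        }
        System.out.println(groupByScalaVersion(names));
    }
}
```

When the application runs as a Spring Boot executable jar, the nested jars do not appear in java.class.path, so the output of "mvn dependency:tree" is probably the easier place to look for the same information.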
Caused by: java.lang.NoClassDefFoundError: scala/Product$class
    at akka.util.Timeout.<init>(Timeout.scala:13) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$Settings.<init>(ActorSystem.scala:328) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:683) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:245) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:288) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:263) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$.create(ActorSystem.scala:191) ~[akka-actor_2.11-2.4.20.jar:na]

On Tue, Mar 27, 2018 at 3:54 PM, Chesnay Schepler <ches...@apache.org> wrote:

> Your anonymous ElasticsearchSinkFunction accesses the client variable that
> is defined outside of the function.
> For the function to be serializable, said Client must be as well.
>
> I suggest to turn your function into a named class with a constructor that
> accepts the indexName.
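To illustrate the suggestion quoted above, here is a minimal sketch of the serialization difference using only the JDK. SinkFunction and Client are stand-ins invented for this example (they are not Flink's ElasticsearchSinkFunction or the real Client entity), and IndexNameSinkFunction is a hypothetical name. The point is only that a named class holding a String serializes, while an anonymous class that captures a non-serializable object does not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableSinkSketch {

    /** Stand-in for a Serializable sink callback (assumption, not the Flink interface). */
    interface SinkFunction extends Serializable {
        String indexFor(String element);
    }

    /** Stand-in for the Client entity; deliberately NOT Serializable. */
    static class Client {
        private final String indexName;
        Client(String indexName) { this.indexName = indexName; }
        String getIndexName() { return indexName; }
    }

    /** Named function that keeps only the String it needs. */
    static class IndexNameSinkFunction implements SinkFunction {
        private static final long serialVersionUID = 1L;
        private final String indexName;
        IndexNameSinkFunction(String indexName) { this.indexName = indexName; }
        @Override public String indexFor(String element) { return indexName; }
    }

    /** Anonymous function that captures the whole Client object. */
    static SinkFunction capturing(final Client client) {
        return new SinkFunction() {
            @Override public String indexFor(String element) { return client.getIndexName(); }
        };
    }

    /** Returns true if Java serialization accepts the object. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Client client = new Client("tenant-a");
        // Captures the non-serializable Client: prints false
        System.out.println(isSerializable(capturing(client)));
        // Holds only a String: prints true
        System.out.println(isSerializable(new IndexNameSinkFunction(client.getIndexName())));
    }
}
```

In the real job the same shape would presumably apply: a named class implementing ElasticsearchSinkFunction<String> that receives client.getIndexName() in its constructor. An anonymous class declared inside an instance method may also hold a reference to the enclosing FlinkStream bean, which is another reason to prefer a named (top-level or static nested) class.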
>
> On 27.03.2018 12:15, chandresh pancholi wrote:
>
> Flow
> Producer -> Kafka(Avro) -> Flink Connector with Avro deserializer -> Flink -> ES
>
> Kafka - Latest version
> Flink : 1.4.2
> ES: 5.5.2
>
> @Service
> public class FlinkStream {
>
>     @Autowired
>     private ClientService clientService;
>
>     @Autowired
>     private AppConfig appConfig;
>
>     @PostConstruct
>     public void init() {
>         List<Client> clientList = clientService.getAllEnableTenant();
>         clientList.stream().forEach(client -> {
>             StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>             ConfluentAvroDeserializationSchema schema =
>                 new ConfluentAvroDeserializationSchema(appConfig.getKafkaSchemaRegistry());
>             Properties properties = buildKafkaConsumerProperties(client.getTopic());
>
>             FlinkKafkaConsumer011<String> flinkKafkaConsumer =
>                 new FlinkKafkaConsumer011<String>(client.getTopic(), schema, properties);
>
>             DataStream<String> kafkaStream = env.addSource(flinkKafkaConsumer);
>             writeTOEs(kafkaStream, client);
>             try {
>                 env.execute();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         });
>     }
>
>     public Properties buildKafkaConsumerProperties(String clientTopic) {
>         Properties properties = new Properties();
>         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
>             appConfig.getKafkaBootstrapServers());
>         properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,
>             appConfig.getKafkaFetchMinBytes());
>         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
>             appConfig.getKafkaAutoCommit());
>         properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
>             appConfig.getKafkaAutoCommitInterval());
>         properties.put("specific.avro.reader", true);
>         properties.put("schema.registry.url",
>             appConfig.getKafkaSchemaRegistry());
>         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>             appConfig.getKafkaKeyDeserializer());
>         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>             appConfig.getKafkaValueDeserializer());
>
>         properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, clientTopic);
>
>         return properties;
>     }
>
>     public void writeTOEs(DataStream dataStream, Client client) {
>         HashMap<String, String> config = new HashMap<>();
>         config.put("bulk.flush.max.actions", "1");
>         config.put("cluster.name", appConfig.getElasticsearchCluster());
>
>         List<InetSocketAddress> transportAddresses = new ArrayList<>();
>         for (String tokenizedHost : appConfig.getElasticsearchHost().split(",")) {
>             try {
>                 transportAddresses.add(
>                     new InetSocketAddress(InetAddress.getByName(tokenizedHost), 9300));
>             } catch (UnknownHostException e) {
>                 e.printStackTrace();
>             }
>         }
>
>         dataStream.addSink(new ElasticsearchSink<>(config, transportAddresses,
>             new ElasticsearchSinkFunction<String>() {
>                 public IndexRequest createIndexRequest(String element) {
>                     Map<String, String> json = new HashMap<>();
>                     json.put("data", element);
>
>                     return Requests.indexRequest()
>                         .index(client.getIndexName())
>                         .type(client.getIndexName() + "-type")
>                         .source(json);
>                 }
>
>                 @Override
>                 public void process(String element, RuntimeContext ctx,
>                                     RequestIndexer indexer) {
>                     indexer.add(createIndexRequest(element));
>                 }
>             }));
>     }
> }
>
> --
> Chandresh Pancholi
> Senior Software Engineer
> Flipkart.com
> Email-id:chandresh.panch...@flipkart.com
> Contact:08951803660

--
Chandresh Pancholi
Senior Software Engineer
Flipkart.com
Email-id:chandresh.panch...@flipkart.com
Contact:08951803660