Hi,

Thank you for the response. I have made the suggested changes, but now I am
getting "Caused by: java.lang.NoClassDefFoundError: scala/Product$class".
I am running my application on Spring Boot 2.0. Is there a better
platform to run Flink code?

Caused by: java.lang.NoClassDefFoundError: scala/Product$class
    at akka.util.Timeout.<init>(Timeout.scala:13) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$Settings.<init>(ActorSystem.scala:328) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:683) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:245) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:288) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:263) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorSystem$.create(ActorSystem.scala:191) ~[akka-actor_2.11-2.4.20.jar:na]
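
For context, scala/Product$class is a trait-implementation class that only
exists in the Scala 2.11 runtime; Scala 2.12 removed these classes. The error
therefore usually means a Scala 2.12+ scala-library has ended up on the
classpath next to the _2.11 Akka/Flink jars. A minimal way to probe which
scala-library is actually loaded, as a rough sketch (the class name here is
made up):

    public class ScalaClasspathCheck {
        public static void main(String[] args) {
            try {
                // scala.Product$class exists only in the Scala 2.11 scala-library.
                Class.forName("scala.Product$class");
                System.out.println("Scala 2.11 scala-library is on the classpath.");
            } catch (ClassNotFoundException e) {
                System.out.println("scala.Product$class is missing -> likely a Scala 2.12+ scala-library.");
            }
        }
    }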

On Tue, Mar 27, 2018 at 3:54 PM, Chesnay Schepler <ches...@apache.org>
wrote:

> Your anonymous ElasticsearchSinkFunction accesses the client variable that
> is defined outside of the function.
> For the function to be serializable, said Client must be as well.
>
> I suggest turning your function into a named class with a constructor that
> accepts the indexName.
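>
> Something along these lines, as a rough, untested sketch (the class name is
> made up; it uses the same imports as your snippet). Because it only captures
> the index name as a String, the function stays serializable:
>
> public class IndexingSinkFunction implements ElasticsearchSinkFunction<String> {
>
>     private final String indexName;
>
>     public IndexingSinkFunction(String indexName) {
>         this.indexName = indexName;
>     }
>
>     private IndexRequest createIndexRequest(String element) {
>         Map<String, String> json = new HashMap<>();
>         json.put("data", element);
>         return Requests.indexRequest()
>                 .index(indexName)
>                 .type(indexName + "-type")
>                 .source(json);
>     }
>
>     @Override
>     public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
>         indexer.add(createIndexRequest(element));
>     }
> }
>
> and in writeTOEs:
>
> dataStream.addSink(new ElasticsearchSink<>(config, transportAddresses,
>         new IndexingSinkFunction(client.getIndexName())));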
>
>
> On 27.03.2018 12:15, chandresh pancholi wrote:
>
> Flow:
> Producer -> Kafka (Avro) -> Flink connector with Avro deserializer -> Flink -> ES
> Kafka: latest version
> Flink: 1.4.2
> ES: 5.5.2
>
> @Service
> public class FlinkStream {
>
>     @Autowired
>     private ClientService clientService;
>
>     @Autowired
>     private AppConfig appConfig;
>
>     @PostConstruct
>     public void init() {
>         List<Client> clientList = clientService.getAllEnableTenant();
>         clientList.stream().forEach(client -> {
>             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>             ConfluentAvroDeserializationSchema schema = new ConfluentAvroDeserializationSchema(appConfig.getKafkaSchemaRegistry());
>             Properties properties = buildKafkaConsumerProperties(client.getTopic());
>
>             FlinkKafkaConsumer011<String> flinkKafkaConsumer = new FlinkKafkaConsumer011<String>(client.getTopic(), schema, properties);
>
>             DataStream<String> kafkaStream = env.addSource(flinkKafkaConsumer);
>             writeTOEs(kafkaStream, client);
>             try {
>                 env.execute();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         });
>     }
>
>     public Properties buildKafkaConsumerProperties(String clientTopic) {
>         Properties properties = new Properties();
>         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.getKafkaBootstrapServers());
>         properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, appConfig.getKafkaFetchMinBytes());
>         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, appConfig.getKafkaAutoCommit());
>         properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, appConfig.getKafkaAutoCommitInterval());
>         properties.put("specific.avro.reader", true);
>         properties.put("schema.registry.url", appConfig.getKafkaSchemaRegistry());
>         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, appConfig.getKafkaKeyDeserializer());
>         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, appConfig.getKafkaValueDeserializer());
>
>         properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, clientTopic);
>
>         return properties;
>     }
>
>     public void writeTOEs(DataStream dataStream, Client client) {
>         HashMap<String, String> config = new HashMap<>();
>         config.put("bulk.flush.max.actions", "1");
>         config.put("cluster.name", appConfig.getElasticsearchCluster());
>
>         List<InetSocketAddress> transportAddresses = new ArrayList<>();
>         for (String tokenizedHost : appConfig.getElasticsearchHost().split(",")) {
>             try {
>                 transportAddresses.add(new InetSocketAddress(InetAddress.getByName(tokenizedHost), 9300));
>             } catch (UnknownHostException e) {
>                 e.printStackTrace();
>             }
>         }
>
>         dataStream.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
>             public IndexRequest createIndexRequest(String element) {
>                 Map<String, String> json = new HashMap<>();
>                 json.put("data", element);
>
>                 return Requests.indexRequest()
>                         .index(client.getIndexName())
>                         .type(client.getIndexName() + "-type")
>                         .source(json);
>             }
>
>             @Override
>             public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
>                 indexer.add(createIndexRequest(element));
>             }
>         }));
>     }
> }


-- 
Chandresh Pancholi
Senior Software Engineer
Flipkart.com
Email-id:chandresh.panch...@flipkart.com
Contact:08951803660
