Hi,

I added netty-all 4.0 as a dependency and it is working fine now. The only
other thing I had to do was create the POJO class in Scala like this:

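import scala.beans.BeanProperty
import com.datastax.driver.mapping.annotations.Table

@SerialVersionUID(507L)
@Table(keyspace = "twtt", name = "order")
class OrderFinal(
    @BeanProperty var name: String,
    @BeanProperty var userEmail: String) extends Serializable {

  // No-argument constructor so the class can be instantiated as a POJO.
  def this() = this("NA", "NA")
}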


If I remove @BeanProperty or change var to val, it fails with an error
saying that no setters or getters were found, or that multiple were found.
This is the final workaround I found.
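
For anyone hitting the same errors, here is a minimal sketch of why both
@BeanProperty and var seem to matter. It assumes the POJO detection looks for
Java-bean style getX/setX pairs and that a case class is picked up as a
Product instead. The class names below (VarBean, ValBean, PlainVar, CaseBean)
and the accessors helper are hypothetical stand-ins, not part of Flink or the
Cassandra driver, and the Cassandra annotations are left out so it runs with
only the Scala standard library.

```scala
import scala.beans.BeanProperty

// Hypothetical stand-ins for the classes discussed in this thread.

// var + @BeanProperty: the compiler emits getName and setName.
class VarBean(@BeanProperty var name: String) extends Serializable {
  def this() = this("NA")
}

// val + @BeanProperty: only getName is emitted, there is no setter.
class ValBean(@BeanProperty val name: String) extends Serializable

// var without @BeanProperty: only Scala-style name / name_= accessors.
class PlainVar(var name: String) extends Serializable

// A case class additionally implements scala.Product.
case class CaseBean(@BeanProperty var name: String)

object BeanPropertyCheck {
  // Names of the methods declared on the class that look like bean accessors.
  def accessors(c: Class[_]): Set[String] =
    c.getDeclaredMethods
      .map(_.getName)
      .filter(n => n.startsWith("get") || n.startsWith("set"))
      .toSet

  def main(args: Array[String]): Unit = {
    println(s"VarBean:  ${accessors(classOf[VarBean])}")
    println(s"ValBean:  ${accessors(classOf[ValBean])}")
    println(s"PlainVar: ${accessors(classOf[PlainVar])}")
    println(s"CaseBean is a Product: ${classOf[Product].isAssignableFrom(classOf[CaseBean])}")
    println(s"VarBean is a Product:  ${classOf[Product].isAssignableFrom(classOf[VarBean])}")
  }
}
```

If that assumption holds, a plain (non-case) class with @BeanProperty var
fields and a no-argument constructor is the only one of the four variants
that both exposes a getter/setter pair and is not a Product, which matches
what I observed with the sink.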



On Tue, Dec 19, 2017 at 6:11 PM, shashank agarwal <shashank...@gmail.com>
wrote:

> I have tried that by creating a class with a companion object:
>
> @SerialVersionUID(507L)
> @Table(keyspace = "neofp", name = "order_detail")
> class OrderFinal(
>                        @BeanProperty var order_name: String,
>                        @BeanProperty var user: String )extends Serializable
> {
>   def this() {
>     this("NA", "NA")
>   }
> }
> object OrderFinal
> {
>
> }
>
>
>
> When running with 1.4.0 it gives the following error:
>
>
> java.lang.NoClassDefFoundError: Could not initialize class com.datastax.driver.core.NettyUtil
> at com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:96)
> at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:706)
> at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1365)
> at com.datastax.driver.core.Cluster.init(Cluster.java:162)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
> at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
> at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
> at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:88)
> at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:64)
> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
>
> 12/19/2017 18:04:33 Job execution switched to status FAILING.
>
> On Tue, Dec 19, 2017 at 3:34 PM, shashank agarwal <shashank...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have upgraded Flink from 1.3.2 to 1.4.0. I am using the Cassandra sink in
>> my Scala application. Before the sink, I was converting my Scala DataStream
>> to a Java stream and sinking it into Cassandra. I created the POJO class in
>> Scala like this:
>>
>> @SerialVersionUID(507L)
>> @Table(keyspace = "neofp", name = "order_detail")
>> case class OrderFinal(
>>                        @BeanProperty var order_name: String,
>>                        @BeanProperty var user: String) extends Serializable
>> {
>>   def this() {
>>     this("NA", "NA")
>>   }
>> }
>>
>> This was working fine with the sink. After upgrading to 1.4.0 it gives the
>> error "Query must not be null or empty."
>>
>> After digging into the CassandraSink code, I found that it treats the class
>> as a case class and uses CassandraScalaProductSinkBuilder, which does a
>> sanityCheck for the existence of a query.
>>
>> So how can I create a POJO class in Scala so that CassandraSink uses
>> CassandraPojoSinkBuilder for it?
>>
>> As a workaround, for now I have downgraded only the connector to 1.3.2.
>>
>>
>> Thanks
>> Shashank
>>
>>
>
>
> --
> Thanks Regards
>
> SHASHANK AGARWAL
>  ---  Trying to mobilize the things....
>
>


-- 
Thanks Regards

SHASHANK AGARWAL
 ---  Trying to mobilize the things....
