Maybe wrapping Jedis in a serializable class will do the trick?

But in general, is there a way to reference classes from a jar in Flink apps
without serializing them?
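
For what it's worth, here is a minimal sketch of the wrapper idea using only the JDK, so it can be tried without a cluster. It marks the client field as transient and rebuilds it lazily after deserialization, so only the host string is shipped. FakeClient, ClientHolder, and roundTrip are hypothetical names for illustration; FakeClient merely stands in for redis.clients.jedis.Jedis. In an actual Flink job the usual equivalent, as far as I know, is to extend RichFlatMapFunction and create the client in open(Configuration) rather than in the constructor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientClientDemo {

    // Hypothetical stand-in for a non-serializable client such as Jedis.
    static class FakeClient {
        final String host;

        FakeClient(String host) {
            this.host = host;
        }

        String ping() {
            return "PONG from " + host;
        }
    }

    // Serializable wrapper: only the host string is written out;
    // the client itself is rebuilt on first use.
    static class ClientHolder implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String host;
        private transient FakeClient client; // skipped by Java serialization

        ClientHolder(String host) {
            this.host = host;
        }

        FakeClient get() {
            if (client == null) {
                // First call on this copy (e.g. after deserialization).
                client = new FakeClient(host);
            }
            return client;
        }
    }

    // Round-trips an object through Java serialization, the same mechanism
    // that raised NotSerializableException in the stack trace.
    static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        ClientHolder holder = new ClientHolder("localhost");
        holder.get(); // client exists locally, but is not serialized
        ClientHolder copy = roundTrip(holder);
        System.out.println(copy.get().ping());
    }
}
```

The point of the design is that the function object stays serializable because it carries only plain configuration, and each deserialized copy opens its own connection where it runs.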


> On Sep 4, 2015, at 1:36 PM, Jerry Peng <jerry.boyang.p...@gmail.com> wrote:
> 
> Hello,
> 
> I am trying to use Jedis (the Redis Java client) with the Flink streaming API,
> but I get an exception:
> 
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
>       at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>       at org.apache.flink.client.program.Client.run(Client.java:278)
>       at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631)
>       at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
>       at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954)
>       at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Object flink.benchmark.AdvertisingTopologyNative$RedisJoinBolt@21e360a not serializable
>       at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>       at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>       at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1320)
>       at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:144)
>       at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:624)
>       at flink.benchmark.AdvertisingTopologyNative.main(AdvertisingTopologyNative.java:50)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:483)
>       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>       ... 6 more
> Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>       at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:306)
>       at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>       ... 16 more
> 
> 
> 
> 
> Here is the code I am using:
> 
> public static class RedisJoinBolt implements FlatMapFunction<
>     Tuple5<String, String, String, String, String>,
>     Tuple6<String, String, String, String, String, String>> {
>   private Jedis jedis;
>   private HashMap<String, String> ad_to_campaign;
> 
>   public RedisJoinBolt(String jedisServer) {
>     // initialize jedis
>     this.jedis = new Jedis(jedisServer);
>   }
> 
>   @Override
>   public void flatMap(Tuple5<String, String, String, String, String> input,
>       Collector<Tuple6<String, String, String, String, String, String>> out)
>       throws Exception {
> .
> .
> .
> 
> Does anyone know a fix for this?
