Maybe wrapping Jedis in a serializable class will do the trick? But in general, is there a way to reference classes from a jar in Flink apps without making them serializable?
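
To make the wrapping idea concrete, here is a minimal sketch that uses only the JDK, so it says nothing definitive about Flink or Jedis themselves. FakeClient is a hypothetical stand-in for a non-serializable client such as Jedis, and ClientHolder and SerializationDemo are made-up names for illustration. The holder marks the client field as transient, so Java serialization skips it, and recreates the client lazily after the object has been shipped. Only the host string is serialized.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a non-serializable client such as Jedis.
class FakeClient {
    private final String host;

    FakeClient(String host) {
        this.host = host;
    }

    String lookup(String key) {
        return host + ":" + key;
    }
}

// Serializable holder: only the host string travels with the object.
class ClientHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String host;
    // transient: skipped by Java serialization, so it is null after deserialization.
    private transient FakeClient client;

    ClientHolder(String host) {
        this.host = host;
    }

    // Creates the client on first use, on whichever JVM calls this method.
    FakeClient get() {
        if (client == null) {
            client = new FakeClient(host);
        }
        return client;
    }
}

class SerializationDemo {
    // Round-trips an object through Java serialization, roughly what happens
    // when a framework ships a user function to a worker.
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        ClientHolder holder = new ClientHolder("localhost");
        holder.get(); // the client exists locally but is not serialized
        ClientHolder copy = roundTrip(holder);
        System.out.println(copy.get().lookup("ad-1")); // prints "localhost:ad-1"
    }
}
```

If this carries over, the same pattern in the job would mean keeping only the server string as a regular field and making the Jedis field transient. As far as I know, Flink's RichFlatMapFunction also has an open() method that runs on the worker, which looks like the natural place to create the client, but I have not tested that here.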

> On Sep 4, 2015, at 1:36 PM, Jerry Peng <jerry.boyang.p...@gmail.com> wrote:
>
> Hello,
>
> So I am trying to use jedis (redis java client) with Flink streaming api, but
> I get an exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>     at org.apache.flink.client.program.Client.run(Client.java:278)
>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Object flink.benchmark.AdvertisingTopologyNative$RedisJoinBolt@21e360a not serializable
>     at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1320)
>     at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:144)
>     at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:624)
>     at flink.benchmark.AdvertisingTopologyNative.main(AdvertisingTopologyNative.java:50)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:483)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>     ... 6 more
> Caused by: java.io.NotSerializableException: redis.clients.jedis.Jedis
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:306)
>     at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>     ... 16 more
>
> so my code I am using:
>
> public static class RedisJoinBolt implements
>         FlatMapFunction<Tuple5<String, String, String, String, String>,
>                         Tuple6<String, String, String, String, String, String>> {
>     private Jedis jedis;
>     private HashMap<String, String> ad_to_campaign;
>
>     public RedisJoinBolt(String jedisServer) {
>         //initialize jedis
>         this.jedis = new Jedis(jedisServer);
>     }
>
>     @Override
>     public void flatMap(Tuple5<String, String, String, String, String> input,
>             Collector<Tuple6<String, String, String, String, String, String>> out)
>             throws Exception {
>         .
>         .
>         .
>
> Any one know a fix for this?