Hi Timo,

> You don't need to specify the type in .flatMap() explicitly. It will be
> automatically extracted using the generic signature of DataDataConverter.

It does not. That is the reason why I had to add it there.

> Regarding your error. Make sure that you don't mix up the API classes. If you
> want to use the Java API you should not use
> "org.apache.flink.streaming.api.scala.DataStream" but the Java one.

I rewrote the class in Java. That's why I am so confused.

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> From: Timo Walther <twal...@apache.org>
> Subject: Re: Java types
> Date: January 11, 2018 at 3:07:08 AM CST
> To: user@flink.apache.org
>
> Hi Boris,
>
> each API is designed language-specific so they might not always be the same.
> Scala has better type extraction features and lets you write code very
> precisely. Java sometimes requires more code to achieve the same.
>
> You don't need to specify the type in .flatMap() explicitly. It will be
> automatically extracted using the generic signature of DataDataConverter.
>
> Regarding your error. Make sure that you don't mix up the API classes. If you
> want to use the Java API you should not use
> "org.apache.flink.streaming.api.scala.DataStream" but the Java one.
>
> Regards,
> Timo
>
> On 1/11/18 at 5:13 AM, Boris Lublinsky wrote:
>> More questions
>> In Scala my DataProcessor is defined as
>> class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double]
>>     with CheckpointedFunction {
>> And it is used as follows
>> val models = modelsStream.map(ModelToServe.fromByteArray(_))
>>   .flatMap(BadDataHandler[ModelToServe])
>>   .keyBy(_.dataType)
>> val data = dataStream.map(DataRecord.fromByteArray(_))
>>   .flatMap(BadDataHandler[WineRecord])
>>   .keyBy(_.dataType)
>>
>> // Merge streams
>> data
>>   .connect(models)
>>   .process(DataProcessorKeyed())
>> When I am doing the same thing in Java
>> public class DataProcessorKeyed extends
>>     CoProcessFunction<Winerecord.WineRecord, ModelToServe, Double> implements CheckpointedFunction{
>> Which I am using as follows
>> // Read data from streams
>> DataStream<Tuple2<String, ModelToServe>> models = modelsStream
>>     .flatMap(new ModelDataConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO,
>>         TypeInformation.of(ModelToServe.class)))
>>     .keyBy(0);
>> DataStream<Tuple2<String, Winerecord.WineRecord>> data = dataStream
>>     .flatMap(new DataDataConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO,
>>         TypeInformation.of(Winerecord.WineRecord.class)))
>>     .keyBy(0);
>>
>> // Merge streams
>> data
>>     .connect(models)
>>     .process(new DataProcessorKeyed());
>> I am getting an error
>>
>> Error:(68, 17) java: no suitable method found for keyBy(int)
>>     method org.apache.flink.streaming.api.scala.DataStream.keyBy(scala.collection.Seq<java.lang.Object>)
>>     is not applicable
>>       (argument mismatch; int cannot be converted to scala.collection.Seq<java.lang.Object>)
>>     method org.apache.flink.streaming.api.scala.DataStream.<K>keyBy(scala.Function1<org.apache.flink.api.java.tuple.Tuple2<java.lang.String,com.lightbend.model.ModelToServe>,K>,org.apache.flink.api.common.typeinfo.TypeInformation<K>)
>>     is not applicable
>>       (cannot infer
>> type-variable(s) K
>>       (actual and formal argument lists differ in length))
>> So it assumes key/value pairs for the coprocessor
>>
>> Why is there such a difference between the APIs?
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>>> On Jan 10, 2018, at 6:20 PM, Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
>>>
>>> I am trying to convert Scala code (which works fine) to Java
>>> The Scala code is:
>>> // Create Kafka consumers
>>> // Data
>>> val dataConsumer = new FlinkKafkaConsumer010[Array[Byte]](
>>>   DATA_TOPIC,
>>>   new ByteArraySchema,
>>>   dataKafkaProps
>>> )
>>>
>>> // Model
>>> val modelConsumer = new FlinkKafkaConsumer010[Array[Byte]](
>>>   MODELS_TOPIC,
>>>   new ByteArraySchema,
>>>   modelKafkaProps
>>> )
>>>
>>> // Create input data streams
>>> val modelsStream = env.addSource(modelConsumer)
>>> val dataStream = env.addSource(dataConsumer)
>>>
>>> // Read data from streams
>>> val models = modelsStream.map(ModelToServe.fromByteArray(_))
>>>   .flatMap(BadDataHandler[ModelToServe])
>>>   .keyBy(_.dataType)
>>> val data = dataStream.map(DataRecord.fromByteArray(_))
>>>   .flatMap(BadDataHandler[WineRecord])
>>>   .keyBy(_.dataType)
>>> Now I am trying to rewrite it in Java and fighting with the requirement of
>>> providing types, where they should be obvious
>>>
>>> // Create Kafka consumers
>>> // Data
>>> FlinkKafkaConsumer010<byte[]> dataConsumer = new FlinkKafkaConsumer010<>(
>>>     ModelServingConfiguration.DATA_TOPIC,
>>>     new ByteArraySchema(),
>>>     dataKafkaProps);
>>>
>>> // Model
>>> FlinkKafkaConsumer010<byte[]> modelConsumer = new FlinkKafkaConsumer010<>(
>>>     ModelServingConfiguration.MODELS_TOPIC,
>>>     new ByteArraySchema(),
>>>     modelKafkaProps);
>>>
>>> // Create input data streams
>>> DataStream<byte[]> modelsStream = env.addSource(modelConsumer,
>>>     PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>>> DataStream<byte[]> dataStream = env.addSource(dataConsumer,
>>>     PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>>> // Read data from streams
>>> DataStream<Tuple2<String,ModelToServe>> models = modelsStream
>>>     .flatMap(new ModelConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO,
>>>         TypeInformation.of(ModelToServe.class)));
>>>
>>> Am I missing something similar to import org.apache.flink.api.scala._
>>> in Java?
>>>
>>> Now, if this is the only way, does this seem right?
>>>
>>> Boris Lublinsky
>>> FDP Architect
>>> boris.lublin...@lightbend.com
>>> https://www.lightbend.com/
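
The "generic signature" that Timo mentions can be illustrated without Flink. Below is a minimal sketch in plain Java showing when the output type of a function object can be recovered by reflection and when it cannot. All names here (FlatMapper, BytesToString, GenericConverter, outputTypeOf) are hypothetical stand-ins, not Flink classes, and Flink's real type extraction is considerably more involved. The sketch only shows the underlying mechanism: a named class that fixes its type arguments keeps them in its class file, while a lambda or a class with an unbound type parameter does not, which is one situation where explicit type information has to be supplied by hand.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

class TypeExtractionSketch {
    // Returns the OUT type argument recorded for FlatMapper on fn's class,
    // or null when the class carries no generic signature for it.
    static Type outputTypeOf(FlatMapper<?, ?> fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == FlatMapper.class) {
                    return p.getActualTypeArguments()[1];
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Named class with concrete type arguments: the output type is recoverable.
        System.out.println("named class: " + outputTypeOf(new BytesToString()));

        // Lambda: the generated class implements the raw interface only.
        FlatMapper<byte[], String> lambda =
            (value, out) -> out.accept(new String(value, StandardCharsets.UTF_8));
        System.out.println("lambda:      " + outputTypeOf(lambda));

        // Generic class: only the type variable T is recorded, not String.
        System.out.println("generic:     " + outputTypeOf(new GenericConverter<String>()));
    }
}

// Hypothetical stand-in for a flat-map function interface (not Flink's own).
interface FlatMapper<IN, OUT> {
    void flatMap(IN value, Consumer<OUT> out);
}

// Fixes both type arguments, so they end up in the class's generic signature.
class BytesToString implements FlatMapper<byte[], String> {
    @Override
    public void flatMap(byte[] value, Consumer<String> out) {
        out.accept(new String(value, StandardCharsets.UTF_8));
    }
}

// Leaves OUT as a type variable; the concrete argument is erased at runtime.
class GenericConverter<T> implements FlatMapper<byte[], T> {
    @Override
    public void flatMap(byte[] value, Consumer<T> out) {
        // Intentionally emits nothing; only the declaration matters here.
    }
}
```

If a converter in the thread's code looks like BytesToString, reflection of this kind has the information it needs. If it is generic or written as a lambda, passing type information explicitly, as in the Java snippets above, is the usual fallback.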