More questions In Scala my DataProcessor is defined as class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double] with CheckpointedFunction { And it is used as follows val models = modelsStream.map(ModelToServe.fromByteArray(_)) .flatMap(BadDataHandler[ModelToServe]) .keyBy(_.dataType) val data = dataStream.map(DataRecord.fromByteArray(_)) .flatMap(BadDataHandler[WineRecord]) .keyBy(_.dataType)
// Merge streams data .connect(models) .process(DataProcessorKeyed()) When I am doing the same thing in Java public class DataProcessorKeyed extends CoProcessFunction<Winerecord.WineRecord, ModelToServe, Double> implements CheckpointedFunction{ Which I am using as follows // Read data from streams DataStream<Tuple2<String, ModelToServe>> models = modelsStream .flatMap(new ModelDataConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(ModelToServe.class))) .keyBy(0); DataStream<Tuple2<String, Winerecord.WineRecord>> data = dataStream .flatMap(new DataDataConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(Winerecord.WineRecord.class))) .keyBy(0); // Merge streams data .connect(models) .process(new DataProcessorKeyed()); I am getting an error Error:(68, 17) java: no suitable method found for keyBy(int) method org.apache.flink.streaming.api.scala.DataStream.keyBy(scala.collection.Seq<java.lang.Object>) is not applicable (argument mismatch; int cannot be converted to scala.collection.Seq<java.lang.Object>) method org.apache.flink.streaming.api.scala.DataStream.<K>keyBy(scala.Function1<org.apache.flink.api.java.tuple.Tuple2<java.lang.String,com.lightbend.model.ModelToServe>,K>,org.apache.flink.api.common.typeinfo.TypeInformation<K>) is not applicable (cannot infer type-variable(s) K (actual and formal argument lists differ in length)) So it assumes key/value pairs for the coprocessor Why is such difference between APIs? Boris Lublinsky FDP Architect boris.lublin...@lightbend.com https://www.lightbend.com/ > On Jan 10, 2018, at 6:20 PM, Boris Lublinsky <boris.lublin...@lightbend.com> > wrote: > > I am trying to covert Scala code (which works fine) to Java > The sacral code is: > // create a Kafka consumers > // Data > val dataConsumer = new FlinkKafkaConsumer010[Array[Byte]]( > DATA_TOPIC, > new ByteArraySchema, > dataKafkaProps > ) > > // Model > val modelConsumer = new FlinkKafkaConsumer010[Array[Byte]]( > MODELS_TOPIC, > new ByteArraySchema, > modelKafkaProps > ) > > // Create input data streams > val modelsStream = env.addSource(modelConsumer) > val dataStream = env.addSource(dataConsumer) > > // Read data from streams > val models = modelsStream.map(ModelToServe.fromByteArray(_)) > .flatMap(BadDataHandler[ModelToServe]) > .keyBy(_.dataType) > val data = dataStream.map(DataRecord.fromByteArray(_)) > .flatMap(BadDataHandler[WineRecord]) > .keyBy(_.dataType) > Now I am trying to re write it to Java and fighting with the requirement of > providing types, where they should be obvious > > // create a Kafka consumers > // Data > FlinkKafkaConsumer010<byte[]> dataConsumer = new FlinkKafkaConsumer010<>( > ModelServingConfiguration.DATA_TOPIC, > new ByteArraySchema(), > dataKafkaProps); > > // Model > FlinkKafkaConsumer010<byte[]> modelConsumer = new FlinkKafkaConsumer010<>( > ModelServingConfiguration.MODELS_TOPIC, > new ByteArraySchema(), > modelKafkaProps); > > // Create input data streams > DataStream<byte[]> modelsStream = env.addSource(modelConsumer, > PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO); > DataStream<byte[]> dataStream = env.addSource(dataConsumer, > PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO); > // Read data from streams > DataStream<Tuple2<String,ModelToServe>> models = modelsStream > .flatMap(new ModelConverter(), new > TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, > TypeInformation.of(ModelToServe.class))); > > Am I missing something similar to import org.apache.flink.api.scala._ > In java? > > Now if this is an only way, Does this seems right? > > Boris Lublinsky > FDP Architect > boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com> > https://www.lightbend.com/ >