I am trying to covert Scala code (which works fine) to Java The sacral code is: // create a Kafka consumers // Data val dataConsumer = new FlinkKafkaConsumer010[Array[Byte]]( DATA_TOPIC, new ByteArraySchema, dataKafkaProps )
// Model val modelConsumer = new FlinkKafkaConsumer010[Array[Byte]]( MODELS_TOPIC, new ByteArraySchema, modelKafkaProps ) // Create input data streams val modelsStream = env.addSource(modelConsumer) val dataStream = env.addSource(dataConsumer) // Read data from streams val models = modelsStream.map(ModelToServe.fromByteArray(_)) .flatMap(BadDataHandler[ModelToServe]) .keyBy(_.dataType) val data = dataStream.map(DataRecord.fromByteArray(_)) .flatMap(BadDataHandler[WineRecord]) .keyBy(_.dataType) Now I am trying to re write it to Java and fighting with the requirement of providing types, where they should be obvious // create a Kafka consumers // Data FlinkKafkaConsumer010<byte[]> dataConsumer = new FlinkKafkaConsumer010<>( ModelServingConfiguration.DATA_TOPIC, new ByteArraySchema(), dataKafkaProps); // Model FlinkKafkaConsumer010<byte[]> modelConsumer = new FlinkKafkaConsumer010<>( ModelServingConfiguration.MODELS_TOPIC, new ByteArraySchema(), modelKafkaProps); // Create input data streams DataStream<byte[]> modelsStream = env.addSource(modelConsumer, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO); DataStream<byte[]> dataStream = env.addSource(dataConsumer, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO); // Read data from streams DataStream<Tuple2<String,ModelToServe>> models = modelsStream .flatMap(new ModelConverter(), new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(ModelToServe.class))); Am I missing something similar to import org.apache.flink.api.scala._ In java? Now if this is an only way, Does this seems right? Boris Lublinsky FDP Architect boris.lublin...@lightbend.com https://www.lightbend.com/