More questions
In Scala my DataProcessor is defined as
class DataProcessorKeyed extends CoProcessFunction[WineRecord, ModelToServe, Double] with CheckpointedFunction {
And it is used as follows
val models = modelsStream.map(ModelToServe.fromByteArray(_))
  .flatMap(BadDataHandler[ModelToServe])
  .keyBy(_.dataType)
val data = dataStream.map(DataRecord.fromByteArray(_))
  .flatMap(BadDataHandler[WineRecord])
  .keyBy(_.dataType)

// Merge streams
data
  .connect(models)
  .process(DataProcessorKeyed())
In Java, the same class is defined as
public class DataProcessorKeyed extends CoProcessFunction<Winerecord.WineRecord, ModelToServe, Double> implements CheckpointedFunction {
And it is used as follows
// Read data from streams
DataStream<Tuple2<String, ModelToServe>> models = modelsStream
        .flatMap(new ModelDataConverter(), new TupleTypeInfo<>(
                BasicTypeInfo.STRING_TYPE_INFO,
                TypeInformation.of(ModelToServe.class)))
        .keyBy(0);
DataStream<Tuple2<String, Winerecord.WineRecord>> data = dataStream
        .flatMap(new DataDataConverter(), new TupleTypeInfo<>(
                BasicTypeInfo.STRING_TYPE_INFO,
                TypeInformation.of(Winerecord.WineRecord.class)))
        .keyBy(0);

// Merge streams
data
        .connect(models)
        .process(new DataProcessorKeyed());
I am getting an error

Error:(68, 17) java: no suitable method found for keyBy(int)
    method org.apache.flink.streaming.api.scala.DataStream.keyBy(scala.collection.Seq<java.lang.Object>) is not applicable
      (argument mismatch; int cannot be converted to scala.collection.Seq<java.lang.Object>)
    method org.apache.flink.streaming.api.scala.DataStream.<K>keyBy(scala.Function1<org.apache.flink.api.java.tuple.Tuple2<java.lang.String,com.lightbend.model.ModelToServe>,K>,org.apache.flink.api.common.typeinfo.TypeInformation<K>) is not applicable
      (cannot infer type-variable(s) K
        (actual and formal argument lists differ in length))
So it assumes key/value pairs for the coprocessor

Why is there such a difference between the APIs?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Jan 10, 2018, at 6:20 PM, Boris Lublinsky <boris.lublin...@lightbend.com> 
> wrote:
> 
> I am trying to convert Scala code (which works fine) to Java.
> The Scala code is:
> // Create the Kafka consumers
> // Data
> val dataConsumer = new FlinkKafkaConsumer010[Array[Byte]](
>   DATA_TOPIC,
>   new ByteArraySchema,
>   dataKafkaProps
> )
> 
> // Model
> val modelConsumer = new FlinkKafkaConsumer010[Array[Byte]](
>   MODELS_TOPIC,
>   new ByteArraySchema,
>   modelKafkaProps
> )
> 
> // Create input data streams
> val modelsStream = env.addSource(modelConsumer)
> val dataStream = env.addSource(dataConsumer)
> 
> // Read data from streams
> val models = modelsStream.map(ModelToServe.fromByteArray(_))
>   .flatMap(BadDataHandler[ModelToServe])
>   .keyBy(_.dataType)
> val data = dataStream.map(DataRecord.fromByteArray(_))
>   .flatMap(BadDataHandler[WineRecord])
>   .keyBy(_.dataType)
> Now I am trying to rewrite it in Java and am fighting with the requirement
> to provide types where they should be obvious:
> 
> // Create the Kafka consumers
> // Data
> FlinkKafkaConsumer010<byte[]> dataConsumer = new FlinkKafkaConsumer010<>(
>         ModelServingConfiguration.DATA_TOPIC,
>         new ByteArraySchema(),
>         dataKafkaProps);
> 
> // Model
> FlinkKafkaConsumer010<byte[]> modelConsumer = new FlinkKafkaConsumer010<>(
>         ModelServingConfiguration.MODELS_TOPIC,
>         new ByteArraySchema(),
>         modelKafkaProps);
> 
> // Create input data streams
> DataStream<byte[]> modelsStream = env.addSource(modelConsumer,
>         PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
> DataStream<byte[]> dataStream = env.addSource(dataConsumer,
>         PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
> // Read data from streams
> DataStream<Tuple2<String, ModelToServe>> models = modelsStream
>         .flatMap(new ModelConverter(), new TupleTypeInfo<>(
>                 BasicTypeInfo.STRING_TYPE_INFO,
>                 TypeInformation.of(ModelToServe.class)));
> 
> Am I missing something in Java similar to import org.apache.flink.api.scala._?
> 
> And if this is the only way, does this seem right?
> 
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
> 
