I want to implement a left outer join of two datasets using the Tuple data type.
package org.apache.flink.examples.java.relational; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import java.io.File; @SuppressWarnings("serial") public class TPCHQuery3 { //filed name in cutomer table public static class LeftOuterJoin implements CoGroupFunction<Tuple2<Tuple1<Integer>, String>, Tuple2<Tuple1<Integer>, String>, Tuple2<Tuple1<Integer>,Tuple1<Integer>>> { @Override public void coGroup(Iterable<Tuple2<Tuple1<Integer>, String>> leftElements, Iterable<Tuple2<Tuple1<Integer>, String>> rightElements, Collector<Tuple2<Tuple1<Integer>,Tuple1<Integer>>> out) throws Exception { for (Tuple2<Tuple1<Integer>, String> leftElem : leftElements) { boolean hadElements = false; for (Tuple2<Tuple1<Integer>, String> rightElem : rightElements) { out.collect(new Tuple2<Tuple1<Integer>,Tuple1<Integer>>(leftElem.f0, rightElem.f0)); hadElements = true; } if (!hadElements) { out.collect(new Tuple2<Tuple1<Integer>, Tuple1<Integer>>(leftElem.f0, null)); } } } } public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple1<Integer>> leftSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/customer.csv") .fieldDelimiter('|') .includeFields("10000000").ignoreFirstLine() .types(Integer.class); // DataSource<Integer> leftSide = env.fromElements(1, 2, 3, 4, 5); DataSet<Tuple2<Tuple1<Integer>, String>> leftSide2 = leftSide.map( new MapFunction<Tuple1<Integer>, Tuple2<Tuple1<Integer>, String>>() { @Override public Tuple2<Tuple1<Integer>, String> map(Tuple1<Integer> x) throws Exception { return new 
Tuple2<Tuple1<Integer>, String>(x, "some data"); } }); DataSet<Tuple1<Integer>> rightSide=env.readCsvFile("/home/hadoop/Desktop/Dataset/orders.csv") .fieldDelimiter('|') .includeFields("010000000").ignoreFirstLine() .types(Integer.class); // DataSource<Integer> rightSide = env.fromElements(1,2,4, 5, 6, 7, 8, 9, 10); DataSet<Tuple2<Tuple1<Integer>, String>> rightSide2 = rightSide.map( new MapFunction<Tuple1<Integer>, Tuple2<Tuple1<Integer>, String>>() { @Override public Tuple2<Tuple1<Integer>, String> map(Tuple1<Integer> x) throws Exception { return new Tuple2<Tuple1<Integer>, String>(x, "some other data"); } }); DataSet<Tuple2<Tuple1<Integer>, Tuple1<Integer>>> leftOuterJoin = leftSide2.coGroup(rightSide2) .where(0) .equalTo(0) .with(new LeftOuterJoin()); leftOuterJoin.writeAsCsv("/home/hadoop/Desktop/Dataset/output1.csv", "\n", "|");; env.execute(); } Error code After run programs Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: org.apache.flink.types.NullFieldException: Field 1 is null, but expected to hold a value. 
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:97) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30) at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51) at org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76) at org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82) at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88) at org.apache.flink.examples.java.relational.TPCHQuery3$LeftOuterJoin.coGroup(TPCHQuery3.java:38) at org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:130) at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359) at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:235) at java.lang.Thread.run(Thread.java:724) at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349) at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239) at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51) at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:540) at org.apache.flink.examples.java.relational.TPCHQuery3.main(TPCHQuery3.java:80) -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/error-when-eun-program-left-outer-join-tp1141.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.