Hi all,
Just for grins, I disabled auto-generated UIDs for the taxi rides/fares state
example in the online tutorial.
    env.getConfig().disableAutoGeneratedUIDs();
I then added UIDs for all operators, sources & sinks. But I still get the
following when calling env.getExecutionPlan() or env.execute():
java.lang.IllegalStateException: Auto generated UIDs have been disabled but no UID or hash has been assigned to operator Partition
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:297)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformTwoInputTransform(StreamGraphGenerator.java:682)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:252)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1529)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1564)
    at com.citi.flink.RidesAndFaresTool.main(RidesAndFaresTool.java:63)
The simple workflow is:
    DataStream<TaxiRide> rides = env
        .addSource(new CheckpointedTaxiRideSource(ridesFile, servingSpeedFactor))
        .uid("source: taxi rides")
        .name("taxi rides")
        .filter((TaxiRide ride) -> ride.isStart)
        .uid("filter: only start rides")
        .name("only start rides")
        .keyBy((TaxiRide ride) -> ride.rideId);

    DataStream<TaxiFare> fares = env
        .addSource(new CheckpointedTaxiFareSource(faresFile, servingSpeedFactor))
        .uid("source: taxi fares")
        .name("taxi fares")
        .keyBy((TaxiFare fare) -> fare.rideId);

    DataStreamSink<Tuple2<TaxiRide, TaxiFare>> enriched = rides
        .connect(fares)
        .flatMap(new EnrichmentFunction())
        .uid("function: enrich rides with fares")
        .name("enrich rides with fares")
        .addSink(sink)
        .uid("sink: enriched taxi rides")
        .name("enriched taxi rides");
Internally, the exception is thrown while the EnrichmentFunction (a
RichCoFlatMapFunction) is being transformed by
StreamGraphGenerator.transformTwoInputTransform().
That method calls StreamGraphGenerator.transform() on each of its two inputs,
and the Transformation for each input is a PartitionTransformation.
I don’t see a way to set a UID after keyBy(): it returns a KeyedStream, which
has no uid() method, and KeyedStream creates the PartitionTransformation
without a UID.
Any insight into setting the UID properly here? Or should
StreamGraphGenerator.transform() skip the no-uid check for
PartitionTransformation, since that’s not an operator with state?
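To make that second option concrete, here is a toy model in plain Java. It is not Flink's code: Node, missingUids and the skipNonOperators flag are names I made up for illustration, and isOperator is just my stand-in for "this transformation becomes a real operator rather than a partitioning step".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UidCheckSketch {

    /** Hypothetical stand-in for one node of the transformation graph. */
    public static final class Node {
        final String name;
        final String uid;         // null when no UID was assigned
        final boolean isOperator; // false for partitioning-only nodes, e.g. what keyBy() adds

        public Node(String name, String uid, boolean isOperator) {
            this.name = name;
            this.uid = uid;
            this.isOperator = isOperator;
        }
    }

    /**
     * Returns the names of nodes that would fail a "UID required" check.
     * With skipNonOperators set, partitioning-only nodes are exempt.
     */
    public static List<String> missingUids(List<Node> nodes, boolean skipNonOperators) {
        List<String> missing = new ArrayList<>();
        for (Node node : nodes) {
            if (skipNonOperators && !node.isOperator) {
                continue; // assumed stateless, so there is nothing a UID would map to
            }
            if (node.uid == null) {
                missing.add(node.name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Mirrors the workflow above: every operator has a UID, the two keyBy() nodes do not.
        List<Node> job = Arrays.asList(
            new Node("taxi rides", "source: taxi rides", true),
            new Node("only start rides", "filter: only start rides", true),
            new Node("Partition", null, false),
            new Node("taxi fares", "source: taxi fares", true),
            new Node("Partition", null, false),
            new Node("enrich rides with fares", "function: enrich rides with fares", true),
            new Node("enriched taxi rides", "sink: enriched taxi rides", true));

        System.out.println("check every node:     " + missingUids(job, false));
        System.out.println("skip partition nodes: " + missingUids(job, true));
    }
}
```

Checking every node flags the two Partition entries, which matches what I'm seeing. Exempting the partitioning-only nodes leaves nothing to complain about.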
Thanks,
— Ken
--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr