Hi Ken,

This is actually a bug: a Partition should not require a UID. It is fixed in 1.9.2 and 1.10. See FLINK-14910 <https://jira.apache.org/jira/browse/FLINK-14910>.
Thanks,
Zhu Zhu

Ken Krugler <kkrugler_li...@transpac.com> wrote on Fri, Jan 10, 2020 at 7:51 AM:

> Hi all,
>
> [Of course, right after hitting send I realized I could just do
> rides.getTransformation().setUid("blah"), ditto for the fares stream. Might
> be something to add to the docs, or provide a .uid() method on KeyedStreams
> for syntactic sugar.]
>
> Just for grins, I disabled auto-generated UIDs for the taxi rides/fares
> state example in the online tutorial.
>
> env.getConfig().disableAutoGeneratedUIDs();
>
> I then added UIDs for all operators, sources & sinks. But I still get the
> following when calling env.getExecutionPlan() or env.execute():
>
> java.lang.IllegalStateException: Auto generated UIDs have been disabled
> but no UID or hash has been assigned to operator Partition
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:297)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformTwoInputTransform(StreamGraphGenerator.java:682)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:252)
>     at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1529)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1564)
>     at com.citi.flink.RidesAndFaresTool.main(RidesAndFaresTool.java:63)
>
> The simple workflow is:
>
> DataStream<TaxiRide> rides = env
>     .addSource(new CheckpointedTaxiRideSource(ridesFile, servingSpeedFactor))
>     .uid("source: taxi rides")
>     .name("taxi rides")
>     .filter((TaxiRide ride) -> ride.isStart)
>     .uid("filter: only start rides")
>     .name("only start rides")
>     .keyBy((TaxiRide ride) -> ride.rideId);
>
> DataStream<TaxiFare> fares = env
>     .addSource(new CheckpointedTaxiFareSource(faresFile, servingSpeedFactor))
>     .uid("source: taxi fares")
>     .name("taxi fares")
>     .keyBy((TaxiFare fare) -> fare.rideId);
>
> DataStreamSink<Tuple2<TaxiRide, TaxiFare>> enriched = rides
>     .connect(fares)
>     .flatMap(new EnrichmentFunction())
>     .uid("function: enrich rides with fares")
>     .name("enrich rides with fares")
>     .addSink(sink)
>     .uid("sink: enriched taxi rides")
>     .name("enriched taxi rides");
>
> Internally, the exception is thrown when the EnrichmentFunction (a
> RichCoFlatMapFunction) is being transformed by
> StreamGraphGenerator.transformTwoInputTransform().
>
> This calls StreamGraphGenerator.transform() with the two inputs, but the
> Transformation for each input is a PartitionTransformation.
>
> I don’t see a way to set the UID following the keyBy(), as a KeyedStream
> creates the PartitionTransformation without a UID.
>
> Any insight into setting the UID properly here? Or should
> StreamGraphGenerator.transform() skip the no-uid check for
> PartitionTransformation, since that’s not an operator with state?
>
> Thanks,
>
> — Ken
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
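To make the failure mode concrete, here is a small, self-contained toy model in plain Java. The classes (`UidCheckSketch`, `Node`) and the method names are hypothetical stand-ins, not Flink's API, and this is not the actual FLINK-14910 patch. The sketch assumes that the pre-fix check demanded a UID on every transformation, including the keyBy() partition nodes, and that the fix exempts those partition nodes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model (hypothetical classes, NOT Flink's API) of the UID check that
// fails when auto-generated UIDs are disabled and keyBy() feeds a
// two-input operator.
public class UidCheckSketch {

    // Hypothetical stand-in for a transformation node in the job graph.
    static class Node {
        final String name;
        final boolean isPartition; // true for keyBy()-style repartitioning nodes
        final List<Node> inputs;
        String uid; // null means no UID was assigned

        Node(String name, boolean isPartition, Node... inputs) {
            this.name = name;
            this.isPartition = isPartition;
            this.inputs = new ArrayList<>(Arrays.asList(inputs));
        }

        Node uid(String uid) {
            this.uid = uid;
            return this;
        }
    }

    // Visits inputs first, then the node itself. With skipPartitions == false
    // every node needs a UID (the reported behavior); with true, partition
    // nodes are exempt (the assumed post-fix behavior).
    static void check(Node node, boolean skipPartitions) {
        for (Node input : node.inputs) {
            check(input, skipPartitions);
        }
        boolean exempt = skipPartitions && node.isPartition;
        if (!exempt && node.uid == null) {
            throw new IllegalStateException("Auto generated UIDs have been disabled "
                + "but no UID or hash has been assigned to operator " + node.name);
        }
    }

    static boolean passes(Node sink, boolean skipPartitions) {
        try {
            check(sink, skipPartitions);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    // The rides/fares job from the thread; optionally applies the workaround.
    static Node buildJob(boolean uidOnPartitions) {
        Node rideSource = new Node("taxi rides", false).uid("source: taxi rides");
        Node filter = new Node("only start rides", false, rideSource)
            .uid("filter: only start rides");
        Node ridesKeyBy = new Node("Partition", true, filter);
        Node fareSource = new Node("taxi fares", false).uid("source: taxi fares");
        Node faresKeyBy = new Node("Partition", true, fareSource);
        if (uidOnPartitions) {
            // Mirrors the workaround rides.getTransformation().setUid(...)
            ridesKeyBy.uid("keyBy: rides");
            faresKeyBy.uid("keyBy: fares");
        }
        Node enrich = new Node("enrich rides with fares", false, ridesKeyBy, faresKeyBy)
            .uid("function: enrich rides with fares");
        return new Node("enriched taxi rides", false, enrich)
            .uid("sink: enriched taxi rides");
    }

    public static void main(String[] args) {
        System.out.println(passes(buildJob(false), false)); // false: Partition has no UID
        System.out.println(passes(buildJob(true), false));  // true: workaround applied
        System.out.println(passes(buildJob(false), true));  // true: partitions exempt
    }
}
```

Setting a UID on each partition node corresponds to the `rides.getTransformation().setUid(...)` workaround from the thread. Exempting partition nodes models the fix, on the reasoning given in the thread that a repartitioning step is not an operator with state.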