Version 1.11.2

On Sun, Dec 6, 2020 at 10:20 PM Yun Gao <yungao...@aliyun.com> wrote:
> Hi Rex,
>
> I tried a similar example[1] but could not reproduce the issue. Which
> version of Flink are you using?
>
> Best,
> Yun
>
>
> [1] The example code:
>
> StreamExecutionEnvironment bsEnv =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> bsEnv.setRestartStrategy(RestartStrategies.noRestart());
> bsEnv.setParallelism(1);
>
> EnvironmentSettings bsSettings =
>     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
>     bsSettings);
>
> DataStream<Tuple2<Integer, String>> source = bsEnv.addSource(new
>     RichParallelSourceFunction<Tuple2<Integer, String>>() {
>
>     @Override
>     public void run(SourceContext<Tuple2<Integer, String>> sourceContext)
>             throws Exception {
>         sourceContext.collect(new Tuple2<>(0, "test"));
>     }
>
>     @Override
>     public void cancel() {
>
>     }
> });
>
> Table table = bsTableEnv.fromDataStream(
>     source, $("id"), $("name"));
> Table table2 = table.select(call("abs", $("id")), $("name"))
>     .as("new_id", "new_name");
>
> bsTableEnv.createTemporaryView("view", table2);
> Table handled = bsTableEnv.sqlQuery(
>     "select new_id, FIRST_VALUE(new_name) as new_name from view group by new_id");
>
> Table ret = table.join(handled)
>     .where($("id").isEqual($("new_id")))
>     .select($("id"), $("name"), $("new_name"));
> System.out.println(ret.explain());
>
> DataStream<Tuple2<Boolean, Row>> row = bsTableEnv.toRetractStream(ret,
>     Row.class);
> row.addSink(new SinkFunction<Tuple2<Boolean, Row>>() {
>     @Override
>     public void invoke(Tuple2<Boolean, Row> value, Context context) throws
>             Exception {
>
>     }
> });
>
> System.out.println(bsEnv.getStreamGraph().getStreamingPlanAsJSON());
>
> ------------------------------------------------------------------
> Sender:Rex Fenley<r...@remind101.com>
> Date:2020/12/04 14:18:21
> Recipient:Yun Gao<yungao...@aliyun.com>
> Cc:user<user@flink.apache.org>; Brad Davis<brad.da...@remind101.com>
> Subject:Re: Duplicate operators generated by plan
>
> cc Brad
>
> On Thu, Dec 3, 2020 at 10:17 PM Rex Fenley <r...@remind101.com> wrote:
>
>> Yes, the same exact input operators go into both joins.
>>
>> The chunk of code for the joins from the specific part of the plan I
>> showed is as follows. The orgUsersTable is later filtered into one table,
>> which is aggregated, and into another table, which is also aggregated.
>> The planner seems to duplicate orgUsersTable into 2 operators even though
>> I create only 1 of it.
>>
>> // in the main function
>> val orgUsersTable = splatRoles(
>>   this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
>>   OrgUsersRoleSplatPrefix,
>>   this.tableEnv
>> )
>>
>> // helper function
>> def splatRoles(
>>     table: Table,
>>     columnPrefix: String,
>>     tableEnv: TableEnvironment
>> ): Table = {
>>   // Flink does not have a contains function so we have to splat out our
>>   // role array's contents and join it to the originating table.
>>   val func = new SplatRolesFunc()
>>   val splatted = table
>>     .map(func($"roles", $"id"))
>>     .as(
>>       "id_splatted",
>>       s"${columnPrefix}_is_admin",
>>       s"${columnPrefix}_is_teacher",
>>       s"${columnPrefix}_is_student",
>>       s"${columnPrefix}_is_parent"
>>     )
>>   // FIRST_VALUE is only available in SQL - so this is SQL.
>>   // Rationale: We have to group by after a map to preserve the pk
>>   // inference, otherwise Flink will toss it out and all future joins
>>   // will not have a unique key.
>>   tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
>>   val grouped = tableEnv.sqlQuery(s"""
>>     SELECT
>>       id_splatted,
>>       FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
>>       FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
>>       FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
>>       FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
>>     FROM ${columnPrefix}_splatted
>>     GROUP BY id_splatted
>>   """)
>>   return table
>>     .join(grouped, $"id" === $"id_splatted")
>>     .dropColumns($"id_splatted")
>>     .renameColumns($"roles".as(s"${columnPrefix}_roles"))
>> }
>>
>> @FunctionHint(
>>   output = new DataTypeHint(
>>     "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
>>   )
>> )
>> class SplatRolesFunc extends ScalarFunction {
>>   def eval(roles: Array[String], id: java.lang.Long): Row = {
>>     val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
>>     val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
>>     val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
>>     val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
>>     return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
>>   }
>>
>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>>     Types.ROW(
>>       Types.LONG,
>>       Types.BOOLEAN,
>>       Types.BOOLEAN,
>>       Types.BOOLEAN,
>>       Types.BOOLEAN
>>     )
>> }
>>
>>
>> On Thu, Dec 3, 2020 at 7:49 PM Yun Gao <yungao...@aliyun.com> wrote:
>>
>>> Hi Rex,
>>>
>>> Could you also attach an example of these SQL / Table API queries? One
>>> possible issue to confirm: do the operators with the same names also
>>> have the same inputs?
>>>
>>> Best,
>>> Yun
>>>
>>> ------------------Original Mail ------------------
>>> *Sender:*Rex Fenley <r...@remind101.com>
>>> *Send Date:*Fri Dec 4 02:55:41 2020
>>> *Recipients:*user <user@flink.apache.org>
>>> *Subject:*Duplicate operators generated by plan
>>>
>>>> Hello,
>>>>
>>>> I'm running into an issue where my execution plan is creating the same
>>>> exact join operator multiple times simply because the subsequent operator
>>>> filters on a different boolean value. This is a massive duplication of
>>>> storage and work. Each of the filtered operators that follow removes
>>>> only a small number of elements, too.
>>>>
>>>> e.g. two separate operators that are equal:
>>>>
>>>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>>>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>>>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>>>> leftInputSpec=[JoinKeyContainsUniqueKey],
>>>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>>>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>>>> org_user_is_teacher, org_user_is_student, org_user_is_parent]
>>>>
>>>> Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
>>>> organization_id, user_id, roles, id_splatted, org_user_is_admin,
>>>> org_user_is_teacher, org_user_is_student, org_user_is_parent],
>>>> leftInputSpec=[JoinKeyContainsUniqueKey],
>>>> rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
>>>> organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
>>>> org_user_is_teacher, org_user_is_student, org_user_is_parent])
>>>>
>>>> Both of them process exactly the same data.
>>>>
>>>> The first one points to
>>>> GroupAggregate(groupBy=[user_id], select=[user_id,
>>>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>>>> TMP_0.f0 AS admin_organization_ids])
>>>>
>>>> The second one points to
>>>> GroupAggregate(groupBy=[user_id], select=[user_id,
>>>> IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
>>>> TMP_0.f0 AS teacher_organization_ids])
>>>>
>>>> These two filtered sets of data overlap, though they differ slightly.
>>>> I don't see why that would make the single join before them split into
>>>> two, though. There's even a case where I'm seeing a join tripled.
>>>>
>>>> Is there a good reason why this should happen? Is there a way to tell
>>>> Flink not to duplicate operators where it doesn't need to?
>>>>
>>>> Thanks!
>>>>
>>>> --
>>>>
>>>> Rex Fenley | Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/>
>>>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US
>>>> <https://www.facebook.com/remindhq>
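For readers following the thread, the per-row logic of Rex's `splatRoles` helper can be sketched in plain Java, without Flink. This is a minimal sketch under stated assumptions, not Flink's implementation: the role strings `admin`, `teacher`, `student`, and `parent` stand in for the `Admin.rawValue`-style constants the thread does not show, the class and method names (`SplatRolesSketch`, `splat`, `firstValuePerId`) are hypothetical, and the grouping step assumes an append-only input, ignoring the retractions that a streaming `FIRST_VALUE` aggregate has to handle. It illustrates why the `GROUP BY id_splatted` step leaves at most one row per id, which is the property Rex relies on to keep a unique key for the later join.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SplatRolesSketch {
    // Hypothetical raw role values (assumption): the thread's Admin/Teacher/
    // Student/Parent constants are not shown.
    static final String ADMIN = "admin";
    static final String TEACHER = "teacher";
    static final String STUDENT = "student";
    static final String PARENT = "parent";

    // One splatted row: the id plus one boolean flag per role.
    static final class Splatted {
        final long id;
        final boolean isAdmin, isTeacher, isStudent, isParent;

        Splatted(long id, boolean isAdmin, boolean isTeacher, boolean isStudent, boolean isParent) {
            this.id = id;
            this.isAdmin = isAdmin;
            this.isTeacher = isTeacher;
            this.isStudent = isStudent;
            this.isParent = isParent;
        }
    }

    // Analogue of SplatRolesFunc.eval: one "contains" check per role.
    static Splatted splat(long id, String[] roles) {
        List<String> r = Arrays.asList(roles);
        return new Splatted(id, r.contains(ADMIN), r.contains(TEACHER),
                r.contains(STUDENT), r.contains(PARENT));
    }

    // Analogue of "SELECT id_splatted, FIRST_VALUE(...) ... GROUP BY id_splatted"
    // for an append-only input (assumption): keep the first row seen per id,
    // so the result has at most one row per id.
    static Map<Long, Splatted> firstValuePerId(List<Splatted> rows) {
        Map<Long, Splatted> byId = new LinkedHashMap<>();
        for (Splatted row : rows) {
            byId.putIfAbsent(row.id, row);
        }
        return byId;
    }

    public static void main(String[] args) {
        List<Splatted> rows = new ArrayList<>();
        rows.add(splat(1L, new String[] {ADMIN, TEACHER}));
        rows.add(splat(2L, new String[] {STUDENT}));
        rows.add(splat(1L, new String[] {PARENT})); // later row for id 1 is dropped
        for (Splatted s : firstValuePerId(rows).values()) {
            System.out.println(s.id + " admin=" + s.isAdmin + " teacher=" + s.isTeacher
                    + " student=" + s.isStudent + " parent=" + s.isParent);
        }
    }
}
```

Running `main` prints one line per distinct id, in first-seen order; the second row for id 1 contributes nothing because only the first row per id is kept.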