Hi Rex,

   I tried a similar example [1] but could not reproduce the issue. Which version 
of Flink are you using?

Best,
 Yun


[1] The example code (imports added for completeness):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.setRestartStrategy(RestartStrategies.noRestart());
bsEnv.setParallelism(1);

EnvironmentSettings bsSettings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

DataStream<Tuple2<Integer, String>> source = bsEnv.addSource(
      new RichParallelSourceFunction<Tuple2<Integer, String>>() {

   @Override
   public void run(SourceContext<Tuple2<Integer, String>> sourceContext) throws Exception {
      sourceContext.collect(new Tuple2<>(0, "test"));
   }

   @Override
   public void cancel() {
   }
});

Table table = bsTableEnv.fromDataStream(source, $("id"), $("name"));
Table table2 = table.select(call("abs", $("id")), $("name"))
      .as("new_id", "new_name");

bsTableEnv.createTemporaryView("view", table2);
Table handled = bsTableEnv.sqlQuery(
      "select new_id, FIRST_VALUE(new_name) as new_name from view group by new_id");

Table ret = table.join(handled)
      .where($("id").isEqual($("new_id")))
      .select($("id"), $("name"), $("new_name"));
System.out.println(ret.explain());

DataStream<Tuple2<Boolean, Row>> row = bsTableEnv.toRetractStream(ret, Row.class);
row.addSink(new SinkFunction<Tuple2<Boolean, Row>>() {
   @Override
   public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
   }
});

System.out.println(bsEnv.getStreamGraph().getStreamingPlanAsJSON());
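One more thing you could check: the Blink planner only merges identical sub-plans when its reuse options are enabled. They default to true, but as a sketch (assuming Flink 1.10+ with the Blink planner, continuing from the `bsTableEnv` above) it may be worth confirming them explicitly:

```java
// Sketch: confirm the Blink planner's sub-plan / source reuse options.
// Both default to true; set here only to make the knobs explicit.
bsTableEnv.getConfig().getConfiguration()
      .setBoolean("table.optimizer.reuse-sub-plan-enabled", true);
bsTableEnv.getConfig().getConfiguration()
      .setBoolean("table.optimizer.reuse-source-enabled", true);
```

If these are already enabled and the plan still duplicates the join, the optimizer may be treating the two sub-plans as different after optimization (for example, after filter push-down), so the explain output before and after the filters would be interesting to compare.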
------------------------------------------------------------------
Sender: Rex Fenley <r...@remind101.com>
Date: 2020/12/04 14:18:21
Recipient: Yun Gao <yungao...@aliyun.com>
Cc: user <user@flink.apache.org>; Brad Davis <brad.da...@remind101.com>
Subject: Re: Duplicate operators generated by plan

cc Brad

On Thu, Dec 3, 2020 at 10:17 PM Rex Fenley <r...@remind101.com> wrote:

Yes, the same exact input operators go into both joins.

The join code from the specific part of the plan I showed is as follows. The 
orgUsersTable is later filtered into two different tables, each of which is then 
aggregated. The planner seems to duplicate orgUsersTable into 2 operators even 
though I only create 1 of it.

// in the main function
val orgUsersTable = splatRoles(
  this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
  OrgUsersRoleSplatPrefix,
  this.tableEnv
)

// helper function
def splatRoles(
    table: Table,
    columnPrefix: String,
    tableEnv: TableEnvironment
): Table = {
  // Flink does not have a contains function, so we have to splat out our role
  // array's contents and join it to the originating table.
  val func = new SplatRolesFunc()
  val splatted = table
    .map(func($"roles", $"id"))
    .as(
      "id_splatted",
      s"${columnPrefix}_is_admin",
      s"${columnPrefix}_is_teacher",
      s"${columnPrefix}_is_student",
      s"${columnPrefix}_is_parent"
    )
  // FIRST_VALUE is only available in SQL - so this is SQL.
  // Rationale: We have to group by after a map to preserve the pk inference,
  // otherwise Flink will toss it out and all future joins will not have a unique key.
  tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
  val grouped = tableEnv.sqlQuery(s"""
    SELECT
      id_splatted,
      FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
      FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
      FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
      FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
    FROM ${columnPrefix}_splatted
    GROUP BY id_splatted
  """)
  table
    .join(grouped, $"id" === $"id_splatted")
    .dropColumns($"id_splatted")
    .renameColumns($"roles".as(s"${columnPrefix}_roles"))
}

@FunctionHint(
  output = new DataTypeHint(
    "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
  )
)
class SplatRolesFunc extends ScalarFunction {
  def eval(roles: Array[String], id: java.lang.Long): Row = {
    val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
    val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
    val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
    val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
    Row.of(id, isAdmin, isTeacher, isStudent, isParent)
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW(
      Types.LONG,
      Types.BOOLEAN,
      Types.BOOLEAN,
      Types.BOOLEAN,
      Types.BOOLEAN
    )
}


On Thu, Dec 3, 2020 at 7:49 PM Yun Gao <yungao...@aliyun.com> wrote:

Hi Rex,

    Could you also attach an example of these SQL / Table programs? And one 
possible issue to confirm: do the operators with the same names also have the 
same inputs?

Best,
Yun

 ------------------Original Mail ------------------
Sender: Rex Fenley <r...@remind101.com>
Send Date: Fri Dec 4 02:55:41 2020
Recipients: user <user@flink.apache.org>
Subject: Duplicate operators generated by plan

Hello,

I'm running into an issue where my execution plan creates the same exact join 
operator multiple times, simply because the subsequent operators filter on 
different boolean values. This is a massive duplication of storage and work. 
Each of the filter operators that follow removes only a small set of elements 
per set, too.

e.g. two separate operators that are equal:

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent])

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id, organization_id, user_id, roles, id_splatted, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id, organization_id, user_id, roles AS org_user_roles, org_user_is_admin, org_user_is_teacher, org_user_is_student, org_user_is_parent])

Which are entirely the same datasets being processed.

The first one points to:

GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS admin_organization_ids])

The second one points to:

GroupAggregate(groupBy=[user_id], select=[user_id, IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id, TMP_0.f0 AS teacher_organization_ids])

These are both intersecting, though slightly different, sets of data. I don't 
see why that would cause the single join before them to be split into 2, 
though. There's even a case where I'm seeing a join tripled.

Is there a good reason why this should happen? Is there a way to tell Flink not 
to duplicate operators where it doesn't need to?

Thanks!

-- 

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

