[ https://issues.apache.org/jira/browse/FLINK-20369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17327509#comment-17327509 ]
Flink Jira Bot commented on FLINK-20369:
----------------------------------------

This major issue is unassigned and itself and all of its Sub-Tasks have not been updated for 30 days. So, it has been labeled "stale-major". If this ticket is indeed "major", please either assign yourself or give an update. Afterwards, please remove the label. In 7 days the issue will be deprioritized.

> Improve the digest of TableSourceScan and Sink node
> ---------------------------------------------------
>
>                 Key: FLINK-20369
>                 URL: https://issues.apache.org/jira/browse/FLINK-20369
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Jark Wu
>            Priority: Major
>              Labels: stale-major
>
> Currently,
> 1. the digest of {{TableSourceScan}} and {{Sink}} doesn't contain the connector information which will be quite useful when debugging.
> 2. The table name is quite verbose when under default catalog and database, would be better to simplify it to only table name if under default catalog and database.
> 3. Maybe it's nicer to have changelog mode of source and sink, because it's a meta information of {{DynamicTableSource/Sink#getChangelogMode}}.
> {code}
> Sink(table=[default_catalog.default_database.sink_kafka_count_city], fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
> +- Calc(select=[city_name, CAST(count_customer) AS count_customer, CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
>    +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, count_customer, sum_gender, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
>       :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
>       :  +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS sum_gender], changelogMode=[I,UA,D])
>       :     +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
>       :        +- LocalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], changelogMode=[I])
>       :           +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
>       :              +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
>       :                 +- Exchange(distribution=[hash[customer_id]], changelogMode=[UA,D])
>       :                    +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
>       :                       +- TableSourceScan(table=[[default_catalog, default_database, source_customer]], fields=[customer_id, city_id, age, gender, update_time], changelogMode=[UA,D])
>       +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
>          +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
>             +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
>                +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
>                   +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
> {code}

--
This message was sent by Atlassian Jira (v8.3.4#803005)
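
For illustration only, the three points in the issue description (connector information, a shorter table name under the default catalog and database, and the changelog mode) could be combined roughly as in the sketch below. This is not the planner's code: the class `DigestSketch`, the methods `simplifyTableName` and `scanDigest`, the `connector=[...]` attribute and the `kafka` value are all hypothetical names chosen for the example, and the actual digest format is whatever the issue eventually settles on.

```java
import java.util.Arrays;
import java.util.List;

public class DigestSketch {

    // Hypothetical helper: keep only the table name when the table lives in the
    // default catalog and default database, otherwise keep the full path.
    public static String simplifyTableName(
            String catalog, String database, String table,
            String defaultCatalog, String defaultDatabase) {
        if (catalog.equals(defaultCatalog) && database.equals(defaultDatabase)) {
            return table;
        }
        return catalog + "." + database + "." + table;
    }

    // Hypothetical digest layout that adds the connector and the changelog mode.
    public static String scanDigest(
            String tableName, String connector, List<String> fields, String changelogMode) {
        return "TableSourceScan(table=[" + tableName + "]"
                + ", connector=[" + connector + "]"
                + ", fields=[" + String.join(", ", fields) + "]"
                + ", changelogMode=[" + changelogMode + "])";
    }

    public static void main(String[] args) {
        String name = simplifyTableName(
                "default_catalog", "default_database", "source_city",
                "default_catalog", "default_database");
        // "kafka" is an assumed connector; the issue does not say which one source_city uses.
        System.out.println(scanDigest(name, "kafka", Arrays.asList("id", "city_name"), "UA,D"));
    }
}
```

A table outside the default catalog or database keeps its full `catalog.database.table` path in this sketch, so the shortened form stays unambiguous.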