[ https://issues.apache.org/jira/browse/FLINK-32374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17734087#comment-17734087 ]
luoyuxia edited comment on FLINK-32374 at 6/20/23 1:49 AM: ----------------------------------------------------------- master: f69ed3454f2ab200310edee230da292ee2408503 1.17: 3b5c1f7915ecfe0f7e353fd342f8d22df5bcd7c4 1.16: 08bced4646c4bef9aca7089d0764426d78a89b0a was (Author: luoyuxia): master: f69ed3454f2ab200310edee230da292ee2408503 1.17: todo 1.16: todo > ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for > overwriting > -------------------------------------------------------------------------------------- > > Key: FLINK-32374 > URL: https://issues.apache.org/jira/browse/FLINK-32374 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.16.0, 1.17.0, 1.16.1, 1.16.2, 1.18.0, 1.17.1 > Reporter: Jane Chan > Assignee: Jane Chan > Priority: Major > Labels: pull-request-available > Fix For: 1.18.0 > > > If the existing JSON plan is not truncated when overwriting, and the newly > generated JSON plan content is shorter than the previous JSON plan content, > the plan will be invalid JSON. > h4. How to reproduce > {code:sql} > Flink SQL> create table debug_sink (f0 int, f1 string) with ('connector' = > 'blackhole'); > [INFO] Execute statement succeed. > Flink SQL> create table dummy_source (f0 int, f1 int, f2 string, f3 string) > with ('connector' = 'datagen'); > [INFO] Execute statement succeed. > Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink > select if(f0 > f1, f0, f1) as f0, concat(f2, f3) as f1 from dummy_source; > [INFO] Execute statement succeed. > Flink SQL> set 'table.plan.force-recompile' = 'true'; > [INFO] Execute statement succeed. > Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink > select * from (values (2, 'bye')) T (id, message); > [INFO] Execute statement succeed. 
> {code} > cat -n debug.json, and check L#67 > {code:json} > 1 { > 2 "flinkVersion" : "1.17", > 3 "nodes" : [ { > 4 "id" : 15, > 5 "type" : "stream-exec-values_1", > 6 "tuples" : [ [ { > 7 "kind" : "LITERAL", > 8 "value" : "2", > 9 "type" : "INT NOT NULL" > 10 }, { > 11 "kind" : "LITERAL", > 12 "value" : "bye", > 13 "type" : "CHAR(3) NOT NULL" > 14 } ] ], > 15 "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) > NOT NULL>", > 16 "description" : "Values(tuples=[[{ 2, _UTF-16LE'bye' }]])", > 17 "inputProperties" : [ ] > 18 }, { > 19 "id" : 16, > 20 "type" : "stream-exec-sink_1", > 21 "configuration" : { > 22 "table.exec.sink.keyed-shuffle" : "AUTO", > 23 "table.exec.sink.not-null-enforcer" : "ERROR", > 24 "table.exec.sink.type-length-enforcer" : "IGNORE", > 25 "table.exec.sink.upsert-materialize" : "AUTO" > 26 }, > 27 "dynamicTableSink" : { > 28 "table" : { > 29 "identifier" : > "`default_catalog`.`default_database`.`debug_sink`", > 30 "resolvedTable" : { > 31 "schema" : { > 32 "columns" : [ { > 33 "name" : "f0", > 34 "dataType" : "INT" > 35 }, { > 36 "name" : "f1", > 37 "dataType" : "VARCHAR(2147483647)" > 38 } ], > 39 "watermarkSpecs" : [ ] > 40 }, > 41 "partitionKeys" : [ ], > 42 "options" : { > 43 "connector" : "blackhole" > 44 } > 45 } > 46 } > 47 }, > 48 "inputChangelogMode" : [ "INSERT" ], > 49 "inputProperties" : [ { > 50 "requiredDistribution" : { > 51 "type" : "UNKNOWN" > 52 }, > 53 "damBehavior" : "PIPELINED", > 54 "priority" : 0 > 55 } ], > 56 "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) > NOT NULL>", > 57 "description" : > "Sink(table=[default_catalog.default_database.debug_sink], fields=[id, > message])" > 58 } ], > 59 "edges" : [ { > 60 "source" : 15, > 61 "target" : 16, > 62 "shuffle" : { > 63 "type" : "FORWARD" > 64 }, > 65 "shuffleMode" : "PIPELINED" > 66 } ] > 67 } "$CONCAT$1", > 68 "operands" : [ { > 69 "kind" : "INPUT_REF", > 70 "inputIndex" : 2, > 71 "type" : "VARCHAR(2147483647)" > 72 }, { > 73 "kind" : "INPUT_REF", > 74 
"inputIndex" : 3, > 75 "type" : "VARCHAR(2147483647)" > 76 } ], > 77 "type" : "VARCHAR(2147483647)" > 78 } ], > 79 "condition" : null, > 80 "inputProperties" : [ { > 81 "requiredDistribution" : { > 82 "type" : "UNKNOWN" > 83 }, > 84 "damBehavior" : "PIPELINED", > 85 "priority" : 0 > 86 } ], > 87 "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>", > 88 "description" : "Calc(select=[IF((f0 > f1), f0, f1) AS f0, > CONCAT(f2, f3) AS f1])" > 89 }, { > 90 "id" : 14, > 91 "type" : "stream-exec-sink_1", > 92 "configuration" : { > 93 "table.exec.sink.keyed-shuffle" : "AUTO", > 94 "table.exec.sink.not-null-enforcer" : "ERROR", > 95 "table.exec.sink.type-length-enforcer" : "IGNORE", > 96 "table.exec.sink.upsert-materialize" : "AUTO" > 97 }, > 98 "dynamicTableSink" : { > 99 "table" : { > 100 "identifier" : > "`default_catalog`.`default_database`.`debug_sink`", > 101 "resolvedTable" : { > 102 "schema" : { > 103 "columns" : [ { > 104 "name" : "f0", > 105 "dataType" : "INT" > 106 }, { > 107 "name" : "f1", > 108 "dataType" : "VARCHAR(2147483647)" > 109 } ], > 110 "watermarkSpecs" : [ ] > 111 }, > 112 "partitionKeys" : [ ], > 113 "options" : { > 114 "connector" : "blackhole" > 115 } > 116 } > 117 } > 118 }, > 119 "inputChangelogMode" : [ "INSERT" ], > 120 "inputProperties" : [ { > 121 "requiredDistribution" : { > 122 "type" : "UNKNOWN" > 123 }, > 124 "damBehavior" : "PIPELINED", > 125 "priority" : 0 > 126 } ], > 127 "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>", > 128 "description" : > "Sink(table=[default_catalog.default_database.debug_sink], fields=[f0, f1])" > 129 } ], > 130 "edges" : [ { > 131 "source" : 12, > 132 "target" : 13, > 133 "shuffle" : { > 134 "type" : "FORWARD" > 135 }, > 136 "shuffleMode" : "PIPELINED" > 137 }, { > 138 "source" : 13, > 139 "target" : 14, > 140 "shuffle" : { > 141 "type" : "FORWARD" > 142 }, > 143 "shuffleMode" : "PIPELINED" > 144 } ] > 145 } > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)