Hi Team,
I am working on Iceberg in-process compaction and am trying to use a SQL
window join to compare two streams, like this:
    Table fsFiles = tEnv.sqlQuery(
        "SELECT runId, location, window_start, window_end " +
        "FROM TABLE(" +
        "  TUMBLE(" +
        "    TABLE " + fileSystemFilesTable + "," +
        "    DESCRIPTOR(ts), " +
        "    INTERVAL '1' SECONDS))");

    Table tableFiles = tEnv.sqlQuery(
        "SELECT runId, location, window_start, window_end " +
        "FROM TABLE(" +
        "  TUMBLE(" +
        "    TABLE " + tableFilesTable + "," +
        "    DESCRIPTOR(ts), " +
        "    INTERVAL '1' SECONDS))");
When I print these streams with the following code, I see the values in
the logs:
    tEnv.toDataStream(fsFiles).print("FS");
    tEnv.toDataStream(tableFiles).print("TS");
The result is:
    FS:2> +I[1705405510802,
    file:/var/folders/19/xs17kb0j7dj0klq324_vj7sc0000gn/T/junit13711198986865553391/db.db/test_table_with_pk/metadata/00000-b717c629-bb71-48df-a30b-615aeb320aec.metadata.json,
    2024-01-16T11:45:10, 2024-01-16T11:45:11]
    [..]
    TS:2> +I[1705405510802,
    file:/var/folders/19/xs17kb0j7dj0klq324_vj7sc0000gn/T/junit13711198986865553391/db.db/test_table_unpartitioned/metadata/snap-532818363465442978-1-dc47e70d-82eb-490a-a21d-c032b88c3303.avro,
    2024-01-16T11:45:10, 2024-01-16T11:45:11]
    [..]
This is what I expected: the two streams periodically emit the incoming
data files together with their runIds and window timestamps.
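The window_start and window_end values in that output are consistent with plain 1-second tumbling-window arithmetic. Below is a minimal sketch in plain Java (no Flink involved, and not Flink's internal code). It assumes the event time is in epoch milliseconds and, purely for illustration, reuses the runId value from the log as a hypothetical ts. The class and method names are made up.

```java
import java.time.Instant;

// Illustrative only: shows which 1-second tumbling window a timestamp falls into.
class TumbleWindowSketch {

    // Inclusive start of the tumbling window of the given size that contains tsMillis.
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    // Exclusive end of the same window.
    static long windowEnd(long tsMillis, long sizeMillis) {
        return windowStart(tsMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long ts = 1705405510802L; // assumption: runId from the log reused as event time
        long size = 1000L;        // INTERVAL '1' SECONDS
        System.out.println(Instant.ofEpochMilli(windowStart(ts, size))); // 2024-01-16T11:45:10Z
        System.out.println(Instant.ofEpochMilli(windowEnd(ts, size)));   // 2024-01-16T11:45:11Z
    }
}
```

Rows from both streams only land in the same window when this computation yields the same start and end for both, which is what the join conditions on window_start and window_end rely on.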
Now, I try to run an ANTI JOIN on these streams:
    Table missingFiles = tEnv.sqlQuery(
        "  SELECT ts, location\n" +
        "  FROM (\n" +
        "    SELECT * FROM TABLE(TUMBLE(TABLE " + fsFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))\n" +
        "  ) L WHERE L.location NOT IN (\n" +
        "    SELECT location FROM ( \n" +
        "      SELECT * FROM TABLE(TUMBLE(TABLE " + tableFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))\n" +
        "    ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end)");
Even though the logs show that some files are missing, I do not see any
records in the logs for the missingFiles table:
    tEnv.toDataStream(missingFiles).print("MISSING");
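To make the expected result concrete, here is a minimal plain-Java sketch (no Flink involved) of the semantics the anti join is meant to have: for each window, the locations present in the file system stream but absent from the table stream. The class name, method name, window keys and paths are all hypothetical and only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Illustrative only: per-window set difference, i.e. the intended anti-join result.
class MissingFilesSketch {

    // Keys are window start times; values are the file locations seen in that window.
    static Map<Long, Set<String>> missing(Map<Long, Set<String>> fsByWindow,
                                          Map<Long, Set<String>> tableByWindow) {
        Map<Long, Set<String>> result = new TreeMap<>();
        for (Map.Entry<Long, Set<String>> e : fsByWindow.entrySet()) {
            Set<String> diff = new TreeSet<>(e.getValue());
            // Remove every location the table stream reported in the same window.
            diff.removeAll(tableByWindow.getOrDefault(e.getKey(), Collections.emptySet()));
            if (!diff.isEmpty()) {
                result.put(e.getKey(), diff);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Set<String>> fs = new HashMap<>();
        fs.put(1705405510000L, new HashSet<>(Arrays.asList("file:/tmp/a.avro", "file:/tmp/b.avro")));
        Map<Long, Set<String>> table = new HashMap<>();
        table.put(1705405510000L, new HashSet<>(Arrays.asList("file:/tmp/a.avro")));
        // Only b.avro is on the file system without being referenced by the table.
        System.out.println(missing(fs, table));
    }
}
```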
To check this in a different way, I also tried an outer join (a LEFT JOIN
in the query below) to see how the join behaves:
    Table joined = tEnv.sqlQuery(
        "  SELECT fs_files.location AS fs_location, table_files.location AS table_location,\n" +
        "    COALESCE(fs_files.window_start, table_files.window_start) as window_start,\n" +
        "    COALESCE(fs_files.window_end, table_files.window_end) as window_end\n" +
        "  FROM (SELECT * FROM TABLE(TUMBLE(TABLE " + fsFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))) fs_files\n" +
        "  LEFT JOIN (SELECT * FROM TABLE(TUMBLE(TABLE " + tableFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))) table_files\n" +
        "  ON fs_files.location = table_files.location AND\n" +
        "    fs_files.window_start = table_files.window_start AND \n" +
        "    fs_files.window_end = table_files.window_end\n");
There is nothing in the logs for this join either.
I think I might be missing something about the windowing, and that my
joined windows are not triggered in the more complex queries, but I am
stuck at the moment, so any help would be appreciated.
Thanks,
Peter