Hello, I am using custom made connector to create Source table in this way:
create table Source ( ts TIMESTAMP(3), instance STRING, sservice STRING, logdatetime STRING, threadid STRING, level STRING, log_line STRING ) with ( 'connector'='lokiquery', 'host'='<lokiurl>', 'lokiqueryparamsstring'='query={instance="test", service="test"}&limit=5000&start=2022-12-15T16:40:09.560Z&end=2022-12-15T16:58:09.570Z' ); In this table I successfully store data from the specified time range from loki. Data is coming as a batch. (not stream) Then I want to create another table that will look for patterns in the log_line column from the Source table. I am doing following: SELECT * FROM Source MATCH_RECOGNIZE ( ORDER BY ts MEASURES START_ROW.ts AS start_ts, END_ROW.ts AS end_ts ONE ROW PER MATCH AFTER MATCH SKIP TO LAST END_ROW PATTERN (START_ROW{1} UNK_ROW+? MID_ROW{2} END_ROW{1}) DEFINE START_ROW AS START_ROW.log_line SIMILAR TO '%componentId:.{2}GridInstance_grtm_gridtemplate_headache_view_null%', MID_ROW AS MID_ROW.log_line SIMILAR TO '%DSResponse - DSResponse: List with%', END_ROW AS END_ROW.log_line SIMILAR TO '%ContentRepository%' ) MR; And when using python's pyflink, this works just fine! But when I try the same thing in flink sql cli, I get strange error after executing second table: [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=LOGICAL, FlinkRelDistributionTraitDef=any, sort=[]. Missing conversion is LogicalMatch[convention: NONE -> LOGICAL] There is 1 empty subset: rel#175:RelSubset#1.LOGICAL.any.[], the relevant part of the original plan is as follows 167:LogicalMatch(partition=[[]], order=[[0 ASC-nulls-first]], outputFields=[[start_ts, end_ts]], allRows=[false], after=[SKIP TO LAST(_UTF-16LE'END_ROW')], pattern=[(((PATTERN_QUANTIFIER(_UTF-16LE'START_ROW', 1, 1, false), PATTERN_QUANTIFIER(_UTF-16LE'UNK_ROW', 1, -1, true)), PATTERN_QUANTIFIER(_UTF-16LE'MID_ROW', 2, 2, false)), PATTERN_QUANTIFIER(_UTF-16LE'END_ROW', 1, 1, false))], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[SIMILAR TO(PREV(START_ROW.$6, 0), _UTF-16LE'%componentId:.{2}GridInstance_grtm_gridtemplate_headache_view_null%'), SIMILAR TO(PREV(MID_ROW.$6, 0), _UTF-16LE'%DSResponse - DSResponse: List with%'), SIMILAR TO(PREV(END_ROW.$6, 0), _UTF-16LE'%ContentRepository%')]], inputFields=[[ts, instance, service, logdatetime, threadid, level, log_line]]) 1:LogicalTableScan(subset=[rel#166:RelSubset#0.NONE.any.[]], table=[[default_catalog, default_database, Source]]) In python, where this works, these are only configs that I use for table environment (of course I also include jar for my custom connector) : env_settings = EnvironmentSettings.in_batch_mode() t_env = TableEnvironment.create(env_settings) t_env.get_config().get_configuration().set_string("parallelism.default", "1") Therefore I set these values in flink sql table: SET 'execution.runtime-mode' = 'batch'; SET 'parallelism.default' = '1'; But it didn't help. Does anyone have any idea what could be causing this issue? Thank you, Marjan