Hi Marjan,

That's rather weird, because PyFlink uses the same implementation. Could
you file a Jira ticket? If not, let me know and I'll create one for you.

Best regards,

Martijn

On Thu, Dec 22, 2022 at 11:37 AM Marjan Jordanovski <mjmarjan1...@gmail.com>
wrote:

> Hello,
>
> I am using a custom-made connector to create a Source table in this way:
>
>     create table Source (
>         ts TIMESTAMP(3),
>         instance STRING,
>         service STRING,
>         logdatetime STRING,
>         threadid STRING,
>         level STRING,
>         log_line STRING
>     ) with (
>         'connector'='lokiquery',
>         'host'='<lokiurl>',
>         'lokiqueryparamsstring'='query={instance="test",
> service="test"}&limit=5000&start=2022-12-15T16:40:09.560Z&end=2022-12-15T16:58:09.570Z'
>     );
>
> In this table I successfully store data from the specified time range from
> Loki. The data arrives as a batch (not a stream).
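>
> (As a sanity check, a plain scan such as the following returns the expected
> rows; the LIMIT value here is arbitrary:)
>
>     SELECT * FROM Source LIMIT 10;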
>
> Then I want to run a second query that looks for patterns in the log_line
> column of the Source table. I am doing the following:
>
> SELECT *
> FROM Source
>     MATCH_RECOGNIZE (
>         ORDER BY ts
>         MEASURES
>             START_ROW.ts AS start_ts,
>             END_ROW.ts AS end_ts
>         ONE ROW PER MATCH
>         AFTER MATCH SKIP TO LAST END_ROW
>         PATTERN (START_ROW{1} UNK_ROW+? MID_ROW{2} END_ROW{1})
>         DEFINE
>             START_ROW AS START_ROW.log_line SIMILAR TO
> '%componentId:.{2}GridInstance_grtm_gridtemplate_headache_view_null%',
>             MID_ROW AS MID_ROW.log_line SIMILAR TO '%DSResponse -
> DSResponse: List with%',
>             END_ROW AS END_ROW.log_line SIMILAR TO '%ContentRepository%'
>     ) MR;
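>
> (For context on the pattern: UNK_ROW is intentionally not listed under
> DEFINE, so it matches any row. The intended match is one start line, a
> reluctant run of arbitrary lines, exactly two "DSResponse" lines, and one
> "ContentRepository" line.)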
>
> And when using Python's PyFlink, this works just fine!
> But when I try the same thing in the Flink SQL CLI, I get a strange error
> when executing the second statement:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not
> enough rules to produce a node with desired properties: convention=LOGICAL,
> FlinkRelDistributionTraitDef=any, sort=[].
> Missing conversion is LogicalMatch[convention: NONE -> LOGICAL]
> There is 1 empty subset: rel#175:RelSubset#1.LOGICAL.any.[], the relevant
> part of the original plan is as follows
> 167:LogicalMatch(partition=[[]], order=[[0 ASC-nulls-first]],
> outputFields=[[start_ts, end_ts]], allRows=[false], after=[SKIP TO
> LAST(_UTF-16LE'END_ROW')],
> pattern=[(((PATTERN_QUANTIFIER(_UTF-16LE'START_ROW', 1, 1, false),
> PATTERN_QUANTIFIER(_UTF-16LE'UNK_ROW', 1, -1, true)),
> PATTERN_QUANTIFIER(_UTF-16LE'MID_ROW', 2, 2, false)),
> PATTERN_QUANTIFIER(_UTF-16LE'END_ROW', 1, 1, false))],
> isStrictStarts=[false], isStrictEnds=[false], subsets=[[]],
> patternDefinitions=[[SIMILAR TO(PREV(START_ROW.$6, 0),
> _UTF-16LE'%componentId:.{2}GridInstance_grtm_gridtemplate_headache_view_null%'),
> SIMILAR TO(PREV(MID_ROW.$6, 0), _UTF-16LE'%DSResponse - DSResponse: List
> with%'), SIMILAR TO(PREV(END_ROW.$6, 0), _UTF-16LE'%ContentRepository%')]],
> inputFields=[[ts, instance, service, logdatetime, threadid, level,
> log_line]])
>   1:LogicalTableScan(subset=[rel#166:RelSubset#0.NONE.any.[]],
> table=[[default_catalog, default_database, Source]])
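>
> I would expect even a stripped-down pattern to fail the same way in the CLI,
> since any MATCH_RECOGNIZE produces a LogicalMatch node. A minimal sketch over
> the same Source table:
>
>     SELECT *
>     FROM Source
>         MATCH_RECOGNIZE (
>             ORDER BY ts
>             MEASURES A.ts AS a_ts
>             ONE ROW PER MATCH
>             PATTERN (A)
>             DEFINE A AS A.log_line IS NOT NULL
>         ) MR;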
>
> In Python, where this works, these are the only configs that I use for the
> table environment (of course I also include the JAR for my custom connector):
> from pyflink.table import EnvironmentSettings, TableEnvironment
>
> # batch execution, default parallelism of 1
> env_settings = EnvironmentSettings.in_batch_mode()
> t_env = TableEnvironment.create(env_settings)
> t_env.get_config().get_configuration().set_string("parallelism.default", "1")
>
> Therefore I set the equivalent values in the Flink SQL CLI:
> SET 'execution.runtime-mode' = 'batch';
> SET 'parallelism.default' = '1';
>
> But it didn't help. Does anyone have any idea what could be causing this
> issue?
>
> Thank you,
> Marjan
>
