Hi,

This problem has been fixed[1] in 1.12.0, 1.10.3, and 1.11.3, but release-1.11.3 and release-1.12.0 have not been released yet (the VOTE has already passed). I ran your job on the release-1.12 branch, and the plan is correct there.
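In case it is useful while the fixed releases are still pending, here is a minimal sketch of how the optimized plan can be inspected from PyFlink. It assumes PyFlink 1.11+, where TableEnvironment.explain_sql is available, and that table_env already has the teams_event table and the contains_profanity UDF from the job quoted below registered:

    # Print the optimized plan for the filtering query. On a build that
    # contains the FLINK-19675 fix, the WHERE contains_profanity(`payload`)
    # predicate should appear in the plan instead of being dropped.
    print(table_env.explain_sql("""
        SELECT `payload`.`createdDateTime`
        FROM teams_event
        WHERE contains_profanity(`payload`)
    """))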
[1] https://issues.apache.org/jira/browse/FLINK-19675

Best,
Xingbo

László Ciople <ciople.las...@gmail.com> wrote on Tue, Dec 8, 2020 at 5:21 PM:

> Hello,
> I am trying to use Flink v1.11.2 with Python and the Table API to read and
> write back messages to Kafka topics. I am trying to filter messages based on
> the output of a UDF which returns a boolean. It seems that Flink ignores the
> WHERE clause in my queries and every input message is received in the output
> topic.
>
> The input table is declared in SQL:
>
> --sql
> CREATE TABLE teams_event (
>     `payload` ROW(
>         `createdDateTime` STRING,
>         `body` ROW(
>             `content` STRING
>         ),
>         `from` ROW(
>             `user` ROW(
>                 `displayName` STRING
>             )
>         ),
>         `channelIdentity` ROW(
>             `channelId` STRING
>         )
>     )
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'xdr.ms_teams2.events.messages',
>     'properties.bootstrap.servers' = 'senso-kafka.solexdc01.bitdefender.biz:29030',
>     'properties.group.id' = 'teams_profanity_filter',
>     'format' = 'json',
>     'scan.startup.mode' = 'earliest-offset',
>     'json.fail-on-missing-field' = 'false',
>     'json.timestamp-format.standard' = 'ISO-8601'
> )
>
> The output table is also declared in SQL:
>
> --sql
> CREATE TABLE teams_profanity_event (
>     `createdAt` STRING,
>     `censoredMessage` STRING,
>     `username` STRING,
>     `channelId` STRING
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'internal.alerts.teams.rude_employees2',
>     'properties.bootstrap.servers' = 'senso-kafka.solexdc01.bitdefender.biz:29030',
>     'format' = 'json'
> )
>
> I have declared two UDFs and registered them in the table environment:
>
> @udf(input_types=[
>     DataTypes.ROW([
>         DataTypes.FIELD("createdDateTime", DataTypes.STRING()),
>         DataTypes.FIELD("body", DataTypes.ROW([
>             DataTypes.FIELD("content", DataTypes.STRING())
>         ])),
>         DataTypes.FIELD("from", DataTypes.ROW([
>             DataTypes.FIELD("user", DataTypes.ROW([
>                 DataTypes.FIELD("displayName", DataTypes.STRING())
>             ]))
>         ])),
>         DataTypes.FIELD("channelIdentity", DataTypes.ROW([
>             DataTypes.FIELD("channelId", DataTypes.STRING())
>         ]))
>     ])],
>     result_type=DataTypes.BOOLEAN())
> def contains_profanity(payload):
>     message_content = payload[1][0]
>     found_profanity = profanity.contains_profanity(message_content)
>     logger.info(f'Message "{message_content}" contains profanity: {found_profanity}')
>     return found_profanity
>
>
> @udf(input_types=[
>     DataTypes.ROW([
>         DataTypes.FIELD("createdDateTime", DataTypes.STRING()),
>         DataTypes.FIELD("body", DataTypes.ROW([
>             DataTypes.FIELD("content", DataTypes.STRING())
>         ])),
>         DataTypes.FIELD("from", DataTypes.ROW([
>             DataTypes.FIELD("user", DataTypes.ROW([
>                 DataTypes.FIELD("displayName", DataTypes.STRING())
>             ]))
>         ])),
>         DataTypes.FIELD("channelIdentity", DataTypes.ROW([
>             DataTypes.FIELD("channelId", DataTypes.STRING())
>         ]))
>     ])],
>     result_type=DataTypes.STRING())
> def censor_profanity(payload):
>     message_content = payload[1][0]
>     censored_message = profanity.censor(message_content)
>     logger.info(f'Censored message: "{censored_message}"')
>     return censored_message
>
> The filtering of the messages and the insertion into the sink are declared in SQL:
>
> --sql
> INSERT INTO teams_profanity_event (
>     SELECT `payload`.`createdDateTime`,
>            censor_profanity(`payload`),
>            `payload`.`from`.`user`.`displayName`,
>            `payload`.`channelIdentity`.`channelId`
>     FROM teams_event
>     WHERE contains_profanity(`payload`)
> )
>
> Am I doing something wrong? It seems that the contains_profanity UDF is not
> used in the pipeline:
> [image: image.png]
> Thank you in advance!
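For completeness, here is a minimal sketch of the registration and submission step that the quoted message mentions but does not show. It assumes PyFlink 1.11, where register_function attaches a Python UDF to the table environment; the names teams_event_ddl, teams_profanity_event_ddl, and insert_dml are hypothetical placeholders for the SQL strings quoted above, and contains_profanity / censor_profanity are the UDF objects defined in the message:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    # Illustrative environment setup (blink planner, streaming mode).
    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
            .in_streaming_mode()
            .use_blink_planner()
            .build())

    # Make the Python UDFs visible to SQL under the names used in the queries.
    table_env.register_function("contains_profanity", contains_profanity)
    table_env.register_function("censor_profanity", censor_profanity)

    # Create the source and sink tables, then submit the INSERT job.
    # teams_event_ddl, teams_profanity_event_ddl and insert_dml are placeholder
    # variables holding the SQL statements quoted in the message above.
    table_env.execute_sql(teams_event_ddl)
    table_env.execute_sql(teams_profanity_event_ddl)
    table_env.execute_sql(insert_dml)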