Awesome, thanks!

On Tue, Dec 8, 2020 at 11:55 AM Xingbo Huang <hxbks...@gmail.com> wrote:
> Hi,
>
> This problem has been fixed [1] in 1.12.0, 1.10.3 and 1.11.3, but
> release-1.11.3 and release-1.12.0 have not been released yet (their release
> votes have passed). I ran your job on release-1.12, and the plan is correct.
>
> [1] https://issues.apache.org/jira/browse/FLINK-19675
>
> Best,
> Xingbo
>
> László Ciople <ciople.las...@gmail.com> wrote on Tue, Dec 8, 2020 at 5:21 PM:
>
>> Hello,
>> I am trying to use Flink v1.11.2 with Python and the Table API to read
>> messages from Kafka topics and write them back. I am trying to filter
>> messages based on the output of a UDF that returns a boolean. It seems that
>> Flink ignores the WHERE clause in my queries: every input message ends up
>> in the output topic.
>> The input table is declared in SQL:
>>
>> --sql
>> CREATE TABLE teams_event (
>>     `payload` ROW(
>>         `createdDateTime` STRING,
>>         `body` ROW(
>>             `content` STRING
>>         ),
>>         `from` ROW(
>>             `user` ROW(
>>                 `displayName` STRING
>>             )
>>         ),
>>         `channelIdentity` ROW(
>>             `channelId` STRING
>>         )
>>     )
>> ) WITH (
>>     'connector' = 'kafka',
>>     'topic' = 'xdr.ms_teams2.events.messages',
>>     'properties.bootstrap.servers' = 'senso-kafka.solexdc01.bitdefender.biz:29030',
>>     'properties.group.id' = 'teams_profanity_filter',
>>     'format' = 'json',
>>     'scan.startup.mode' = 'earliest-offset',
>>     'json.fail-on-missing-field' = 'false',
>>     'json.timestamp-format.standard' = 'ISO-8601'
>> )
>>
>> The output table is also declared in SQL:
>>
>> --sql
>> CREATE TABLE teams_profanity_event (
>>     `createdAt` STRING,
>>     `censoredMessage` STRING,
>>     `username` STRING,
>>     `channelId` STRING
>> ) WITH (
>>     'connector' = 'kafka',
>>     'topic' = 'internal.alerts.teams.rude_employees2',
>>     'properties.bootstrap.servers' = 'senso-kafka.solexdc01.bitdefender.biz:29030',
>>     'format' = 'json'
>> )
>>
>> I have declared two UDFs and registered them in the table environment:
>>
>> import logging
>>
>> # assumption: `profanity` is the better_profanity package; the import is not shown in the original
>> from better_profanity import profanity
>> from pyflink.table import DataTypes
>> from pyflink.table.udf import udf
>>
>> logger = logging.getLogger(__name__)
>>
>>
>> @udf(input_types=[
>>     DataTypes.ROW([
>>         DataTypes.FIELD("createdDateTime", DataTypes.STRING()),
>>         DataTypes.FIELD("body",
>>                         DataTypes.ROW([
>>                             DataTypes.FIELD("content", DataTypes.STRING())
>>                         ])),
>>         DataTypes.FIELD("from", DataTypes.ROW([
>>             DataTypes.FIELD("user", DataTypes.ROW([
>>                 DataTypes.FIELD("displayName", DataTypes.STRING())
>>             ]))
>>         ])),
>>         DataTypes.FIELD("channelIdentity", DataTypes.ROW([
>>             DataTypes.FIELD("channelId", DataTypes.STRING())
>>         ]))
>>     ])],
>>     result_type=DataTypes.BOOLEAN())
>> def contains_profanity(payload):
>>     # payload[1] is the `body` row, so payload[1][0] is `body.content`
>>     message_content = payload[1][0]
>>     found_profanity = profanity.contains_profanity(message_content)
>>     logger.info(f'Message "{message_content}" contains profanity: {found_profanity}')
>>     return found_profanity
>>
>>
>> @udf(input_types=[
>>     DataTypes.ROW([
>>         DataTypes.FIELD("createdDateTime", DataTypes.STRING()),
>>         DataTypes.FIELD("body", DataTypes.ROW([
>>             DataTypes.FIELD("content", DataTypes.STRING())
>>         ])),
>>         DataTypes.FIELD("from", DataTypes.ROW([
>>             DataTypes.FIELD("user", DataTypes.ROW([
>>                 DataTypes.FIELD("displayName", DataTypes.STRING())
>>             ]))
>>         ])),
>>         DataTypes.FIELD("channelIdentity", DataTypes.ROW([
>>             DataTypes.FIELD("channelId", DataTypes.STRING())
>>         ]))
>>     ])],
>>     result_type=DataTypes.STRING())
>> def censor_profanity(payload):
>>     message_content = payload[1][0]
>>     censored_message = profanity.censor(message_content)
>>     logger.info(f'Censored message: "{censored_message}"')
>>     return censored_message
>>
>> The filtering of the messages and the insertion into the sink are declared
>> in SQL:
>>
>> --sql
>> INSERT INTO teams_profanity_event (
>>     SELECT `payload`.`createdDateTime`,
>>            censor_profanity(`payload`),
>>            `payload`.`from`.`user`.`displayName`,
>>            `payload`.`channelIdentity`.`channelId`
>>     FROM teams_event
>>     WHERE contains_profanity(`payload`)
>> )
>>
>> Am I doing something wrong? It seems that the contains_profanity UDF is
>> not used in the pipeline:
>> [image: image.png]
>> Thank you in advance!
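For reference, here is a plain-Python model (no Flink involved) of what the INSERT ... WHERE statement in the question is expected to produce once the planner keeps the filter: only rows flagged by the predicate reach the sink, projected into its four columns with the message censored. It is a sketch under stated assumptions: `BANNED_WORDS`, the word-matching `contains_profanity`/`censor` helpers and the sample events are hypothetical stand-ins for the `profanity` library and real Kafka data. The nested tuples mirror the `payload` ROW, so `payload[1][0]` is `body.content`, exactly as in the UDFs.

```python
from typing import Iterable, List, NamedTuple, Tuple

# Hypothetical stand-in for the `profanity` library used in the question.
BANNED_WORDS = {"darn", "heck"}

# Mirrors the nested `payload` ROW:
# (createdDateTime, (content,), ((displayName,),), (channelId,))
Payload = Tuple[str, Tuple[str], Tuple[Tuple[str]], Tuple[str]]


class SinkRow(NamedTuple):
    """One row of teams_profanity_event."""
    createdAt: str
    censoredMessage: str
    username: str
    channelId: str


def contains_profanity(payload: Payload) -> bool:
    # payload[1][0] is body.content, as in the UDFs
    words = payload[1][0].lower().split()
    return any(w.strip(".,!?") in BANNED_WORDS for w in words)


def censor(payload: Payload) -> str:
    # Replace each banned word with asterisks of the same length,
    # keeping surrounding punctuation.
    out = []
    for w in payload[1][0].split():
        core = w.strip(".,!?")
        if core.lower() in BANNED_WORDS:
            out.append(w.replace(core, "*" * len(core)))
        else:
            out.append(w)
    return " ".join(out)


def run_query(events: Iterable[Payload]) -> List[SinkRow]:
    """Model of: INSERT INTO teams_profanity_event SELECT ... WHERE contains_profanity(payload)."""
    return [
        SinkRow(p[0], censor(p), p[2][0][0], p[3][0])
        for p in events
        if contains_profanity(p)  # the WHERE clause: other rows never reach the sink
    ]


events = [
    ("2020-12-08T10:00:00Z", ("hello team",), (("Alice",),), ("c1",)),
    ("2020-12-08T10:01:00Z", ("what the heck!",), (("Bob",),), ("c1",)),
]
print(run_query(events))
```

With the two sample events only Bob's message should come out, censored. On 1.11.2 the behaviour described in the thread corresponds to the `if contains_profanity(p)` condition being silently dropped, so both rows would reach the sink.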