python flink_cep_example.py 过几秒就退出了,应该一直运行不退出的啊。
代码如下,使用了MATCH_RECOGNIZE:
# PyFlink CEP job: consume JSON events from Kafka, detect the component
# sequence XXX -> YYY -> ZZZ -> WWW per filename via SQL MATCH_RECOGNIZE,
# and write each match's start/end event times to a Postgres table.
s_env = StreamExecutionEnvironment.get_execution_environment()
b_s_settings = (
    EnvironmentSettings.new_instance()
    .use_blink_planner()
    .in_streaming_mode()
    .build()
)
st_env = StreamTableEnvironment.create(
    s_env, environment_settings=b_s_settings
)
configuration = st_env.get_config().get_configuration()
# Off-heap task memory for the Python UDF worker processes.
configuration.set_string("taskmanager.memory.task.off-heap.size", "500m")
s_env.set_parallelism(1)

# Kafka source with a 5-second bounded-out-of-orderness watermark on
# event_time, required for the ORDER BY event_time in MATCH_RECOGNIZE.
kafka_source = """CREATE TABLE source (
flow_name STRING,
flow_id STRING,
component STRING,
filename STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'cep',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)"""

# JDBC sink. NOTE(review): this uses the legacy 'connector.type' property
# style while the source uses the new 'connector' style — both work on the
# Flink version implied by use_blink_planner(), but migrating the sink to
# 'connector' = 'jdbc' properties would be more consistent.
postgres_sink = """
CREATE TABLE cep_result (
`filename` STRING,
`start_tstamp` TIMESTAMP(3),
`end_tstamp` TIMESTAMP(3)
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:postgresql://127.0.0.1:5432/postgres',
'connector.table' = 'cep_result',
'connector.driver' = 'org.postgresql.Driver',
'connector.username' = 'postgres',
'connector.password' = 'my_password',
'connector.write.flush.max-rows' = '1'
)
"""

# sql_update is deprecated but still executes DDL; kept as-is to avoid
# changing behavior. (New code would use st_env.execute_sql for DDL too.)
st_env.sql_update(kafka_source)
st_env.sql_update(postgres_sink)

postgres_sink_sql = '''
INSERT INTO cep_result
SELECT *
FROM source
MATCH_RECOGNIZE (
PARTITION BY filename
ORDER BY event_time
MEASURES
(A.event_time) AS start_tstamp,
(D.event_time) AS end_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A B C D)
DEFINE
A AS component = 'XXX',
B AS component = 'YYY',
C AS component = 'ZZZ',
D AS component = 'WWW'
) MR
'''

sql_result = st_env.execute_sql(postgres_sink_sql)

# BUG FIX: execute_sql() submits the INSERT job asynchronously and returns
# a TableResult immediately. Without blocking here, the Python script runs
# off the end and the process exits after a few seconds — exactly the
# symptom reported. Block until the (unbounded) streaming job terminates
# so the client stays alive for the lifetime of the job.
sql_result.get_job_client().get_job_execution_result().result()
--
Sent from: http://apache-flink.147419.n8.nabble.com/