iduanyingjie created FLINK-30809:
------------------------------------

Summary: flink-connector-elasticsearch7 updates data pipeline does not work
Key: FLINK-30809
URL: https://issues.apache.org/jira/browse/FLINK-30809
Project: Flink
Issue Type: Bug
Components: Connectors / ElasticSearch
Affects Versions: elasticsearch-3.0.0
Environment:
Flink Version: 1.15.3
MySQL Version: 5.7
Elasticsearch Version: 7.17.7
Reporter: iduanyingjie

Start Elasticsearch and Kibana in Docker:
{code:yaml}
version: '2.1'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.7
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.7
    ports:
      - "5601:5601"
{code}

Create the table records in MySQL:
{code:sql}
CREATE TABLE records (
  id bigint unsigned NOT NULL AUTO_INCREMENT,
  user_id bigint unsigned NOT NULL,
  create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
{code}

Insert some data:
{code:sql}
INSERT INTO test.records (id, user_id, create_time) VALUES (default, 123, '2023-01-20 12:25:11');
INSERT INTO test.records (id, user_id, create_time) VALUES (default, 456, '2023-01-20 12:25:30');
INSERT INTO test.records (id, user_id, create_time) VALUES (default, 789, '2023-01-20 12:25:37');
{code}

Create the ingest pipeline in Elasticsearch:
{code}
PUT /_ingest/pipeline/set_ingest_timestamp_fields
{
  "processors": [
    {
      "set": {
        "field": "ingest_timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
{code}

Create the index in Elasticsearch, with this pipeline as its default pipeline:
{code}
PUT enriched_records
{
  "settings": {
    "default_pipeline": "set_ingest_timestamp_fields",
    "number_of_shards": "1",
    "number_of_replicas": "0"
  }
}
{code}

Execute the Flink SQL:
{code:sql}
CREATE TABLE records (
  id INT,
  user_id INT,
  create_time TIMESTAMP(3),
  proc_time AS PROCTIME(),
  operation_time TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'test',
  'table-name' = 'records',
  'server-time-zone' = 'UTC'
);

CREATE TABLE enriched_records (
  id INT,
  user_id INT,
  create_time TIMESTAMP(3),
  proc_time TIMESTAMP_LTZ(3),
  operation_time TIMESTAMP_LTZ(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'enriched_records'
);

INSERT INTO enriched_records
SELECT o.id, o.user_id, o.create_time, o.proc_time, o.operation_time
FROM records AS o;
{code}

When we query the data in Elasticsearch with GET /enriched_records/_search, every record has an ingest_timestamp field set to the time the record was written:
{code:json}
{
  "_index": "enriched_records",
  "_type": "_doc",
  "_id": "3",
  "_score": 1,
  "_source": {
    "operation_time": "1970-01-01 00:00:00Z",
    "create_time": "2023-01-20 12:25:37",
    "user_id": 789,
    "ingest_timestamp": "2023-01-28T05:21:40.539754251Z",
    "id": 3,
    "proc_time": "2023-01-28 05:21:40.233Z"
  }
}
{code}

When we then modify the record in MySQL, user_id, operation_time and proc_time are updated in Elasticsearch, but ingest_timestamp keeps its old value. The default pipeline configured for the index does not seem to run for this update:
{code:json}
{
  "_index": "enriched_records",
  "_type": "_doc",
  "_id": "3",
  "_score": 1,
  "_source": {
    "operation_time": "2023-01-28 05:25:05Z",
    "create_time": "2023-01-20 12:25:37",
    "user_id": 987,
    "ingest_timestamp": "2023-01-28T05:21:40.539754251Z",
    "id": 3,
    "proc_time": "2023-01-28 05:25:05.529Z"
  }
}
{code}

If we modify a field directly in Elasticsearch, the value of ingest_timestamp does change.
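One possible explanation, not verified against the connector or Elasticsearch source, is that for tables with a primary key the elasticsearch-7 sink writes each row as a bulk update request carrying the row both as a partial doc and as an upsert document. If Elasticsearch applies the index's default pipeline only to the upsert document, the pipeline output would be used when the document is first created. It would then be dropped once the document exists, because only the partial doc is merged into the stored source. The toy Python model below illustrates that hypothesis. It is not Elasticsearch or Flink code. The function names and the integer clock standing in for _ingest.timestamp are hypothetical. Modelling a direct change in Elasticsearch as a full index request (for example PUT /enriched_records/_doc/3) is also an assumption.

{code:python}
import itertools

# Hypothetical stand-in for _ingest.timestamp: an increasing integer clock,
# used so the example is deterministic.
_clock = itertools.count(1)


def run_default_pipeline(source):
    """Toy version of set_ingest_timestamp_fields: copy the source and stamp it."""
    processed = dict(source)
    processed["ingest_timestamp"] = next(_clock)
    return processed


def index_request(store, doc_id, source):
    """Assumed model of a full index request: the whole source goes through the pipeline."""
    store[doc_id] = run_default_pipeline(source)


def bulk_update_with_upsert(store, doc_id, doc, upsert):
    """Assumed model of a bulk update that carries both a partial `doc` and an `upsert`.

    The pipeline runs on the upsert document only. That result is stored when the
    document does not exist yet; otherwise the partial doc is merged, unprocessed,
    into the stored source and the pipeline output is dropped.
    """
    processed_upsert = run_default_pipeline(upsert)
    if doc_id not in store:
        store[doc_id] = processed_upsert
    else:
        merged = dict(store[doc_id])
        merged.update(doc)  # shallow merge, enough for flat rows like these
        store[doc_id] = merged


store = {}

# Initial CDC row: the document is created from the pipeline-processed upsert.
row = {"id": 3, "user_id": 789}
bulk_update_with_upsert(store, "3", doc=row, upsert=row)
first_ts = store["3"]["ingest_timestamp"]

# MySQL update: the partial doc is merged into the existing document.
changed = {"id": 3, "user_id": 987}
bulk_update_with_upsert(store, "3", doc=changed, upsert=changed)
print(store["3"])  # user_id is now 987, ingest_timestamp is still first_ts

# Direct change modelled as a full index request: the pipeline runs again.
index_request(store, "3", {"id": 3, "user_id": 987})
print(store["3"]["ingest_timestamp"] != first_ts)
{code}

Under this model the CDC update changes user_id but keeps the first ingest_timestamp, which matches the second search result above. The full index request re-stamps the document.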