iduanyingjie created FLINK-30809:
------------------------------------

             Summary: flink-connector-elasticsearch7 updates data pipeline does 
not work
                 Key: FLINK-30809
                 URL: https://issues.apache.org/jira/browse/FLINK-30809
             Project: Flink
          Issue Type: Bug
          Components: Connectors / ElasticSearch
    Affects Versions: elasticsearch-3.0.0
         Environment: Flink Version: 1.15.3
Mysql Version: 5.7
Elasticsearch Version: 7.17.7
            Reporter: iduanyingjie


Create Elasticsearch (and Kibana) in Docker:
{code:yaml}
# Docker Compose file that starts Elasticsearch and Kibana, both at 7.17.7
# (the Elasticsearch version listed in this report's environment).
version: '2.1'
services:
 # Elasticsearch started with discovery.type=single-node.
 elasticsearch:
   image: docker.elastic.co/elasticsearch/elasticsearch:7.17.7
   environment:
     - cluster.name=docker-cluster
     - bootstrap.memory_lock=true
     # JVM heap: initial (-Xms) and maximum (-Xmx) size both set to 512 MB.
     - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
     - discovery.type=single-node
   ports:
     # 9200 is the port the Flink sink in this report connects to
     # ('hosts' = 'http://localhost:9200').
     - "9200:9200"
     - "9300:9300"
   ulimits:
     # -1 removes the locked-memory limit; presumably required because
     # bootstrap.memory_lock=true is set above - confirm.
     memlock:
       soft: -1
       hard: -1
     # Open-file limit for the container.
     nofile:
       soft: 65536
       hard: 65536
 # Kibana at the same version as Elasticsearch, exposed on port 5601.
 kibana:
   image: docker.elastic.co/kibana/kibana:7.17.7
   ports:
     - "5601:5601"
{code}
Create the table `records` in MySQL:
{code:sql}
-- Source table for the reproduction. `id` is generated by AUTO_INCREMENT and
-- `create_time` defaults to the insertion time.
-- Collation is utf8mb4_unicode_ci: utf8mb4_0900_ai_ci only exists in
-- MySQL 8.0+, so it is rejected by the MySQL 5.7 server this report targets.
CREATE TABLE records (
 id bigint unsigned NOT NULL AUTO_INCREMENT,
 user_id bigint unsigned NOT NULL,
 create_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
 PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
{code}
Insert some data:
{code:sql}
-- Seed three rows; DEFAULT lets the AUTO_INCREMENT column assign each id.
INSERT INTO test.records (id, user_id, create_time)
VALUES (DEFAULT, 123, '2023-01-20 12:25:11');

INSERT INTO test.records (id, user_id, create_time)
VALUES (DEFAULT, 456, '2023-01-20 12:25:30');

INSERT INTO test.records (id, user_id, create_time)
VALUES (DEFAULT, 789, '2023-01-20 12:25:37');
{code}
Create the ingest pipeline in Elasticsearch:
{code:java}
PUT /_ingest/pipeline/set_ingest_timestamp_fields
{
 "processors": [
   {
     "set": {
       "field": "ingest_timestamp",
       "value": "{{_ingest.timestamp}}"
     }
   }
 ]
}{code}
Create the index in Elasticsearch:
{code:java}
PUT enriched_records
{
 "settings": {
   "default_pipeline": "set_ingest_timestamp_fields",
   "number_of_shards": "1",
   "number_of_replicas": "0"
 }
}{code}
Execute the following Flink SQL:
{code:sql}
-- Flink source table: reads the MySQL table `test.records` on localhost:3306
-- through the 'mysql-cdc' connector.
CREATE TABLE records (
   -- NOTE(review): the MySQL columns `id` and `user_id` are `bigint unsigned`
   -- but are declared INT here; the connector's type mapping presumably
   -- expects a wider type such as DECIMAL(20, 0) - confirm.
   id INT,
   user_id INT,
   create_time TIMESTAMP(3),
   -- Computed column holding the processing time of each row.
   proc_time AS PROCTIME(),
   -- Read-only (VIRTUAL) metadata column filled from the connector's 'op_ts' key.
   -- In the results shown in this report it is 1970-01-01 for rows that existed
   -- before the job started and a real timestamp after an update; presumably
   -- rows read during the initial snapshot carry no change timestamp - confirm.
   operation_time TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = 'localhost',
   'port' = '3306',
   'username' = 'root',
   'password' = '123456',
   'database-name' = 'test',
   'table-name' = 'records',
   'server-time-zone' = 'UTC'
);
-- Flink sink table: writes to the Elasticsearch index `enriched_records`, i.e.
-- the index created earlier in this report with
-- "default_pipeline": "set_ingest_timestamp_fields".
CREATE TABLE enriched_records (
   id INT,
   user_id INT,
   create_time TIMESTAMP(3),
   -- Plain columns here: they receive the values of the source table's
   -- computed `proc_time` and metadata `operation_time` columns.
   proc_time TIMESTAMP_LTZ(3),
   operation_time TIMESTAMP_LTZ(3),
   -- In the search results shown in this report the document "_id" equals this key.
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'elasticsearch-7',
   'hosts' = 'http://localhost:9200',
   'index' = 'enriched_records'
);
-- Copy every row of the CDC source table into the Elasticsearch sink table,
-- column for column.
INSERT INTO enriched_records
SELECT
   src.id,
   src.user_id,
   src.create_time,
   src.proc_time,
   src.operation_time
FROM records AS src;
{code}
When we query the data in Elasticsearch with GET /enriched_records/_search, we 
find that each record has an ingest_timestamp field whose value is the time at 
which the record was indexed.
{code:json}
{
    "_index":"enriched_records",
    "_type":"_doc",
    "_id":"3",
    "_score":1,
    "_source":{
        "operation_time":"1970-01-01 00:00:00Z",
        "create_time":"2023-01-20 12:25:37",
        "user_id":789,
        "ingest_timestamp":"2023-01-28T05:21:40.539754251Z",
        "id":3,
        "proc_time":"2023-01-28 05:21:40.233Z"
    }
} {code}
When we modify a record in MySQL, the value of the ingest_timestamp field does 
not change; it seems that the pipeline set for this index is not applied to 
updates written by the connector.
{code:json}
{
    "_index":"enriched_records",
    "_type":"_doc",
    "_id":"3",
    "_score":1,
    "_source":{
        "operation_time":"2023-01-28 05:25:05Z",
        "create_time":"2023-01-20 12:25:37",
        "user_id":987,
        "ingest_timestamp":"2023-01-28T05:21:40.539754251Z",
        "id":3,
        "proc_time":"2023-01-28 05:25:05.529Z"
    }
}
{code}
If we modify a field directly in Elasticsearch, the value of the 
ingest_timestamp field does change.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to