hackergin commented on a change in pull request #18058: URL: https://github.com/apache/flink/pull/18058#discussion_r772471170
##########
File path: flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkBaseITCase.java
##########
@@ -302,6 +306,58 @@ public void testWritingDocumentsWithDynamicIndex() throws Exception {
         Assertions.assertEquals(response, expectedMap);
     }
+    @Test
+    public void testWritingDocumentsWithDynamicIndexFromProcTime() throws Exception {
+        TableEnvironment tableEnvironment =
+                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+        tableEnvironment
+                .getConfig()
+                .getConfiguration()
+                .setString("table.local-time-zone", "Asia/Shanghai");
+
+        String dynamicIndex1 =
+                "dynamic-index-"
+                        + dateTimeFormatter.format(LocalDateTime.now(ZoneId.of("Asia/Shanghai")));
+        String index = "dynamic-index-{now()|yyyy-MM-dd}";

Review comment:
> It looks good but I have some more generic comments/questions:
>
> * We are using `now` from Elasticsearch, but we create the concrete index names ourselves using our own utilities instead of letting Elasticsearch do that using its system time. Have you considered that? If we use our own parsing, date formatting, etc., then whenever something changes in the future regarding Elasticsearch index patterns, we need to accommodate those changes in our code in order to be able to create the concrete index name.
> * Maybe we should consider using a metadata column, where the user can define the expression that gives the proctime, to offer more flexibility instead of using `LocalDateTime.now()` in the connector.
> * Maybe we should extend this flexibility so that the whole index pattern (prefix + date pattern + suffix) is defined by the user using this metadata column, so that the same sink table can be used by different SQL insert statements, inserting into different indices based on the metadata column, without the need to create multiple sink tables with SQL, each one with a more static index name.
> * For all those cases, including the current implementation of the PR, we cannot support retractions for this sink, as a record can be inserted into, for example, `my-index-2021-12-20`, but a later delete/update could reference the next day's index (`my-index-2021-12-21`) and cause issues.

Thanks for the review and for raising these concerns about the feature. To be honest, I hadn't considered this deeply before.

Currently, the Elasticsearch connector manages the creation of the Elasticsearch index. Users should make sure the index is created correctly before running the Flink job. The index pattern is only used to generate the right index name, so I don't think we need to keep up with changes in Elasticsearch's behavior.

I also agree with adding metadata column support, so that users can generate dynamic indices with more flexibility. I would prefer to keep this change and open a new ticket to add metadata column support.

cc @wuchong

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
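
To make the retraction concern from the review concrete, here is a minimal sketch of how a `{now()|yyyy-MM-dd}` placeholder could be resolved on the client side from processing time. The class `ProcTimeIndexSketch` and its `resolve` method are hypothetical names introduced only for illustration; this is not the connector's actual implementation, and the regex-based parsing is an assumption.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not the connector's actual code. */
final class ProcTimeIndexSketch {

    // Matches a placeholder of the form {now()|<date pattern>}.
    private static final Pattern NOW_PLACEHOLDER =
            Pattern.compile("\\{now\\(\\)\\|([^}]+)\\}");

    /** Replaces the first {now()|pattern} placeholder with the formatted processing time. */
    static String resolve(String indexPattern, Instant procTime, ZoneId zone) {
        Matcher matcher = NOW_PLACEHOLDER.matcher(indexPattern);
        if (!matcher.find()) {
            // No placeholder: the index name is static.
            return indexPattern;
        }
        DateTimeFormatter formatter =
                DateTimeFormatter.ofPattern(matcher.group(1)).withZone(zone);
        return indexPattern.substring(0, matcher.start())
                + formatter.format(procTime)
                + indexPattern.substring(matcher.end());
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("Asia/Shanghai");
        String pattern = "my-index-{now()|yyyy-MM-dd}";

        // An insert just before midnight (Shanghai time) and a delete of the
        // same record two seconds later resolve to different indices.
        Instant insertTime = Instant.parse("2021-12-20T15:59:59Z");
        Instant deleteTime = Instant.parse("2021-12-20T16:00:01Z");

        System.out.println(resolve(pattern, insertTime, zone)); // my-index-2021-12-20
        System.out.println(resolve(pattern, deleteTime, zone)); // my-index-2021-12-21
    }
}
```

Because the index name depends on when each message is processed rather than on the record itself, an update or delete that arrives after the date rolls over is routed to a different index than the original insert, which is the issue raised in the last review bullet.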