I'm not very familiar with the ES connector. However, you could check whether any data is actually reaching the sink connector. One way to do this is to set pipeline.operator-chaining to false; the sink then runs as a separate operator, and you can see the count of its input elements.
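
A minimal sketch of that setting in PyFlink, assuming the same table_env object as in your snippet below. The helper name disable_operator_chaining is made up for illustration; the set_string call is the one you already use for pipeline.jars.

```python
def disable_operator_chaining(table_env):
    """Turn off operator chaining so the sink runs as its own operator.

    `table_env` is assumed to be a PyFlink TableEnvironment (or any object
    exposing the same get_config().get_configuration() chain).
    """
    config = table_env.get_config().get_configuration()
    # 'pipeline.operator-chaining' is a standard Flink option (default: true).
    # With it disabled, source and sink are no longer fused into one task,
    # so the sink operator reports its own input record count.
    config.set_string("pipeline.operator-chaining", "false")
    return config
```

Call disable_operator_chaining(table_env) before submitting the INSERT job. If the sink's input count stays at zero, the problem is probably upstream of the Elasticsearch connector rather than in the connection settings.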

PS: Just removed the community user mailing list.

On Wed, Apr 6, 2022 at 3:30 PM Muhammad Umar Gulzar <umargulzar...@gmail.com> wrote:

> Thank you for the prompt reply.
> It did work and I'm not getting any errors now.
>
> But nothing is happening now.
> Even if I change the host IP address or enter the wrong username and
> password nothing happens. It gives no error message.
>
> On Wed, Apr 6, 2022 at 11:54 AM Dian Fu <dian0511...@gmail.com> wrote:
>
> > I think you should use [1] or [2] instead. See [3] for more details.
> >
> > PS: This question is more fit for the user mailing list.
> >
> > Regards,
> > Dian
> >
> > [1] https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.14.4/flink-sql-connector-elasticsearch6_2.11-1.14.4.jar
> > [2] https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.14.4/flink-sql-connector-elasticsearch7_2.11-1.14.4.jar
> > [3] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/elasticsearch/
> >
> > On Wed, Apr 6, 2022 at 2:43 PM Muhammad Umar Gulzar <umargulzar...@gmail.com> wrote:
> >
> > > I’m using Python 3.7, Apache-Flink 1.14 and want to sink to ElasticSearch.
> > >
> > > I have added the jar using this code:
> > >
> > > table_env.get_config().get_configuration().set_string('pipeline.jars',
> > >     'file:///C:/Users/me/PycharmProjects/flink/Lib/flink-connector-elasticsearch-base_2.12-1.14.4.jar')
> > >
> > > Here is my sink code
> > >
> > > table_env.execute_sql("""
> > >     CREATE TABLE `Result` (
> > >         date_received DATE,
> > >         product_name STRING,
> > >         issue STRING,
> > >         zip_code INT
> > >     ) WITH (
> > >         'connector' = 'elasticsearch-7',
> > >         'hosts' = 'host_url',
> > >         'index' = 'index_name',
> > >         'username' = 'dummy',
> > >         'password' = '**********'
> > >     )
> > > """)
> > >
> > > But when I run the program, I get this error.
> > >
> > > *Could not find any factory for identifier 'elasticsearch-7' that
> > > implements 'org.apache.flink.table.factories.DynamicTableFactory' in the
> > > classpath.*
> > >
> > > *Available factory identifiers are:*
> > >
> > > *blackhole*
> > > *datagen*
> > > *filesystem*
> > > *print*