I’m using Python 3.7, Apache-Flink 1.14 and want to sink to ElasticSearch.

I have added the jar using this code:

table_env.get_config().get_configuration().set_string('pipeline.jars',
'file:///C:/Users/me/PycharmProjects/flink/Lib/flink-connector-elasticsearch-base_2.12-1.14.4.jar')



Here is my sink code

table_env.execute_sql("""
    CREATE TABLE `Result` (
        date_received DATE,
        product_name STRING,
        issue STRING,
        zip_code INT
    ) WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = ‘host_url',
        'index' = 'index_name’,
        'username' = dummy,
        'password' =  '**********’
    )
""")



But when I run the program, I get this error.



*Could not find any factory for identifier 'elasticsearch-7' that
implements 'org.apache.flink.table.factories.DynamicTableFactory' in the
classpath.*



*Available factory identifiers are:*



*blackhole*

*datagen*

*filesystem*

*print*

Reply via email to