I'm not very familiar with the ES connector. However, you could check
whether any data is actually reaching the sink connector. One way to do
this is to set pipeline.operator-chaining to false; you can then see the
number of input records for the sink operator.
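
A minimal sketch of that setting in PyFlink, assuming the same table_env
object and the same get_config().get_configuration().set_string(...) call
style used in your script below. The helper name
disable_operator_chaining is only illustrative, not part of Flink:

```python
def disable_operator_chaining(table_env):
    """Turn off operator chaining for the given TableEnvironment.

    With chaining disabled, the sink is no longer fused with its upstream
    operators, so it shows up as its own operator with its own input count.
    """
    config = table_env.get_config().get_configuration()
    # 'pipeline.operator-chaining' is a boolean option that defaults to true;
    # it is passed as a string here to match the set_string call style.
    config.set_string("pipeline.operator-chaining", "false")
    return table_env


# Usage (assumption: table_env is the TableEnvironment created in your job,
# and this runs before the INSERT statement is executed):
# disable_operator_chaining(table_env)
```

Once the job is running, the per-operator record counts should be visible
in the Flink web UI. If the sink shows zero input records, the problem is
upstream of the connector rather than in the Elasticsearch settings.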

PS: Just removed the community user mailing list.

On Wed, Apr 6, 2022 at 3:30 PM Muhammad Umar Gulzar <umargulzar...@gmail.com>
wrote:

> Thank you for the prompt reply.
> It did work and I'm not getting any errors now.
>
> But nothing is happening now.
> Even if I change the host IP address or enter the wrong username and
> password nothing happens. It gives no error message.
>
> On Wed, Apr 6, 2022 at 11:54 AM Dian Fu <dian0511...@gmail.com> wrote:
>
> > I think you should use [1] or [2] instead. See [3] for more details.
> >
> > PS: This question is more fit for the user mailing list.
> >
> > Regards,
> > Dian
> >
> > [1] https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.14.4/flink-sql-connector-elasticsearch6_2.11-1.14.4.jar
> > [2] https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.14.4/flink-sql-connector-elasticsearch7_2.11-1.14.4.jar
> > [3] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/elasticsearch/
> >
> >
> > On Wed, Apr 6, 2022 at 2:43 PM Muhammad Umar Gulzar <
> > umargulzar...@gmail.com>
> > wrote:
> >
> > > I’m using Python 3.7 and Apache-Flink 1.14, and I want to sink to
> > > Elasticsearch.
> > >
> > > I have added the jar using this code:
> > >
> > > table_env.get_config().get_configuration().set_string('pipeline.jars',
> > >     'file:///C:/Users/me/PycharmProjects/flink/Lib/flink-connector-elasticsearch-base_2.12-1.14.4.jar')
> > >
> > >
> > >
> > > Here is my sink code
> > >
> > > table_env.execute_sql("""
> > >     CREATE TABLE `Result` (
> > >         date_received DATE,
> > >         product_name STRING,
> > >         issue STRING,
> > >         zip_code INT
> > >     ) WITH (
> > >         'connector' = 'elasticsearch-7',
> > >         'hosts' = 'host_url',
> > >         'index' = 'index_name',
> > >         'username' = 'dummy',
> > >         'password' = '**********'
> > >     )
> > > """)
> > >
> > >
> > >
> > > But when I run the program, I get this error.
> > >
> > >
> > >
> > > *Could not find any factory for identifier 'elasticsearch-7' that
> > > implements 'org.apache.flink.table.factories.DynamicTableFactory' in the
> > > classpath.*
> > >
> > >
> > >
> > > *Available factory identifiers are:*
> > >
> > >
> > >
> > > *blackhole*
> > >
> > > *datagen*
> > >
> > > *filesystem*
> > >
> > > *print*
> > >
> >
>
