Hi,
I am new to Kafka, and for the last week I have been trying to ingest data
from Kafka into my PostgreSQL database, so far without success. My
requirement is as follows: I am already receiving some IoT data as JSON and
pushing it to Kafka, and up to that point everything works fine. As an
extension, my next task is to insert the same JSON data into a PostgreSQL
database. I have gone through several videos and tutorials and tried to
implement the setup accordingly, but I still cannot get the JSON data
inserted into PostgreSQL, which is why I am writing to the dev community
for help.
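To make things concrete for this mail, assume the messages look like the
simplified record below (the real payloads carry IoT readings; the field
names here just match the students topic used in the attached configs):

  {"id": 1, "name": "John", "age": 25}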
For your reference, here are the details of what I am currently doing.
1. I am on a Windows environment, so I downloaded the Kafka binaries from
   the Apache site and configured them on my local PC. (Referred video:
   https://www.youtube.com/watch?v=3XjfYH5Z0f0)
2. After the Kafka setup, I am able to run the basic tests: starting
   ZooKeeper and the Kafka server, creating a topic, starting a consumer,
   and posting JSON data from a producer (the exact commands I used are
   shown after this list).
3. To ingest the JSON data into PostgreSQL (the database is already set up
   on my local PC), I followed an article, downloaded the JDBC Sink
   connector from Confluent Hub
   (https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc) and
   configured it manually as per the documentation
   (https://docs.confluent.io/kafka-connect-jdbc/current/index.html). Just
   to clarify, I am not using the Confluent distribution.
4. I then made my changes in the configuration files, i.e.
   connect-standalone.properties and connect-file-sink.properties (both
   files are attached below for your reference).
5. After this I ran the connector in standalone mode with the command:
   .\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\connect-file-sink.properties
6. When I now produce JSON data from the Kafka producer, the connector
   receives it (the records show up in the Connect console log), but
   nothing is inserted into the PostgreSQL table.
7. Please note that I am not using a Schema Registry, which is why I set
   key.converter.schemas.enable=false and
   value.converter.schemas.enable=false in connect-standalone.properties.
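For completeness, the basic tests from step 2 were along these lines
(reconstructed from memory, so the exact flags may have differed slightly;
the topic name matches the attached sink config):

  .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
  .\bin\windows\kafka-server-start.bat .\config\server.properties
  .\bin\windows\kafka-topics.bat --create --topic students --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  .\bin\windows\kafka-console-producer.bat --topic students --bootstrap-server localhost:9092
  .\bin\windows\kafka-console-consumer.bat --topic students --bootstrap-server localhost:9092 --from-beginning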
Please let me know where I am going wrong. Any help would be much
appreciated, as this is important for me and I have been struggling with it
for the last 2-3 days. If you could point me to a working example of the
same requirement, that would be great.

Looking forward to your reply.
Thanks,
Avinash
===== connect-file-sink.properties =====
name=students-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=students
connection.url=jdbc:postgresql://localhost:5432/postgres
connection.user=postgres
connection.password=root
insert.mode=insert
table.name.format=students
pk.mode=none
pk.fields=none
auto.create=false
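Since auto.create=false, I created the target table in PostgreSQL manually
before starting the connector. Simplified here to match the sample record
above (my real table has more columns):

  CREATE TABLE students (
      id   INTEGER,
      name VARCHAR(100),
      age  INTEGER
  );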
===== connect-standalone.properties =====
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into
# Connect data. Every Connect user will need to configure these based on the format
# they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting
# with the converter we want to apply it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# Set to a list of filesystem paths separated by commas (,) to enable class loading
# isolation for plugins (connectors, converters, transformations). The list should
# consist of top level directories that include any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes
#    of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=C:\kafka\plugins\lib
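PS: My understanding of the schemas.enable settings above is that, when they
are enabled, the JsonConverter expects every record to be wrapped in a
schema/payload envelope roughly like the one below. Since my producer sends
plain JSON without such an envelope, I set both flags to false:

  {
    "schema": {
      "type": "struct",
      "name": "student",
      "fields": [
        {"field": "id",   "type": "int32",  "optional": false},
        {"field": "name", "type": "string", "optional": true},
        {"field": "age",  "type": "int32",  "optional": true}
      ]
    },
    "payload": {"id": 1, "name": "John", "age": 25}
  }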