Hi,

I am new to Kafka, and for the past week I have been trying to ingest data from Kafka into my PostgreSQL DB without success. My requirement is this: I am already receiving IoT data as JSON and pushing it to Kafka, and that part works fine. As an extension, my next job is to insert the same JSON data into a PostgreSQL DB.

I have gone through several videos and tutorials and tried to follow them, but I could not get the JSON data inserted into my PostgreSQL DB. That is why I am writing to the dev community for help.

For your reference, here are the details of what I am currently doing.

1. I have a Windows environment, so I downloaded the Kafka binaries
   from the Apache site and configured them on my local PC. (Referred
   video - https://www.youtube.com/watch?v=3XjfYH5Z0f0)
2. After the Kafka setup, I am able to do basic tests: running
   Zookeeper and the Kafka server, creating a topic, starting a
   consumer, and then posting JSON data from a producer.
3. To ingest the JSON data into PostgreSQL (PostgreSQL is already set
   up on my local PC), following an article I found, I downloaded the
   JDBC Sink connector from the Confluent site
   (https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc) and
   configured it manually as per the documentation
   (https://docs.confluent.io/kafka-connect-jdbc/current/index.html).
   Just to clarify, I am not using the Confluent distribution.
4. Then I changed the configuration files, i.e.
   connect-standalone.properties and connect-file-sink.properties (I am
   attaching both files here for your reference).
5. After this I ran the JDBC sink connector using the command
   (.\bin\windows\connect-standalone.bat
   .\config\connect-standalone.properties
   .\config\connect-file-sink.properties).
6. Now, when I produce JSON data from the Kafka producer, the JDBC
   connector receives the data (it is logged on the connector console),
   but the data is not inserted into the PostgreSQL table.
7. Please note that I am not using any Schema Registry here, and that
   is why I set key.converter.schemas.enable=false and
   value.converter.schemas.enable=false in connect-standalone.properties.
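
For comparison with the attached sink file, the following is a minimal sketch of what a JDBC sink configuration for this setup might look like. It is untested here and rests on assumptions: the connector class name is the one documented for the Confluent JDBC connector, the connector jars and the PostgreSQL driver are assumed to be discoverable under plugin.path, and the topic, table, and credentials are simply copied from the attached file. The per-connector converter override reflects the fact that the JDBC sink needs each record to carry a schema; with JsonConverter and no Schema Registry, that means schemas.enable=true and messages wrapped in a "schema"/"payload" envelope.

```properties
# Sketch only: values below are copied from the attached file or assumed.
name=students-sink
# Class documented for the Confluent JDBC sink (assumed to be on plugin.path)
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=students
connection.url=jdbc:postgresql://localhost:5432/postgres
connection.user=postgres
connection.password=root
insert.mode=insert
table.name.format=students
pk.mode=none
auto.create=false
# Per-connector override: the JDBC sink needs records with a schema,
# so JSON messages must use the {"schema": ..., "payload": ...} envelope.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
```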

Please let me know what I am doing wrong.

Please help me solve this, as it is very important for me and I have been struggling with it for the last 2-3 days. If you could provide a working example for the same requirement, that would be great.

Looking forward to your reply.


Thanks

Avinash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name=students-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=students
connection.url=jdbc:postgresql://localhost:5432/postgres
connection.user=postgres
connection.password=root
insert.mode=insert
table.name.format=students
pk.mode=none
pk.fields=none
auto.create=false

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=C:\kafka\plugins\lib
