I generate a simple pricing as follows: echo "${UUID}:${TICKER}" >> ${IN_FILE}
UUID is the key and ticker is the name of security like “IBM” This is what I have in my shell script cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh \ --broker-list rhes75:9092,rhes75:9093,rhes75:9094,rhes564:9092,rhes564:9093,rhes564:9094,rhes76:9092,rhes76:9093,rhes76:9094 \ --topic md \ --property "parse.key=true" \ --property "key.separator=:" And this is what I get ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes75:2181,rhes564:2181,rhes76:2181 --topic md --property print.key=true 6af48cd4-c4ce-44a0-adb7-8d980b2b79a0 IBM 8436ac7f-d2ae-4762-bf4e-98242241290b IBM 96bbb149-b11c-440d-a82c-f78e53edda49 IBM 5fa8682b-d81a-466a-abbd-ca80793405b1 IBM a998446a-497f-4ac7-a5a7-21a524cfc4da IBM 7920affe-6641-48ef-9e08-5a4b9f66da4d IBM Now I want to post this data into aerospike My files are as follows: *cat connect-standalone.properties* # These are defaults. This file just demonstrates how to override some settings. bootstrap.servers=rhes75:9092, rhes564:9092,rhes76:9092 # The converters specify the format of data in Kafka and how to translate it # into Connect data. Every Connect user will need to configure these based on # the format they want their data in when loaded from or stored into Kafka # key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter ##alue.converter=org.apache.kafka.connect.json.JsonConverter # # Converter-specific settings can be passed in by prefixing the Converter's # setting with the converter we want to apply it to key.converter.schemas.enable=true value.converter.schemas.enable=false # The internal converter used for offsets and config data is configurable and # must be specified, but most users will always want to use the built-in # default. Offset and config data is never visible outside of Kafka Connect in # this format. ##internal.key.converter=org.apache.kafka.connect.json.JsonConverter ##internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter=org.apache.kafka.connect.storage.StringConverter internal.value.converter=org.apache.kafka.connect.storage.StringConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets # Flush much faster than normal, which is useful for testing/debugging offset.flush.interval.ms=10000 # Set to a list of filesystem paths separated by commas (,) to enable class # loading isolation for plugins (connectors, converters, transformations). The # list should consist of top level directories that include any combination of: # a) directories immediately containing jars with plugins and their dependencies # b) uber-jars with plugins and their dependencies # c) directories immediately containing the package directory structure of # classes of plugins and their dependencies Note: symlinks will be followed to # discover dependencies or plugins. # Examples: plugin.path=/opt/aerospike-kafka-connect-sink/share/kafka And my sink-properies file is *cat aerospike-sink.properties* ## # Copyright 2016 Aerospike, Inc. # # Portions may be licensed to Aerospike, Inc. under one or more contributor # license agreements WHICH ARE COMPATIBLE WITH THE APACHE LICENSE, VERSION 2.0. # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of # the License at http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. ## name=aerospike-sink connector.class=com.aerospike.kafka.connect.sink.AerospikeSinkConnector tasks.max=1 topics=md cluster.hosts=rhes75:3000,rhes76:3000 policy.record_exists_action=replace topic.namespace=trading topic.set=MARKETDATAAEROSPIKEBATCH feature_key.path=/etc/aerospike/features.conf topic.key_field=key topic.bins=ticker ##topic.bins=ticker:ticker,timeissued:timeissued,price:price aerospike.username=trading_user_RW aerospike.password=xxxxxxx But I get this error when I do $KAFKA_HOME/bin/connect-standalone.sh etc/connect-standalone.properties etc/aerospike-sink.properties [2019-06-14 20:52:24,585] INFO ConnectorConfig values: aerospike.password = [hidden] aerospike.username = trading_user_RW cluster.hosts = rhes75:3000,rhes76:3000 feature_key.path = /etc/aerospike/features.conf policy.record_exists_action = replace topics = [md] (com.aerospike.kafka.connect.sink.ConnectorConfig:279) [2019-06-14 20:52:24,586] INFO TopicConfig values: bins = ticker key_field = key namespace = trading set = MARKETDATAAEROSPIKEBATCH set_field = null (com.aerospike.kafka.connect.sink.TopicConfig:279) [2019-06-14 20:52:24,723] INFO Connected to Aerospike cluster at rhes75 3000 (com.aerospike.kafka.connect.sink.AsyncWriter:73) [2019-06-14 20:52:24,724] INFO WorkerSinkTask{id=aerospike-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:282) [2019-06-14 20:52:24,730] INFO Cluster ID: gPy8TvkhSsCijj3A8m2kzw (org.apache.kafka.clients.Metadata:265) [2019-06-14 20:52:24,730] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Discovered group coordinator rhes564:9093 (id: 2147483642 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:605) [2019-06-14 20:52:24,731] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:411) [2019-06-14 20:52:24,732] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:442) [2019-06-14 20:52:27,742] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:409) [2019-06-14 20:52:27,743] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Setting newly assigned partitions [md-8, md-7, md-6, md-5, md-4, md-3, md-2, md-1, md-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256) [2019-06-14 20:52:27,753] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-3 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561) [2019-06-14 20:52:27,753] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-4 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561) [2019-06-14 20:52:27,753] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-5 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561) [2019-06-14 20:52:27,753] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-2 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561) [2019-06-14 20:52:27,754] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-6 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561) [2019-06-14 20:52:27,754] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-7 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561) [2019-06-14 20:52:27,754] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-8 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561) [2019-06-14 20:52:27,754] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561) [2019-06-14 20:52:27,754] INFO [Consumer clientId=consumer-1, groupId=connect-aerospike-sink] Resetting offset for partition md-1 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:561) [2019-06-14 20:52:27,774] ERROR WorkerSinkTask{id=aerospike-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:544) *org.apache.kafka.connect.errors.DataException: No mapper for records of type STRING* at com.aerospike.kafka.connect.data.RecordMapperFactory.createMapper(RecordMapperFactory.java:58) at com.aerospike.kafka.connect.data.RecordMapperFactory.getMapper(RecordMapperFactory.java:45) at com.aerospike.kafka.connect.sink.AerospikeSinkTask.put(AerospikeSinkTask.java:58) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) I have run out of ideas what is wrong here. Appreciate any hint please. Regards, Mich *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.