Hi!
I have recently started a project with the following scenario:
my Kafka is receiving data from two sources (both String key, Byte[] value)
that I want to stream into two different Ignite caches.
The entire setup is to be built with docker(compose). I will post my
docker-compose.yml at the end.
Now, let's start with the questions and issues:
1. After starting up the docker-compose setup I can access the Kafka
Control Center (Confluent) and it shows both topics being filled. With the
REST call:
{
  "name": "test-connector-one",
  "config": {
    "connector.class": "org.apache.ignite.stream.kafka.connect.IgniteSinkConnector",
    "tasks.max": "1",
    "topics": "TestTopic",
    "cacheName": "TestCache",
    "igniteCfg": "/etc/kafka-connect/jars/test.xml"
  }
}
I am starting to stream data from Kafka's TestTopic to Ignite's TestCache. I
have set up test.xml so that the IgniteSinkConnector starts a node in
client mode only. So far everything works perfectly fine: I can see the data
arriving in TestCache (I also used -scan in ignitevisorcmd; the data is
correct).
Now, if I make another REST call to start the second connector (same
call, different 'name', 'topics' and 'cacheName'), the connector appears to
stop all workers from test-connector-one (rebalancing?) while
apparently also stopping the local Ignite node (the one in client mode).
As a result, neither connector-one nor connector-two starts, since they
both receive the error:
ERROR Task test-connector-one-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect | java.lang.IllegalStateException: Data streamer has been closed.
connect |     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:402)
connect |     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:614)
connect |     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:676)
connect |     at org.apache.ignite.stream.kafka.connect.IgniteSinkTask.put(IgniteSinkTask.java:118)
connect |     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
connect |     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
connect |     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
connect |     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
connect |     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
connect |     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
connect |     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect |     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect |     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
connect |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
connect |     at java.lang.Thread.run(Thread.java:745)
connect | [2017-11-20 20:09:50,801] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask)
What's happening here? How is it 'supposed' to work?
As a workaround, it is possible to consume both topics at once in a single connector.
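For reference, a single-connector call might look like the sketch below (Kafka Connect accepts a comma-separated topics list; the connector name "test-connector-both" and the topic "TestTopicTwo" are placeholder names). Note that the IgniteSinkConnector takes only one cacheName, so both topics would then land in the same cache:

```json
{
  "name": "test-connector-both",
  "config": {
    "connector.class": "org.apache.ignite.stream.kafka.connect.IgniteSinkConnector",
    "tasks.max": "1",
    "topics": "TestTopic,TestTopicTwo",
    "cacheName": "TestCache",
    "igniteCfg": "/etc/kafka-connect/jars/test.xml"
  }
}
```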
2. The next question is more general:
the byte-array value stores information that I want to extract and work
with in Ignite; I can deserialize it into a CustomObject.
Usually, all or most of the data with the same key String will be needed for
a task.
Are you 'supposed' to deserialize it during Kafka Connect and actually
store the CustomObject in the cache?
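To make concrete what I mean by deserializing, here is a minimal, self-contained sketch using plain Java serialization. CustomObject and its fields are placeholders I made up for illustration; my real payload format may differ:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {
    // Placeholder for the real payload type; must be Serializable for this approach.
    public static class CustomObject implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String name;
        public final int value;

        public CustomObject(String name, int value) {
            this.name = name;
            this.value = value;
        }
    }

    // Produces the kind of byte[] that would sit in the Kafka message value.
    public static byte[] serialize(CustomObject obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    // The reverse step: what a tuple extractor (or later processing in Ignite)
    // would do with the raw byte[] before storing/using the object.
    public static CustomObject deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (CustomObject) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] raw = serialize(new CustomObject("Test1", 42));
        CustomObject restored = deserialize(raw);
        System.out.println(restored.name + " " + restored.value); // prints "Test1 42"
    }
}
```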
3. I am having issues running queries on the data in the cache. Let's say I
want all entries with the key (a String) "Test1":
Ignition.setClientMode(true);

try (Ignite ignite = Ignition.start("test.xml")) {
    IgniteCache<String, byte[]> cache = ignite.cache("TestCache");

    ScanQuery<String, byte[]> scan = new ScanQuery<>(
(*)     new IgniteBiPredicate<String, byte[]>() {
            @Override public boolean apply(String key, byte[] value) {
                return key.equals("Test1");
            }
        }
    );

    Iterable<?> it = cache.query(scan).getAll();
    for (Object next : it) {
        System.out.println(next);
    }
}
which results in the error:
>Exception in thread "main" javax.cache.CacheException: class
org.apache.ignite.IgniteCheckedException: Query >execution failed:
... a lot of stack trace that doesn't look too interesting (to me) ...
until:
>Caused by: java.lang.ClassCastException: [B cannot be cast to
[Ljava.lang.Byte;
at the line I marked with (*).
The value is stored as byte[] as well, so I am not sure what I am doing
wrong.
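For what it's worth, the JVM type names in that message say that a primitive byte[] ("[B") is being cast to a boxed Byte[] ("[Ljava.lang.Byte;"). Those are two distinct array types with no automatic conversion between them, as this small self-contained demo shows; my guess is that something in the pipeline stores or expects Byte[] where the other side uses byte[]:

```java
public class ArrayTypeDemo {
    public static void main(String[] args) {
        byte[] primitive = new byte[] {1, 2, 3};
        Byte[] boxed = new Byte[] {1, 2, 3};

        // The JVM's internal names match the ones in the ClassCastException:
        System.out.println(primitive.getClass().getName()); // prints "[B"
        System.out.println(boxed.getClass().getName());     // prints "[Ljava.lang.Byte;"

        // Unlike byte <-> Byte, there is NO autoboxing between the array types:
        Object o = primitive;
        System.out.println(o instanceof Byte[]); // prints "false"
        // Byte[] b = (Byte[]) o; // would throw:
        // java.lang.ClassCastException: [B cannot be cast to [Ljava.lang.Byte;
    }
}
```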
I am really in need of some more knowledge about why this is not
working; I was not very successful at finding it anywhere.
For question 1) I couldn't find a single up-to-date example, and I have no clue how
you'd use the single/multipleTupleExtractor, for instance (especially in a
Docker environment). Debugging and reading through thousands of log lines is
really tedious.
Question 2) is probably something I'm simply lacking experience in; it would be
really helpful to get advice so I don't need to try all the possibilities.
For question 3) I might be missing something obvious, but I couldn't figure it out for
quite some time now.
I would be super happy if someone can give me more information!
Best regards,
Svonn
P.S.: Here's my docker-compose.yml; I am using a volume to provide the config
files (test.xml) and .jars for the container:
---
version: '3.3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    container_name: zookeeper
  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:9092'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    container_name: broker
  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
    container_name: schemaregistry
  connect:
    image: confluentinc/cp-kafka-connect
    hostname: connect
    depends_on:
      - zookeeper
      - broker
      - schema_registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    volumes:
      - /usr/local/connector:/etc/kafka-connect/jars
    container_name: connect
  control-center:
    image: confluentinc/cp-enterprise-control-center
    hostname: control-center
    depends_on:
      - zookeeper
      - broker
      - schema_registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    container_name: controlcenter
  test-data-producer:
    build: test-data-producer/
    depends_on:
      - broker
      - zookeeper
    command: (starts the producer)
  ignite:
    image: "apacheignite/ignite:2.3.0"
    environment:
      - "CONFIG_URI=/usr/local/connector/test.xml"
    volumes:
      - /usr/local/connector:/usr/local/connector
    container_name: ignite
--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/