Hi!

I have recently started a project with the following scenario:
My Kafka is receiving data from two sources (both String key, Byte[] value)
that I want to stream into two different Ignite caches.
The entire setup is to be built with Docker (Compose); I will post my
docker-compose.yml at the end.

Now, let's start with the questions and issues:

1. After starting up the docker-compose setup I can access the Confluent
Control Center, and it shows both topics being filled. With the REST call:

{"name": "test-connector-one", 
 "config":       

{"connector.class":"org.apache.ignite.stream.kafka.connect.IgniteSinkConnector",
"tasks.max":"1",
"topics":"TestTopic",
"cacheName":"TestCache",
"igniteCfg":"/etc/kafka-connect/jars/test.xml"}}

I start streaming data from Kafka's TestTopic to Ignite's TestCache. I
have set up test.xml so that IgniteSinkConnector will start a node in
client mode only. So far everything works perfectly fine: I can see the data
arriving in TestCache (I also used -scan in ignitevisorcmd; the data is
correct).

Now, if I do another REST call to start the second connector (same call,
different 'name', 'topics' and 'cacheName'), the connector appears to
stop all workers from test-connector-one (rebalancing?) while
apparently also stopping the local Ignite node (the one in client mode).
This results in neither connector-one nor connector-two starting, since they
both receive the error:

ERROR Task test-connector-one-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.IllegalStateException: Data streamer has been closed.
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:402)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addDataInternal(DataStreamerImpl.java:614)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:676)
    at org.apache.ignite.stream.kafka.connect.IgniteSinkTask.put(IgniteSinkTask.java:118)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2017-11-20 20:09:50,801] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask)


What's happening here? How is it 'supposed' to work? 
Is it even possible to consume both topics at once in one connector?
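As far as I can tell, a single connector could subscribe to both topics via a comma-separated `topics` list, but since the sink config only has one `cacheName`, both topics would then land in the same cache, which is not what I want. What I mean is something like this (the topic/cache names are just my examples):

```
{"name": "test-connector-combined",
 "config": {
   "connector.class": "org.apache.ignite.stream.kafka.connect.IgniteSinkConnector",
   "tasks.max": "1",
   "topics": "TestTopic,TestTopicTwo",
   "cacheName": "TestCache",
   "igniteCfg": "/etc/kafka-connect/jars/test.xml"}}
```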

2. The next question is more general:
The byte-array value stores information that I want to extract and work
with in Ignite; I can deserialize it as a CustomObject.
Usually, all or most of the data with the same key string will be needed for
a task.
Are you 'supposed' to deserialize it during Kafka Connect and actually
store the CustomObject in the cache?
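To clarify what I mean by deserializing: roughly something like this plain-Java sketch (the CustomObject field names and the byte layout here are made up for illustration; my real layout differs):

```java
import java.nio.ByteBuffer;

// Hypothetical payload type; the real field layout is defined by my producer.
class CustomObject {
    final long timestamp;
    final double value;

    CustomObject(long timestamp, double value) {
        this.timestamp = timestamp;
        this.value = value;
    }

    // Decode the raw Kafka value (byte[]) into the object.
    static CustomObject fromBytes(byte[] raw) {
        ByteBuffer buf = ByteBuffer.wrap(raw);
        return new CustomObject(buf.getLong(), buf.getDouble());
    }

    // Encode back to bytes, e.g. in the test producer.
    byte[] toBytes() {
        return ByteBuffer.allocate(Long.BYTES + Double.BYTES)
                .putLong(timestamp)
                .putDouble(value)
                .array();
    }
}
```

The question is whether decoding like this should happen in the connector (so the cache holds CustomObject) or later, inside Ignite.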

3. I am having issues doing queries on the data in the cache. Let's say I
want to have all entries with the key (a String) "Test1":

Ignition.setClientMode(true);

        try (Ignite ignite = Ignition.start("test.xml")) {
            IgniteCache<String, byte[]> cache = ignite.cache("TestCache");

            ScanQuery<String, byte[]> scan = new ScanQuery<>(
(*)                 new IgniteBiPredicate<String, byte[]>() {
                        @Override public boolean apply(String key, byte[] value) {
                            return key.equals("Test1");
                        }
                    }
            );

            Iterable<?> it = cache.query(scan).getAll();

            for (Object next : it) {
                System.out.println(next);
            }
        }
 
which results in the error:

>Exception in thread "main" javax.cache.CacheException: class
>org.apache.ignite.IgniteCheckedException: Query execution failed:

... a lot of stack trace that doesn't look too interesting (to me) ...
until:

>Caused by: java.lang.ClassCastException: [B cannot be cast to
>[Ljava.lang.Byte;

at the line I marked with (*).

The value is stored as byte[] as well, so I am not sure what I am doing
wrong.
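For what it's worth, I do understand the message itself: `[B` is the JVM name for `byte[]`, while `[Ljava.lang.Byte;` is `Byte[]`, and the two array types are not cast-compatible. A tiny standalone demonstration:

```java
public class ArrayTypeDemo {
    public static void main(String[] args) {
        byte[] primitive = {1, 2, 3};   // runtime class name: [B
        Byte[] boxed = {1, 2, 3};       // runtime class name: [Ljava.lang.Byte;

        System.out.println(primitive.getClass().getName()); // prints [B
        System.out.println(boxed.getClass().getName());     // prints [Ljava.lang.Byte;

        // The two array types are unrelated; this cast fails at runtime:
        Object o = primitive;
        try {
            Byte[] wrong = (Byte[]) o;
            System.out.println(wrong.length);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

So apparently something in the query path expects `Byte[]` while the cache holds `byte[]` (or vice versa), but I don't see where.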

I am really in need of some more knowledge about why this stuff is not
working; I was not very successful at finding it anywhere.
For question 1) I couldn't find a single up-to-date example, and I have no
clue how you'd use the single/multipleTupleExtractor, for instance
(especially in a Docker environment). Debugging and reading through
thousands of log lines is really tedious.
Question 2) is probably just something I lack experience in; it would be
really helpful to get advice so I don't need to try all the possibilities.
For question 3) I might be missing something obvious, but I haven't been
able to figure it out for quite some time now.

I would be super happy if someone can give me more information!

Best regards,
Svonn



P.S.: Here's my docker-compose.yml; I am using a volume to provide the
config files (test.xml) and .jars for the container:

---
version: '3.3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    container_name: zookeeper

  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:9092'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    container_name: broker

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
    container_name: schemaregistry

  connect:
    image: confluentinc/cp-kafka-connect
    hostname: connect
    depends_on:
      - zookeeper
      - broker
      - schema_registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    volumes:
      - /usr/local/connector:/etc/kafka-connect/jars
    container_name: connect


  control-center:
    image: confluentinc/cp-enterprise-control-center
    hostname: control-center
    depends_on:
      - zookeeper
      - broker
      - schema_registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    container_name: controlcenter

  test-data-producer:
    build: test-data-producer/
    depends_on:
      - broker
      - zookeeper
    command: (starts the producer)

  ignite:
    image: "apacheignite/ignite:2.3.0"
    environment:
      - "CONFIG_URI=/usr/local/connector/test.xml"
    volumes:
      - /usr/local/connector:/usr/local/connector
    container_name: ignite


--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
