Hi Pere,

I am using 3.3.0. By the way, I was able to resolve this issue by setting the 
“timezone” property on the connector. This is a bit strange, as I thought that 
value was used only by the time-based partitioner (which could have defaulted 
to UTC) and was not a requirement for scheduled rotation. I am not sure whether 
it is a defect or a genuine requirement.
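
A small pre-flight check along these lines could catch the missing property before a connector is deployed. This is only a sketch in Java under assumptions: the class "RotationConfigCheck" and the method "missingTimezone" are hypothetical names, not part of the connector, and the rule it encodes reflects only what was observed in this thread (scheduled rotation failing until "timezone" was set), not documented connector behavior.

```java
import java.util.Map;

// Hypothetical helper, not part of kafka-connect-s3.
class RotationConfigCheck {
    // Returns true when scheduled rotation is enabled but no timezone is set.
    // Per the docs quoted in this thread, -1 (the default) disables the feature.
    static boolean missingTimezone(Map<String, String> config) {
        String interval = config.get("rotate.schedule.interval.ms");
        boolean scheduled = interval != null && !interval.trim().equals("-1");

        String timezone = config.get("timezone");
        boolean hasTimezone = timezone != null && !timezone.trim().isEmpty();

        return scheduled && !hasTimezone;
    }
}
```

Called with the connector properties as a Map<String, String>, it returns true for the configuration posted earlier in the thread and false once a "timezone" entry (for example "UTC", an assumed value) is added.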

Thanks,
-Manu

From: Pere Urbón Bayes <pere.ur...@gmail.com>
Sent: Monday, February 04, 2019 11:41 PM
To: Manu Jacob <manu.ja...@sas.com>
Cc: users@kafka.apache.org
Subject: Re: Kafka connect FieldPartitioner with scheduled rotation


Hi Manu,
  I forgot to ask which version you are using. If we take the master branch of 
the connector, your error looks to me like it is coming from:

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java#L193

This line is basically:

    for (TopicPartition tp : assignment) {
      topicPartitionWriters.get(tp).write();
    }

It could just as well be coming from:
https://github.com/confluentinc/kafka-connect-storage-cloud/blob/3.2.x/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java#L193

This is more plausible given your exception, and it shows the same sort of 
problem.

It raises a NullPointerException, which makes me think there is some problem 
with the partition writers.

My guess, with this in mind, is that the error is not coming from the 
partitioner but from something in the writers.
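
To illustrate the kind of failure suspected here, the following is a minimal Java sketch, not the connector's actual code. It assumes the writers live in a map keyed by partition, as in the loop quoted above; "PartitionWriter", "WriterLookupSketch", "missingWriters" and "writeAll" are hypothetical names, and partitions are represented as plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a per-partition writer; not the real connector class.
interface PartitionWriter {
    void write();
}

class WriterLookupSketch {
    // Lists the assigned partitions for which no writer is registered.
    static List<String> missingWriters(List<String> assignment,
                                       Map<String, PartitionWriter> writers) {
        List<String> missing = new ArrayList<>();
        for (String tp : assignment) {
            if (writers.get(tp) == null) {
                missing.add(tp);
            }
        }
        return missing;
    }

    // Same shape as the quoted loop: Map.get returns null for a partition
    // without a writer, so calling write() on it throws NullPointerException.
    static void writeAll(List<String> assignment,
                         Map<String, PartitionWriter> writers) {
        for (String tp : assignment) {
            writers.get(tp).write();
        }
    }
}
```

If an assigned partition has no entry in the map (for instance because writer setup failed earlier), writeAll throws a NullPointerException at the lookup-and-call line, while missingWriters reports the partition instead.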

I hope this helps,

-- Pere

Message from Manu Jacob <manu.ja...@sas.com> on Sun, 3 Feb 2019 at 19:09:
Hi Pere,

The following is my configuration. In this test, I want to flush every 1000 
records and/or every 5 minutes.


{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "s3.region": "us-east-1",
  "topics.dir": "test_topics",
  "flush.size": "1000",
  "schema.compatibility": "NONE",
  "topics": "sinmaj-test",
  "tasks.max": "1",
  "s3.part.size": "5242880",
  "locale": "en",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "task.class": "io.confluent.connect.s3.S3SinkTask",
  "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "name": "s3-test",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "s3.bucket.name": "test-bucket",
  "rotate.schedule.interval.ms": "300000",
  "partition.field.name": "externalTenantId",
  "timestamp.extractor": "Wallclock"
}

I get the following error. I am using version 3.3.0.

[2019-02-03 17:59:01,261] ERROR Commit of WorkerSinkTask{id=s3-test-0} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:205)
java.lang.NullPointerException
        at io.confluent.connect.s3.S3SinkTask.preCommit(S3SinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:486)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-02-03 17:59:01,262] ERROR Task s3-test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
java.lang.NullPointerException
        at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:206)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:323)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:486)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-02-03 17:59:01,262] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)

Thanks,
-Manu

-----Original Message-----
From: Pere Urbón Bayes <pere.ur...@gmail.com>
Sent: Sunday, February 03, 2019 12:50 PM
To: users@kafka.apache.org
Subject: Re: Kafka connect FieldPartitioner with scheduled rotation


Hi Manu,
  Can you share your S3 connector config as well as the exception you are 
getting? With only this info, I need more details to understand your issue. 
Keep in mind that, for the option you are using, 
"rotate.schedule.interval.ms", the docs say:

> This configuration is useful when you have to commit your data based on
> current server time, for example at the beginning of every hour. The default
> value -1 means that this feature is disabled.

Cheers

Message from Manu Jacob <manu.ja...@sas.com> on Sun, 3 Feb 2019 at 16:47:

> Hi,
>
> I want to use the S3 connector, partitioning with FieldPartitioner and
> partition.field.name set to a non-timestamp-based field. I want to
> commit and flush based on both size and time. I am getting an
> exception when I use the option "rotate.schedule.interval.ms". Is it
> possible to rotate on a schedule with FieldPartitioner? I tried to set
> the timestamp.extractor (with record and wallclock), but it looks like
> it is honored only by the time-based partitioner.
>
> -Manu
>
>
>

--
Pere Urbon-Bayes
Software Architect
http://www.purbon.com
https://twitter.com/purbon
https://www.linkedin.com/in/purbon/

