Hi Manu,
  if we take master of the connector, I forgot to ask you witch version are
you using, your error looks to me coming from:

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java#L193

This line is basically:

for (TopicPartition tp : assignment) {
topicPartitionWriters.get(tp).write();
}

it could as well be coming from:
https://github.com/confluentinc/kafka-connect-storage-cloud/blob/3.2.x/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java#L193

this is more reasonable according to your exception

this shows as well the same sort of problem.

it raises a null pointer exception, this makes me thing there is some
problem with the partition writers

My guess with this in mind is that the error is not coming from the
partitioner, but from something with the writers.

I hope this helps,

-- Pere

Missatge de Manu Jacob <manu.ja...@sas.com> del dia dg., 3 de febr. 2019 a
les 19:09:

> Hi Pere,
>
> Following is my configuration. In this test, I want to flush 1000 records
> and/or 5 minutes.
>
>
>
> {"connector.class":"io.confluent.connect.s3.S3SinkConnector","s3.region":"us-east-1","topics.dir":"test_topics","flush.size":"1000","schema.compatibility":"NONE","topics":"sinmaj-test","tasks.max":"1","s3.part.size":"5242880","locale":"en","format.class":"io.confluent.connect.s3.format.json.JsonFormat","task.class":"io.confluent.connect.s3.S3SinkTask","partitioner.class":"io.confluent.connect.storage.partitioner.FieldPartitioner","schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator","name":"s3-test","storage.class":"io.confluent.connect.s3.storage.S3Storage","
> s3.bucket.name":"test-bucket","rotate.schedule.interval.ms":"300000","
> partition.field.name":"externalTenantId",
> "timestamp.extractor":"Wallclock"}
>
> I get the following error. I am using version 3.3.0
>
> 2019-02-03 17:59:01,261] ERROR Commit of WorkerSinkTask{id=s3-test-0}
> offsets threw an unexpected exception:
> (org.apache.kafka.connect.runtime.WorkerSinkTask:205)
> java.lang.NullPointerException
>         at
> io.confluent.connect.s3.S3SinkTask.preCommit(S3SinkTask.java:193)
>         at
> org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305)
>         at
> org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:486)
>         at
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
>         at
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>         at
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> [2019-02-03 17:59:01,262] ERROR Task s3-test-0 threw an uncaught and
> unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
> java.lang.NullPointerException
>         at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:206)
>         at
> org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:323)
>         at
> org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:486)
>         at
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
>         at
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>         at
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> [2019-02-03 17:59:01,262] ERROR Task is being killed and will not recover
> until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)
>
> Thanks,
> -Manu
>
> -----Original Message-----
> From: Pere Urbón Bayes <pere.ur...@gmail.com>
> Sent: Sunday, February 03, 2019 12:50 PM
> To: users@kafka.apache.org
> Subject: Re: Kafka connect FieldPartitioner with scheduled rotation
>
> EXTERNAL
>
> Hi Manu,
>   can you share your s3 connector config as well the exception you are
> getting? with only this info, I do need more details to understand your
> issue. Keep in mind the option you are using "rotate.schedule.interval.ms"
> from the docs says:
>
> > This configuration is useful when you have to commit your data based
> > on
> current server time, for example at the beginning of every hour. The
> default value -1 means that this feature is disabled.
>
> Cheers
>
> Missatge de Manu Jacob <manu.ja...@sas.com> del dia dg., 3 de febr. 2019
> a les 16:47:
>
> > Hi,
> >
> > I want to use s3 connect by portioning with FieldPartitioner and
> > partition.field.name set to a non timestamp based field. I want to
> > commit and flush based on  both size and time. I am getting an
> > exception when I use the option "rotate.schedule.interval.ms". Is it
> > possible to rotate it with FieldPartitioner? I tried to set the
> > timestamp.extractor (with record and wallclock) but it looks like it
> > is honored only for time based partitioner.
> >
> > -Manu
> >
> >
> >
>
> --
> Pere Urbon-Bayes
> Software Architect
> http://www.purbon.com
> https://twitter.com/purbon
> https://www.linkedin.com/in/purbon/
>


-- 
Pere Urbon-Bayes
Software Architect
http://www.purbon.com
https://twitter.com/purbon
https://www.linkedin.com/in/purbon/

Reply via email to