Hi Pere,

I am using 3.3.0. By the way, I was able to resolve this issue by setting the "timezone" property on the connector. It is a bit strange, as I thought that value was used only by the time-based partitioner (where it could have defaulted to UTC) and was not a requirement for scheduled rotation. I am not sure if this is a defect or a genuine requirement.
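For reference, the fix described above amounts to adding a "timezone" entry to the connector configuration shown later in this thread. A minimal sketch of the relevant properties (the "UTC" value is illustrative; only the property names come from this thread and the Confluent S3 connector docs):

```json
{
  "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
  "partition.field.name": "externalTenantId",
  "rotate.schedule.interval.ms": "300000",
  "timestamp.extractor": "Wallclock",
  "timezone": "UTC",
  "locale": "en"
}
```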
Thanks,
-Manu

From: Pere Urbón Bayes <pere.ur...@gmail.com>
Sent: Monday, February 04, 2019 11:41 PM
To: Manu Jacob <manu.ja...@sas.com>
Cc: users@kafka.apache.org
Subject: Re: Kafka connect FieldPartitioner with scheduled rotation

EXTERNAL

Hi Manu,
I forgot to ask which version you are using. If we take master of the connector, your error looks to me like it comes from:
https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java#L193

This line is basically:

    for (TopicPartition tp : assignment) {
      topicPartitionWriters.get(tp).write();
    }

It could as well be coming from:
https://github.com/confluentinc/kafka-connect-storage-cloud/blob/3.2.x/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkTask.java#L193

which is more consistent with your exception and shows the same sort of problem: it raises a NullPointerException, which makes me think there is some problem with the partition writers. My guess, with this in mind, is that the error is not coming from the partitioner, but from something with the writers.

I hope this helps,

-- Pere

On Sun, Feb 3, 2019 at 19:09, Manu Jacob <manu.ja...@sas.com> wrote:

Hi Pere,
Following is my configuration. In this test, I want to flush after 1000 records and/or every 5 minutes.
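The failure mode described above can be reproduced in isolation: if a partition in the task's assignment has no entry in the writers map, `Map.get` silently returns null and the subsequent `write()` call throws. A minimal standalone sketch (class and field names are illustrative, not the connector's actual code):

```java
import java.util.HashMap;
import java.util.Map;

public class PreCommitNpeDemo {

    // Stand-in for the connector's per-partition writer (illustrative only).
    static class TopicPartitionWriter {
        void write() { /* flush buffered records */ }
    }

    public static void main(String[] args) {
        Map<String, TopicPartitionWriter> topicPartitionWriters = new HashMap<>();
        topicPartitionWriters.put("sinmaj-test-0", new TopicPartitionWriter());

        // A partition that is in the task's assignment but has no writer yet:
        TopicPartitionWriter w = topicPartitionWriters.get("sinmaj-test-1");
        System.out.println(w == null); // the lookup silently returns null

        // w.write(); // would throw java.lang.NullPointerException here,
        //            // matching the trace at S3SinkTask.preCommit(...)
    }
}
```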
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "s3.region": "us-east-1",
  "topics.dir": "test_topics",
  "flush.size": "1000",
  "schema.compatibility": "NONE",
  "topics": "sinmaj-test",
  "tasks.max": "1",
  "s3.part.size": "5242880",
  "locale": "en",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "task.class": "io.confluent.connect.s3.S3SinkTask",
  "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "name": "s3-test",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "s3.bucket.name": "test-bucket",
  "rotate.schedule.interval.ms": "300000",
  "partition.field.name": "externalTenantId",
  "timestamp.extractor": "Wallclock"
}

I get the following error. I am using version 3.3.0.

[2019-02-03 17:59:01,261] ERROR Commit of WorkerSinkTask{id=s3-test-0} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:205)
java.lang.NullPointerException
        at io.confluent.connect.s3.S3SinkTask.preCommit(S3SinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:486)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-02-03 17:59:01,262] ERROR Task s3-test-0 threw an uncaught and unrecoverable exception
(org.apache.kafka.connect.runtime.WorkerTask:148)
java.lang.NullPointerException
        at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:206)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:323)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:486)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:152)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-02-03 17:59:01,262] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)

Thanks,
-Manu

-----Original Message-----
From: Pere Urbón Bayes <pere.ur...@gmail.com>
Sent: Sunday, February 03, 2019 12:50 PM
To: users@kafka.apache.org
Subject: Re: Kafka connect FieldPartitioner with scheduled rotation

EXTERNAL

Hi Manu,
can you share your S3 connector config, as well as the exception you are getting? With only this information I cannot understand your issue; I need more details.

Keep in mind that the option you are using, "rotate.schedule.interval.ms", is described in the docs as:

> This configuration is useful when you have to commit your data based
> on current server time, for example at the beginning of every hour.
> The default value -1 means that this feature is disabled.

Cheers

On Sun, Feb 3, 2019 at 16:47, Manu Jacob <manu.ja...@sas.com> wrote:

> Hi,
>
> I want to use S3 connect, partitioning with FieldPartitioner and
> partition.field.name set to a non-timestamp field. I want to
> commit and flush based on both size and time. I am getting an
> exception when I use the option "rotate.schedule.interval.ms". Is it
> possible to rotate it with FieldPartitioner? I tried to set the
> timestamp.extractor (with Record and Wallclock) but it looks like it
> is honored only by the time-based partitioner.
>
> -Manu

--
Pere Urbon-Bayes
Software Architect
http://www.purbon.com
https://twitter.com/purbon
https://www.linkedin.com/in/purbon/
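For context on what FieldPartitioner does with partition.field.name: it reads the named field from each record's value and uses it to build the partition segment of the S3 object key. A simplified, self-contained sketch of that idea (this is not the Confluent implementation, which operates on Connect Struct values; names here are illustrative):

```java
import java.util.Map;

public class FieldPartitionDemo {

    // Simplified field-based partition encoding: build the
    // "<field>=<value>" path segment that ends up in the object key.
    static String encodePartition(Map<String, Object> recordValue, String fieldName) {
        return fieldName + "=" + recordValue.get(fieldName);
    }

    public static void main(String[] args) {
        Map<String, Object> record = Map.of(
                "externalTenantId", "tenant-42", // field named in the config above
                "amount", 10);
        System.out.println(encodePartition(record, "externalTenantId"));
        // prints "externalTenantId=tenant-42"
    }
}
```

In this sketch a record missing the field would simply encode as "externalTenantId=null"; how the real partitioner handles that case is not covered by this thread.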