Thank you for having a look at this. I agree that the only way to really gauge load is to look at lag. But the connector tasks should not crash and die because of load. I will raise this with SF.
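For anyone following along, here is a minimal sketch of reading that per-partition lag for a sink connector's consumer group with the Kafka Admin client. The connector name and bootstrap servers are placeholders, and it assumes the default Connect sink group naming of "connect-<connector name>" (adjust if consumer.override.group.id is set on the connector):

import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SinkConnectorLagCheck {
    public static void main(String[] args) throws Exception {
        // Assumption: default Connect sink group naming, "connect-<connector name>".
        String groupId = "connect-my-snowflake-sink";      // hypothetical connector name
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // adjust for your cluster

        try (Admin admin = Admin.create(props)) {
            // Offsets the connector's consumer group has committed so far.
            Map<TopicPartition, OffsetAndMetadata> committed = admin
                    .listConsumerGroupOffsets(groupId)
                    .partitionsToOffsetAndMetadata()
                    .get();

            // Current log-end offsets for the same partitions.
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latest = admin
                    .listOffsets(committed.keySet().stream()
                            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest())))
                    .all()
                    .get();

            // Lag per partition = log-end offset minus committed offset.
            committed.forEach((tp, om) -> {
                if (om == null) {
                    return; // no committed offset yet for this partition
                }
                long lag = latest.get(tp).offset() - om.offset();
                System.out.printf("%s lag=%d%n", tp, lag);
            });
        }
    }
}

The consumer's own records-lag / records-lag-max metrics over JMX report the same thing continuously, which is usually more practical for alerting than a one-off check like this.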
On Wed, Jul 3, 2024 at 7:14 PM Greg Harris <greg.har...@aiven.io.invalid> wrote:

> Hey Burton,
>
> Thanks for your question and bug report.
>
> The exception you included does not indicate that your connectors are
> overloaded. The primary way of diagnosing an overloaded connector is the
> consumer lag metric, and if you're seeing acceptable lag, that should
> indicate that your connectors are capable of handling the load.
> I would say that your workload is _unhealthy_ though, because it should be
> able to operate without throwing this particular exception. I found one
> previous report of this exception [1] but no action was taken. Instead, the
> recommendation was to change the task implementation to perform offset
> loading only during open().
>
> It looks like this depends on the task implementation: if the consumer
> rebalances and the task loses its assignment, and the task later calls
> SinkTaskContext#offset() with the now-revoked partition, it would cause
> this exception.
> I'm not familiar with the Snowflake task, but upon a cursory inspection, it
> looks like it buffers records [2] across poll() calls, and may call
> SinkTaskContext#offset() in a later poll [3]. They appear to have a close()
> method [4] that could be used to prevent SinkTaskContext#offset() from
> being called, but I'm not sure why it isn't effective.
> You should contact the Snowflake Connector maintainers and let them know
> that they are exposed to this exception.
>
> I'll re-open this issue on the framework side to see if we can find a
> solution to fix this for other connectors.
>
> Thanks,
> Greg
>
> [1] https://issues.apache.org/jira/browse/KAFKA-10370
> [2] https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L380-L383
> [3] https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L908
> [4] https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java#L442
>
> On Wed, Jul 3, 2024 at 12:25 PM Burton Williams <burton.b.willi...@gmail.com> wrote:
>
> > Hi,
> >
> > I have 100+ sink connectors running with 100+ topics, each with roughly
> > 3 partitions per topic. They are running on K8s on 10 pods with 6 CPUs
> > and 32 GiB of memory. The connector in question is Snowflake's sink
> > connector v2.2.0. This worked in the mini-batch mode SNOWPIPE, but once
> > I switched over to SNOWPIPE_STREAMING, it no longer works.
> > Tasks are failing with the exception:
> >
> > State: FAILED
> >
> > Worker ID: 10.136.83.73:8080
> >
> > Trace: java.lang.IllegalStateException: No current assignment for partition bigpicture.bulk_change-0
> > at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369)
> > at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:386)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1637)
> > at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:642)
> > at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:327)
> > at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:238)
> > at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:207)
> > at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
> > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
> > at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
> > at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> > at java.base/java.lang.Thread.run(Thread.java:829)
> >
> > My questions are:
> >
> > 1. Are these connectors overloaded? Can Kafka Connect handle this level
> > of load?
> > 2. Say it can, which I've seen it do, could it be that this is caused by
> > underlying rebalancing? If so, what would you recommend I do to mitigate?
> >
> > Thanks
> >
> > -BW
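To make the failure mode concrete, here is a minimal sketch of the pattern Greg describes above: resolve restart offsets inside open(), and use close() so that SinkTaskContext#offset() is never called for a partition that a rebalance has already revoked. This is a hypothetical task skeleton, not the Snowflake implementation; committedOffsetInSink() is an invented placeholder for however the sink looks up its durably written offsets.

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical skeleton, not the Snowflake connector.
public class GuardedOffsetSinkTask extends SinkTask {
    // Partitions currently assigned to this task, maintained via open()/close().
    private final Set<TopicPartition> assigned = new HashSet<>();

    @Override
    public void open(Collection<TopicPartition> partitions) {
        assigned.addAll(partitions);
        // Resolve restart offsets here, while the partitions are guaranteed to be
        // assigned, and request the rewind immediately (the recommendation in KAFKA-10370).
        Map<TopicPartition, Long> restartOffsets = new HashMap<>();
        for (TopicPartition tp : partitions) {
            Long offset = committedOffsetInSink(tp); // hypothetical: query the target system
            if (offset != null) {
                restartOffsets.put(tp, offset);
            }
        }
        if (!restartOffsets.isEmpty()) {
            context.offset(restartOffsets);
        }
    }

    @Override
    public void close(Collection<TopicPartition> partitions) {
        // Partitions revoked by a rebalance: forget them, and drop any state that a
        // later poll() cycle might use to call context.offset() against them.
        assigned.removeAll(partitions);
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Buffer/flush records here. If a rewind ever has to happen outside open(),
        // guard it with the current assignment, e.g.:
        //   if (assigned.contains(tp)) context.offset(tp, offset);
    }

    @Override
    public void start(Map<String, String> props) { }

    @Override
    public void stop() { }

    @Override
    public String version() { return "sketch"; }

    // Hypothetical helper: last offset durably recorded in the sink system, or null.
    private Long committedOffsetInSink(TopicPartition tp) {
        return null;
    }
}

The unguarded variant of that call, issued after the partition has been revoked, is exactly what surfaces as the "No current assignment for partition ..." seek failure in WorkerSinkTask.rewind() in the trace above.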