Thank you for having a look at this. I agree that the only way to really
gauge load is to look at lag. But the connector tasks should not crash and
die because of load. I will raise this with SF.
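
(For anyone who lands here later: I'm gauging lag with the standard
consumer-groups tool; sink tasks consume under the group
connect-<connector name>, so something like the following shows
per-partition lag. Broker address and connector name are placeholders.)

    kafka-consumer-groups.sh --bootstrap-server broker:9092 \
      --describe --group connect-my-snowflake-sink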

On Wed, Jul 3, 2024 at 7:14 PM Greg Harris <greg.har...@aiven.io.invalid> wrote:

> Hey Burton,
>
> Thanks for your question and bug report.
>
> The exception you included does not indicate that your connectors are
> overloaded. The primary way of diagnosing an overloaded connector is the
> consumer lag metric, and if you're seeing acceptable lag, that should
> indicate that your connectors are capable of handling the load.
> I would say that your workload is _unhealthy_ though, because it should be
> able to operate without throwing this particular exception. I found one
> previous report of this exception [1] but no action was taken. Instead, the
> recommendation was to change the task implementation to perform offset
> loading only during open().
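>
> As an illustration of that recommendation (a minimal sketch of my own, not
> the Snowflake code; loadCommittedOffset() is a hypothetical lookup against
> the destination system):
>
>     import java.util.Collection;
>     import java.util.Map;
>     import org.apache.kafka.common.TopicPartition;
>     import org.apache.kafka.connect.sink.SinkRecord;
>     import org.apache.kafka.connect.sink.SinkTask;
>
>     public class OpenOnlyOffsetSinkTask extends SinkTask {
>         @Override
>         public void open(Collection<TopicPartition> partitions) {
>             // open() runs while these partitions are assigned, so a
>             // context.offset() call here can never hit a revoked partition.
>             for (TopicPartition tp : partitions) {
>                 Long committed = loadCommittedOffset(tp); // hypothetical
>                 if (committed != null) {
>                     context.offset(tp, committed);
>                 }
>             }
>         }
>
>         @Override
>         public void put(Collection<SinkRecord> records) {
>             // write records to the destination; never call context.offset() here
>         }
>
>         @Override
>         public void start(Map<String, String> props) {}
>
>         @Override
>         public void stop() {}
>
>         @Override
>         public String version() { return "0"; }
>
>         private Long loadCommittedOffset(TopicPartition tp) {
>             return null; // placeholder for an external offset lookup
>         }
>     }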
>
> It looks like this depends on the task implementation: if the consumer
> rebalances and the task loses its assignment, and the task later calls
> SinkTaskContext#offset() for the now-revoked partition, that call triggers
> this exception.
> I'm not familiar with the Snowflake task, but on a cursory inspection it
> looks like it buffers records [2] across poll() calls, and may call
> SinkTaskContext#offset() in a later poll [3]. They appear to have a close()
> method [4] that could be used to prevent SinkTaskContext#offset() from being
> called, but I'm not sure why it isn't effective.
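>
> For what it's worth, the guard I'd expect would track the live assignment
> through open()/close() and skip any deferred rewind for a partition that
> has been revoked, along these lines (illustrative names, not Snowflake's
> implementation):
>
>     import java.util.Collection;
>     import java.util.Set;
>     import java.util.concurrent.ConcurrentHashMap;
>     import org.apache.kafka.common.TopicPartition;
>     import org.apache.kafka.connect.sink.SinkTask;
>
>     public abstract class AssignmentGuardedSinkTask extends SinkTask {
>         private final Set<TopicPartition> assigned = ConcurrentHashMap.newKeySet();
>
>         @Override
>         public void open(Collection<TopicPartition> partitions) {
>             assigned.addAll(partitions);
>         }
>
>         @Override
>         public void close(Collection<TopicPartition> partitions) {
>             // called when partitions are revoked in a rebalance
>             assigned.removeAll(partitions);
>         }
>
>         // Buffered/deferred paths should call this instead of context.offset().
>         protected void safeRewind(TopicPartition tp, long offset) {
>             if (assigned.contains(tp)) {
>                 context.offset(tp, offset);
>             }
>             // else: revoked; the new owner re-loads the offset in its open()
>         }
>     }
>
> Note this only narrows the window, since the assignment can still change
> between the check and the framework applying the rewind.
>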
> You should contact the Snowflake connector maintainers and let them know
> that they are exposed to this exception.
>
> I'll re-open this issue on the framework side to see if we can find a
> solution to fix this for other connectors.
>
> Thanks,
> Greg
>
> [1] https://issues.apache.org/jira/browse/KAFKA-10370
> [2] https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L380-L383
> [3] https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java#L908
> [4] https://github.com/snowflakedb/snowflake-kafka-connector/blob/b08ee569de6b943f8f624cd4421f7a6c7a10d532/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java#L442
>
> On Wed, Jul 3, 2024 at 12:25 PM Burton Williams <burton.b.willi...@gmail.com> wrote:
>
> > Hi,
> >
> > I have 100+ sink connectors running against 100+ topics, each with
> > roughly 3 partitions per topic. They are running on K8s on 10 pods, each
> > with 6 CPUs and 32 GiB of memory. The connector in question is
> > Snowflake's sink connector v2.2.0. This worked in the mini-batch
> > SNOWPIPE mode, but once I switched over to SNOWPIPE_STREAMING it no
> > longer works. Tasks are failing with the exception:
> >
> > State:      FAILED
> > Worker ID:  10.136.83.73:8080
> > Trace:      java.lang.IllegalStateException: No current assignment for partition bigpicture.bulk_change-0
> >     at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:369)
> >     at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:386)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1637)
> >     at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:642)
> >     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:327)
> >     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:238)
> >     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:207)
> >     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
> >     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
> >     at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
> >     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> >     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> >     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> >     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> >     at java.base/java.lang.Thread.run(Thread.java:829)
> >
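> > For reference, the switch is the ingestion-method property in the
> > connector config (property name as documented for the Snowflake
> > connector; other settings omitted):
> >
> >     connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
> >     # previously SNOWPIPE (the default mini-batch mode)
> >     snowflake.ingestion.method=SNOWPIPE_STREAMING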
> >
> > My questions are:
> >
> >    1. Are these connectors overloaded? Can Kafka Connect handle this
> >    level of load?
> >    2. Say it can, which I've seen it do, could this be caused by
> >    underlying rebalancing? If so, what would you recommend I do to
> >    mitigate it?
> >
> >
> > Thanks
> >
> > -BW
> >
>
