Jordan Moore created BEAM-13705:
-----------------------------------

             Summary: OOM / memory-leak in KafkaIO.read using Confluent KafkaAvroDeserializer with SpecificRecord
                 Key: BEAM-13705
                 URL: https://issues.apache.org/jira/browse/BEAM-13705
             Project: Beam
          Issue Type: Bug
          Components: io-java-kafka
    Affects Versions: 2.35.0
            Reporter: Jordan Moore
         Attachments: Screen Shot 2022-01-19 at 5.30.51 PM.png
*Details* - Trying to use a generated Avro SpecificRecord subclass with KafkaIO.read (I was able to use KafkaIO.write with it just fine).

*Problem* - An OOM happens while constructing the deserializer with SpecificRecord, but not with GenericRecord. I am also unable to use my generated class directly, because I get errors saying it cannot be cast to a GenericRecord (even though it extends/implements GenericRecord through a chain of other classes). A minimal, constructor-only sketch of the suspected allocation follows the stack trace below.

{code}
2022-01-19 17:17:47,163 DEBUG [main] options.PipelineOptionsFactory$Builder (PipelineOptionsFactory.java:325) - Provided Arguments: {}
2022-01-19 17:17:47,345 DEBUG [main] sdk.Pipeline (Pipeline.java:158) - Creating Pipeline#817686795
2022-01-19 17:17:47,382 DEBUG [main] sdk.Pipeline (Pipeline.java:544) - Adding KafkaIO.Read [KafkaIO.TypedWithoutMetadata] to Pipeline#817686795
2022-01-19 17:17:47,383 DEBUG [main] sdk.Pipeline (Pipeline.java:544) - Adding KafkaIO.Read to Pipeline#817686795
2022-01-19 17:17:47,445 DEBUG [main] coders.CoderRegistry (CoderRegistry.java:635) - Coder for [B: ByteArrayCoder
java.lang.OutOfMemoryError: Java heap space
Dumping heap to /tmp/beam-dump ...
Heap dump file created [1086964638 bytes in 1.315 secs]
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap$HashEntry.newArray(BoundedConcurrentHashMap.java:247)
    at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap$Segment.<init>(BoundedConcurrentHashMap.java:1200)
    at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1637)
    at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1670)
    at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1654)
    at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1683)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:181)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:170)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:136)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:98)
    at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.lambda$of$282520f2$1(ConfluentSchemaRegistryDeserializerProvider.java:93)
    at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider$$Lambda$70/1932332324.apply(Unknown Source)
    at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaRegistryClient(ConfluentSchemaRegistryDeserializerProvider.java:134)
    at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaMetadata(ConfluentSchemaRegistryDeserializerProvider.java:126)
    at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getAvroSchema(ConfluentSchemaRegistryDeserializerProvider.java:120)
    at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getCoder(ConfluentSchemaRegistryDeserializerProvider.java:116)
    at org.apache.beam.sdk.io.kafka.KafkaIO$Read.getValueCoder(KafkaIO.java:1476)
    at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:1256)
    at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:605)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
    at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.expand(KafkaIO.java:1555)
    at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.expand(KafkaIO.java:1529)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:177)
    at cricket.jmoore.jmx.Main.main(Main.java:98)
{code}
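The trace shows the failure happens before a single record is read: the OOM is raised while the CachedSchemaRegistryClient constructor pre-sizes an internal BoundedConcurrentHashMap. A hedged sketch of that suspicion, assuming (not verified against the Beam source) that Beam passes a very large cache capacity such as Integer.MAX_VALUE; if that assumption holds, the constructor alone should reproduce the allocation, with no broker or registry running:

{code}
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

public class CacheCapacityRepro {
    public static void main(String[] args) {
        // Hypothetical repro (an assumption, not taken from the report): if the
        // capacity argument is huge, the internal BoundedConcurrentHashMap
        // pre-allocates its segment arrays eagerly (see HashEntry.newArray in
        // the trace) and exhausts the heap. No network calls happen here; the
        // constructor only sets up internal caches.
        new CachedSchemaRegistryClient("http://localhost:8081", Integer.MAX_VALUE);
    }
}
{code}

If this does reproduce, downgrading Confluent may only mask the problem; the fix would be for Beam to request a bounded cache size, or for the Confluent map to allocate lazily.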
A small example, run against Kafka and Confluent Schema Registry locally (a possible GenericRecord-based workaround is sketched at the end of this report):

{code}
public static void main(String[] args) throws Exception {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Pipeline p = getWritePipeline(options);
    Pipeline p = Pipeline.create(options);

    final String topic = "foobar-2";
    final SubjectNameStrategy subjectStrategy = new TopicNameStrategy();
    final String valueSubject = subjectStrategy.subjectName(topic, false, null); // schema not used

    final ConfluentSchemaRegistryDeserializerProvider<SpecificRecord> valueProvider =
        ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", valueSubject, null,
            // TODO: This doesn't seem to work to get the SpecificRecord subclass in the apply function below
            ImmutableMap.of(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true));

    p
        .apply(KafkaIO.<byte[], SpecificRecord>read()
            .withBootstrapServers("localhost:9092")
            .withTopic(topic)
            .withKeyDeserializer(ByteArrayDeserializer.class) // Don't have any keys, but this is required
            .withValueDeserializer(valueProvider)
            .withConsumerConfigUpdates(ImmutableMap.of(
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase(Locale.ROOT),
                ConsumerConfig.GROUP_ID_CONFIG, "beam-" + UUID.randomUUID()))
            .withoutMetadata()
        ).apply(Values.create())
        // TODO: How to get the SpecificRecord subclass?
        .apply(MapElements.via(new SimpleFunction<SpecificRecord, Void>() {
            @Override
            public Void apply(SpecificRecord input) {
                log.info("{}", input);
                return null;
            }
        }));

    p.run().waitUntilFinish();
}
{code}

The Avro schema that I am using, which generates a Product.java class that I would like to use in place of SpecificRecord above:

{code}
{"type":"record","name":"Product","namespace":"cricket.jmoore.avro","fields":[{"name":"name","type":"string"}]}
{code}

*Beam Version*: 2.35.0
*Confluent Version*: 7.0.1 (the error seems to come from here... I will try downgrading this)

Dependencies:

{code}
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.1</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version> <!-- 2.35.0 -->
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
</dependency>
{code}
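Since the GenericRecord path does work, here is an unverified workaround sketch: read values as GenericRecord and convert each element to the generated class by hand. It slots into the same main method as above (reusing p, topic, valueSubject, and log) and additionally assumes org.apache.avro.generic.GenericRecord and org.apache.avro.specific.SpecificData imports, plus a Product class generated from the schema above; whether SpecificData.deepCopy handles any differences between the registry schema and Product.SCHEMA$ is untested.

{code}
// Workaround sketch (unverified, not part of my failing setup above): read
// GenericRecord, then convert to the generated Product class per element.
final ConfluentSchemaRegistryDeserializerProvider<GenericRecord> genericProvider =
    ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", valueSubject);

p
    .apply(KafkaIO.<byte[], GenericRecord>read()
        .withBootstrapServers("localhost:9092")
        .withTopic(topic)
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(genericProvider)
        .withoutMetadata()
    ).apply(Values.create())
    .apply(MapElements.via(new SimpleFunction<GenericRecord, Void>() {
        @Override
        public Void apply(GenericRecord input) {
            // SpecificData.deepCopy rebuilds the record against Product.SCHEMA$,
            // producing the generated SpecificRecord subclass when the schemas
            // line up.
            Product product = (Product) SpecificData.get().deepCopy(Product.SCHEMA$, input);
            log.info("{}", product);
            return null;
        }
    }));
{code}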