[ https://issues.apache.org/jira/browse/BEAM-13705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jordan Moore updated BEAM-13705:
--------------------------------
    Resolution: Not A Bug
        Status: Resolved  (was: Triage Needed)

It's an error in the Confluent library. This alone throws an OOM:
{code}
final CachedSchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081",
        2147483647,
        ImmutableMap.of(
                KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true
        ));
{code}
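
For reference, a minimal workaround sketch, assuming the 2147483647 (Integer.MAX_VALUE) capacity above is what drives the up-front allocation inside BoundedConcurrentHashMap (the 1000 below is an arbitrary illustrative value, not a recommendation):
{code}
// Sketch only: the same client, but with a bounded cache capacity, so
// BoundedConcurrentHashMap does not try to pre-allocate segments sized
// for Integer.MAX_VALUE entries. Constructing the client this way does
// not exhaust the heap.
final CachedSchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081",
        1000, // bounded schema-cache capacity instead of Integer.MAX_VALUE
        ImmutableMap.of(
                KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true
        ));
{code}
Note that KafkaIO's ConfluentSchemaRegistryDeserializerProvider constructs its CachedSchemaRegistryClient internally (see the stack trace below), so from Beam the practical options are pinning a Confluent client version that tolerates the large capacity, or avoiding the provider.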

> OOM / memory-leak in KafkaIO.read using Confluent KafkaAvroDeserializer with SpecificRecord
> --------------------------------------------------------------------------------------------
>
>                 Key: BEAM-13705
>                 URL: https://issues.apache.org/jira/browse/BEAM-13705
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.35.0
>            Reporter: Jordan Moore
>            Priority: P3
>         Attachments: Screen Shot 2022-01-19 at 5.30.51 PM.png
>
> *Details* - Trying to use a generated Avro SpecificRecord subclass with KafkaIO.read (I was able to use KafkaIO.write fine with it).
>
> *Problem* - An OOM happens while constructing the deserializer with SpecificRecord, but not with GenericRecord. I am unable to use my generated class because I get errors saying it cannot be cast to a GenericRecord (even though it extends/implements it through a chain of other classes).
> {code}
> 2022-01-19 17:17:47,163 DEBUG [main] options.PipelineOptionsFactory$Builder (PipelineOptionsFactory.java:325) - Provided Arguments: {}
> 2022-01-19 17:17:47,345 DEBUG [main] sdk.Pipeline (Pipeline.java:158) - Creating Pipeline#817686795
> 2022-01-19 17:17:47,382 DEBUG [main] sdk.Pipeline (Pipeline.java:544) - Adding KafkaIO.Read [KafkaIO.TypedWithoutMetadata] to Pipeline#817686795
> 2022-01-19 17:17:47,383 DEBUG [main] sdk.Pipeline (Pipeline.java:544) - Adding KafkaIO.Read to Pipeline#817686795
> 2022-01-19 17:17:47,445 DEBUG [main] coders.CoderRegistry (CoderRegistry.java:635) - Coder for [B: ByteArrayCoder
> java.lang.OutOfMemoryError: Java heap space
> Dumping heap to /tmp/beam-dump ...
> Heap dump file created [1086964638 bytes in 1.315 secs]
> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
>     at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap$HashEntry.newArray(BoundedConcurrentHashMap.java:247)
>     at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap$Segment.<init>(BoundedConcurrentHashMap.java:1200)
>     at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1637)
>     at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1670)
>     at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1654)
>     at io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1683)
>     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:181)
>     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:170)
>     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:136)
>     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:98)
>     at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.lambda$of$282520f2$1(ConfluentSchemaRegistryDeserializerProvider.java:93)
>     at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider$$Lambda$70/1932332324.apply(Unknown Source)
>     at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaRegistryClient(ConfluentSchemaRegistryDeserializerProvider.java:134)
>     at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaMetadata(ConfluentSchemaRegistryDeserializerProvider.java:126)
>     at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getAvroSchema(ConfluentSchemaRegistryDeserializerProvider.java:120)
>     at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getCoder(ConfluentSchemaRegistryDeserializerProvider.java:116)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$Read.getValueCoder(KafkaIO.java:1476)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:1256)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:605)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
>     at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.expand(KafkaIO.java:1555)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.expand(KafkaIO.java:1529)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
>     at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
>     at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:177)
>     at cricket.jmoore.jmx.Main.main(Main.java:98)
> {code}
>
> Small example, with Kafka and Confluent Schema Registry running locally:
> {code}
> public static void main(String[] args) throws Exception {
>     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>     // Pipeline p = getWritePipeline(options);
>     Pipeline p = Pipeline.create(options);
>
>     final String topic = "foobar-2";
>     final SubjectNameStrategy subjectStrategy = new TopicNameStrategy();
>     final String valueSubject = subjectStrategy.subjectName(topic, false, null); // schema not used
>     final ConfluentSchemaRegistryDeserializerProvider<SpecificRecord> valueProvider =
>             ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", valueSubject, null,
>                     // TODO: This doesn't seem to work to get the SpecificRecord subclass in the apply function below
>                     ImmutableMap.of(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true));
>
>     p
>         .apply(KafkaIO.<byte[], SpecificRecord>read()
>                 .withBootstrapServers("localhost:9092")
>                 .withTopic(topic)
>                 .withKeyDeserializer(ByteArrayDeserializer.class) // Don't have any keys, but this is required
>                 .withValueDeserializer(valueProvider)
>                 .withConsumerConfigUpdates(ImmutableMap.of(
>                         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase(Locale.ROOT),
>                         ConsumerConfig.GROUP_ID_CONFIG, "beam-" + UUID.randomUUID()
>                 ))
>                 .withoutMetadata()
>         ).apply(Values.create())
>         // TODO: How to get SpecificRecord subclass?
>         .apply(MapElements.via(new SimpleFunction<SpecificRecord, Void>() {
>             @Override
>             public Void apply(SpecificRecord input) {
>                 log.info("{}", input);
>                 return null;
>             }
>         }));
>
>     p.run().waitUntilFinish();
> }
> {code}
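>
> A possible alternative (untested sketch): bypass the schema-registry deserializer provider, whose client construction is what OOMs in the trace above, and hand KafkaIO the Confluent deserializer plus an explicit coder for the generated Product class from the schema below. The raw Class cast is needed because KafkaAvroDeserializer implements Deserializer<Object>:
> {code}
> // Untested sketch: use KafkaAvroDeserializer directly and declare the element
> // type to Beam with an explicit AvroCoder for the generated class.
> @SuppressWarnings({"unchecked", "rawtypes"})
> KafkaIO.Read<byte[], Product> read = KafkaIO.<byte[], Product>read()
>         .withBootstrapServers("localhost:9092")
>         .withTopic(topic)
>         .withKeyDeserializer(ByteArrayDeserializer.class)
>         .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(Product.class))
>         .withConsumerConfigUpdates(ImmutableMap.of(
>                 "schema.registry.url", "http://localhost:8081", // read by KafkaAvroDeserializer
>                 KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true));
> {code}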
>
> Avro schema that I am using, which generates a class Product.java that I would like to use in place of SpecificRecord above:
> {code}
> {"type":"record","name":"Product","namespace":"cricket.jmoore.avro","fields":[{"name":"name","type":"string"}]}
> {code}
>
> *Beam Version*: 2.35.0
> *Confluent Version*: 7.0.1 (the error seems to come from here... will try to downgrade this)
>
> Dependencies:
> {code}
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-clients</artifactId>
>     <version>2.8.1</version>
> </dependency>
> <dependency>
>     <groupId>io.confluent</groupId>
>     <artifactId>kafka-avro-serializer</artifactId>
>     <version>7.0.1</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-runners-direct-java</artifactId>
>     <version>${beam.version}</version> <!-- 2.35.0 -->
>     <scope>provided</scope>
> </dependency>
> <dependency>
>     <groupId>org.apache.beam</groupId>
>     <artifactId>beam-sdks-java-core</artifactId>
>     <version>${beam.version}</version>
> </dependency>
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)