Hello,

I am working on a ParquetSink writer, which converts a Kafka stream to
Parquet format. I am having some weird issues deploying this application
to a YARN cluster. I am not 100% sure this is a Flink-related error, but
I wanted to reach out to folks here in case it might be.


If I launch Flink within YARN to execute a single job, it runs OK.
This is the command I use for the deployment:

*Command:* *flink run --jobmanager yarn-cluster -ytm 4096 -yjm 1048 -ys 2
-yn 2 -d -c <class_name> jar_name.jar*

However, as soon as I try to submit a similar job to an already running
YARN cluster, I start getting these errors
(*https://gist.github.com/neoeahit/f0130e9f447ea9c2baa38bf5ee4e6a57*) and
the application crashes. I checked the location in /tmp where I am
creating the file, and no file exists there.

*Command:* *flink run -yid application_id -d -c <class_name> jar_name.jar*


A bit more about my algorithm: I use a temp array to buffer messages in
the invoke method, and when a specific threshold is reached I create a
Parquet file with the buffered data. Once the temporary Parquet file is
created, I upload it to long-term storage.
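
To illustrate, here is a minimal sketch of that buffering logic (the
Message type, flushThreshold, and writeParquetAndUpload are placeholder
names for illustration, not my actual code):

import scala.collection.mutable.ArrayBuffer
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

class ParquetSink extends RichSinkFunction[Message] {
  private val bufferedMessages = ArrayBuffer[Message]()
  private val flushThreshold = 10000 // placeholder threshold

  override def invoke(value: Message): Unit = {
    // buffer each incoming message
    bufferedMessages += value
    if (bufferedMessages.size >= flushThreshold) {
      // write the buffer to a temp Parquet file, then upload it (hypothetical helper)
      writeParquetAndUpload(bufferedMessages)
      bufferedMessages.clear()
    }
  }
}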

The code to write the buffered data to a Parquet file is:

// build an AvroParquetWriter for the pending temp file
writer = Some(AvroParquetWriter.builder(getPendingFilePath(tmp_filename.get))
  .withSchema(schema.get)
  .withCompressionCodec(compressionCodecName)
  .withRowGroupSize(blockSize)
  .withPageSize(pageSize)
  .build())

// write every buffered message, then close to flush the Parquet footer
bufferedMessages.foreach { e =>
  writer.get.write(e.payload)
}
writer.get.close()


Please let me know if you have any pointers on what might be going wrong.

Thanks in advance,
- Vipul
