Thanks for the quick response, Piotr,

I feel like I have everything working, but no files are getting written to
disk. I've implemented my own BucketAssigner like so:

import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

class BackFillBucketAssigner[IN] extends BucketAssigner[IN, String] {
  // Same pattern as DateTimeBucketAssigner's default; buckets on the element
  // timestamp instead of processing time.
  private val formatString = "yyyy-MM-dd--HH"

  override def getBucketId(element: IN, context: BucketAssigner.Context): String =
    DateTimeFormatter
      .ofPattern(formatString)
      .withZone(ZoneId.systemDefault)
      .format(Instant.ofEpochMilli(context.timestamp()))

  override def getSerializer: SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE
}

And plugged it into my sink:

  val parquet = StreamingFileSink
    .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(ReflectData.get().getSchema(clazz)))
    .withBucketAssigner(new BackFillBucketAssigner[GenericRecord])
    .build()

stream.addSink(parquet)

When I run locally I can see the temporary part files, but nothing ever
gets rolled. I saw this once before when I didn't have checkpointing
enabled for my original streaming job, and this note tipped me off:

IMPORTANT: Bulk-encoding formats can only be combined with the
`OnCheckpointRollingPolicy`, which rolls the in-progress part file on
every checkpoint.

Is it possible that something similar is happening here? I have enabled
checkpointing in the job; however, since it is reading from flat files
and assigning timestamps, is it possible that checkpointing is not working
as I expect? Nothing in my logs suggests an error, and the job runs to
completion (in about 30 minutes).
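
For reference, here is roughly how checkpointing is enabled in the backfill
job (the one-minute interval is just an example, and env is simply the
StreamExecutionEnvironment the job is built on):

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Bulk formats only roll the in-progress part file when a checkpoint
// completes, per the note quoted above.
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE)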

Thanks again for your help!
Peter


On Fri, May 3, 2019 at 4:46 AM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi Peter,
>
> It sounds like this should work. However, my question would be: do you want
> exactly-once processing? If yes, then you would have to know which exact
> events need re-processing, or deduplicate them somehow. Keep in mind that in
> case of an outage in the original job, you will probably have some files
> already committed by the StreamingFileSink.
>
> Another approach might be to somehow overwrite the previous files (but then
> you would have to check whether the bucket assignment and file naming are
> completely deterministic), or to remove the dirty files left by the crashed
> job before reprocessing from the backup.
>
> Piotrek
>
> On 2 May 2019, at 23:10, Peter Groesbeck <peter.groesb...@gmail.com>
> wrote:
>
> Hi all,
>
> I have an application that reads from various Kafka topics and writes
> parquet files to corresponding buckets on S3 using StreamingFileSink with
> DateTimeBucketAssigner. The upstream application that writes to Kafka
> also writes records as gzipped json files to date bucketed locations on S3
> as backup.
>
> One requirement we have is to backfill missing data in the event that the
> application or Kafka experiences an outage. This can be accomplished by
> reading the backup files that were written to S3 by our upstream application
> instead of reading from Kafka. My current approach is to read the hourly
> backup buckets, transform the files into a DataStream, and assign each record
> a timestamp based on a datetime field on the json records using a
> BoundedOutOfOrdernessTimestampExtractor, roughly as sketched below. I was
> then hoping to connect the DataStream to the same StreamingFileSink, which
> would ideally write past records in the same manner as if they had been
> streamed from Kafka.
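>
> Roughly, the timestamp assignment would look like the sketch below (the
> GenericRecord element type, the "eventTime" field name, the five-minute
> bound, and the backupStream variable are placeholders for illustration):
>
> import java.time.Instant
> import org.apache.avro.generic.GenericRecord
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
> import org.apache.flink.streaming.api.windowing.time.Time
>
> // Sketch: pull the event time from a datetime field on the record instead
> // of relying on processing time.
> class BackupTimestampExtractor
>     extends BoundedOutOfOrdernessTimestampExtractor[GenericRecord](Time.minutes(5)) {
>   override def extractTimestamp(record: GenericRecord): Long =
>     Instant.parse(record.get("eventTime").toString).toEpochMilli
> }
>
> val withTimestamps = backupStream.assignTimestampsAndWatermarks(new BackupTimestampExtractor)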
>
> Unfortunately for me, the bucket assigner works on system time:
>
> A BucketAssigner
> <https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html>
> that assigns to buckets based on current system time.
>
> @Override
> public String getBucketId(IN element, BucketAssigner.Context context) {
>    if (dateTimeFormatter == null) {
>       dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
>    }
>    return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
> }
>
>
> No problem, I can extend DateTimeBucketAssigner and override the method to
> grab the element timestamp (context.timestamp()) instead of
> currentProcessingTime(), but I'm wondering if this is the right approach? And
> if so, would this behavior be useful outside of the context of my
> application?
>
> Thanks in advance for your help and for this awesome framework!
>
> Peter
>
>
>
