Flink Stream job to parquet sink

2020-06-05 Thread aj
Hello All, I am receiving a set of events in Avro format on different topics. I want to consume these and write to S3 in Parquet format. I have written the below job that creates a different stream for each event and fetches its schema from the Confluent Schema Registry to create a Parquet sink for a
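
For illustration, a minimal sketch (not the poster's job) of building one such sink: it pulls the latest Avro schema for a subject from the Confluent Schema Registry and creates a Parquet bulk-format StreamingFileSink. The registry URL, subject name, and S3 path are placeholders; flink-parquet and the Confluent schema-registry client are assumed to be on the classpath.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

    static StreamingFileSink<GenericRecord> parquetSink(String subject) throws Exception {
        // fetch the latest schema registered for this subject
        CachedSchemaRegistryClient registry =
            new CachedSchemaRegistryClient("http://schema-registry:8081", 100);
        Schema schema = new Schema.Parser()
            .parse(registry.getLatestSchemaMetadata(subject).getSchema());

        // bulk (Parquet) sinks roll on every checkpoint
        return StreamingFileSink
            .forBulkFormat(new Path("s3a://bucket/" + subject),
                ParquetAvroWriters.forGenericRecord(schema))
            .build();
    }

One sink like this would then be attached per event stream, as described above.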

Data Quality Library in Flink

2020-06-05 Thread aj
Hello All, I want to do some data quality analysis on stream data, for example: 1. Fill rate in a particular column 2. How many events are going to the error queue due to Avro schema validation failure? 3. Different statistical measures of a column. 4. Alert if a particular threshold is breached (like if f
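
For illustration, a rough sketch of how item 1 (fill rate) could be computed as a windowed aggregate, assuming the events arrive as Map<String, Object>; the column name "col1" and the 1-minute window are invented.

    import java.util.Map;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class FillRate implements AggregateFunction<Map<String, Object>, Tuple2<Long, Long>, Double> {
        @Override public Tuple2<Long, Long> createAccumulator() { return Tuple2.of(0L, 0L); }

        @Override public Tuple2<Long, Long> add(Map<String, Object> event, Tuple2<Long, Long> acc) {
            // f0 = non-null values seen for "col1", f1 = total events
            return Tuple2.of(acc.f0 + (event.get("col1") != null ? 1 : 0), acc.f1 + 1);
        }

        @Override public Double getResult(Tuple2<Long, Long> acc) {
            return acc.f1 == 0 ? 0.0 : (double) acc.f0 / acc.f1;
        }

        @Override public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
            return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
        }
    }

    // usage: events.windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1))).aggregate(new FillRate())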

Re: [EXTERNAL] Re: Inconsistent checkpoint durations vs state size

2020-06-05 Thread Congxian Qiu
Hi Chris, From the given exception, it seems there is something wrong with the FileSystem; one reason is the one Arvid gave (incremental checkpoints may generate too many small files). You can turn off incremental checkpoints or try to increase the config `state.backend.fs.memory-threshold` to see if things

Re: Flink s3 streaming performance

2020-06-05 Thread venkata sateesh` kolluru
Hi Kostas and Arvid, Thanks for your suggestions. The small files were already created and I am trying to roll a few into a big file while sinking. But due to the custom bucket assigner, it is hard to get more files within the same prefix in the specified checkpointing time. For example: /prefix1/
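
For reference, a sketch (not the poster's code) of a row-format StreamingFileSink whose rolling policy is decoupled from checkpointing, which is what allows several records to end up in one part file per bucket; bulk/Parquet writers always roll on checkpoint. The path, bucket assigner, and thresholds are illustrative, and DefaultRollingPolicy.builder() assumes a recent Flink version.

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

    StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3a://bucket/prefix1"), new SimpleStringEncoder<String>("UTF-8"))
        .withBucketAssigner(new BasePathBucketAssigner<>())     // or your custom BucketAssigner
        .withRollingPolicy(DefaultRollingPolicy.builder()
            .withMaxPartSize(128 * 1024 * 1024)                 // roll at ~128 MB
            .withRolloverInterval(15 * 60 * 1000)               // or after 15 minutes
            .withInactivityInterval(5 * 60 * 1000)
            .build())
        .build();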

Re: Tumbling windows - increasing checkpoint size over time

2020-06-05 Thread Wissman, Matt
Guowei, I had a different Flink app that was using 10 or 15s intervals – it had similar behavior but not nearly as bad as the 2s interval pipeline. Both have much longer checkpoint intervals now. Here is the state config: state.backend: rocksdb state.checkpoints.dir: {{ .Values.

Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Arvid Heise
Instead of changing the query, I used to embed the query in a larger context for similar work. So if you get an arbitrary query X which produces exactly one result (e.g. X = select sum(revenue) from lineorder group by 1) then you can craft a query where you add a dummy pk to the result. Table or
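
For illustration, a minimal sketch of that wrapping idea, assuming a StreamTableEnvironment is already set up; the key column name and example query are invented.

    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    static Table withDummyKey(StreamTableEnvironment tEnv, String userQuery) {
        // userQuery is the arbitrary query X, e.g. "SELECT sum(revenue) FROM lineorder"
        return tEnv.sqlQuery("SELECT 1 AS pk, x.* FROM (" + userQuery + ") AS x");
    }

The pk column can then be declared as the unique key expected by the UpsertStreamTableSink without touching the user's query.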

Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Satyam Shekhar
Hey Arvid, Thanks for the reply. As you suggested, rewriting the query to add a dummy output and group by the clause - "select 1, sum(revenue) from lineorder group by 1" does add a unique key column to the output, and the pipeline succeeds. However, the application may get arbitrary SQL from the

Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Arvid Heise
Hi Satyam, you are right, there seems to be a disconnect between javadoc and implementation. Jark probably knows more. In your case, couldn't you just add a dummy column containing a constant key? select 'revenue' AS name, sum(revenue) from lineorder and then set the dummy field as PK? On Fri,

Re: Stopping a job

2020-06-05 Thread Arvid Heise
Hi, could you check if this SO thread [1] helps you already? [1] https://stackoverflow.com/questions/53735318/flink-how-to-solve-error-this-job-is-not-stoppable On Thu, Jun 4, 2020 at 7:43 PM M Singh wrote: > Hi: > > I am running a job which consumes data from Kinesis and send data to > anothe

Re: Getting Window information from coGroup function

2020-06-05 Thread Arvid Heise
Hi Sudan, it seems to be unsupported directly. You can have a hacky workaround by replicating apply[1] in your code and adjust the last line to call your CoGroupWindowFunction. [1] https://github.com/apache/flink/blob/aedb4068408cfcad6f258526b00fcbff7f40fb82/flink-streaming-java/src/main/java/or

Re: User / Job Manager (permissions) for Flink

2020-06-05 Thread Arvid Heise
If you are running in K8s, you could also directly use the ingress layer of that. That's especially convenient if you have managed to connect that to your company's SSO. On Tue, Jun 2, 2020 at 9:38 PM Robert Metzger wrote: > Hi David, > > I guess you could also "just" put a nginx in front of Flink's

Re: Creating Kafka Topic dynamically in Flink

2020-06-05 Thread Arvid Heise
Hi Prasanna, auto.create.topics.enable is only recommended for development clusters and not for production use cases (as one programming error could potentially flood the whole broker with a large number of topics). I have experienced first-hand the mess it makes. I'd suggest finding a supplemental
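
As one hedged alternative to relying on auto topic creation, topics can be created explicitly with Kafka's AdminClient before the job needs them; the broker address, topic name, and partition/replication counts below are placeholders.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    static void ensureTopic(String topic) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 2 -- adjust to your cluster
            admin.createTopics(Collections.singleton(new NewTopic(topic, 3, (short) 2)))
                 .all().get();   // block until the topic exists (or creation fails)
        }
    }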

Re: Best way to "emulate" a rich Partitioner with open() and close() methods ?

2020-06-05 Thread Arvid Heise
Hi Arnaud, just to add to that: the overhead of this additional map is negligible if you enable object reuse [1]. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html On Tue, Jun 2, 2020 at 10:34 AM Robert Metzger wrote: > I'm not 100% sure about this answer,
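
For illustration, a sketch of the work-around being discussed: do the open()/close() work in a RichMapFunction that attaches a partition key to each record, then hand a trivial stateless Partitioner to partitionCustom. The Tuple2 layout and key choice are invented.

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().enableObjectReuse();   // makes the extra map essentially free

    DataStream<String> input = env.fromElements("a", "b", "c");

    DataStream<Tuple2<Integer, String>> withKey = input
        .map(new RichMapFunction<String, Tuple2<Integer, String>>() {
            @Override public void open(Configuration parameters) { /* open resources here */ }
            @Override public void close() { /* release them here */ }
            @Override public Tuple2<Integer, String> map(String value) {
                // pre-compute the partition key while resources are open
                return Tuple2.of(value.hashCode() & 0x7fffffff, value);
            }
        });

    // the partitioner itself stays stateless; field 0 of the tuple is the key
    withKey.partitionCustom(
        (Partitioner<Integer>) (key, numPartitions) -> key % numPartitions, 0);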

Re: Dynamically merge multiple upstream sources

2020-06-05 Thread Arvid Heise
Hi Yi, one option is to use Avro, where you define one global Avro schema as the source of truth. Then you add aliases [1] to this schema for each source where the fields are named differently. You use the same schema to read the Avro messages from Kafka and Avro automatically converts the data wi
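
A tiny illustration of the alias idea: the global (reader) schema declares an alias for a field that one source names differently, and Avro maps that source's field onto the canonical name when decoding with the reader schema (e.g. via GenericDatumReader with writer and reader schemas). The field names here are invented.

    import org.apache.avro.Schema;

    String readerSchemaJson =
        "{ \"type\": \"record\", \"name\": \"Event\", \"fields\": ["
      + "  { \"name\": \"userId\", \"type\": \"string\", \"aliases\": [\"client_id\"] },"
      + "  { \"name\": \"amount\", \"type\": \"double\", \"default\": 0.0 }"
      + "] }";

    Schema readerSchema = new Schema.Parser().parse(readerSchemaJson);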

Re: Flink s3 streaming performance

2020-06-05 Thread Kostas Kloudas
Hi all, @Venkata, Do you have many small files being created as Arvid suggested? If yes, then I tend to agree that S3 is probably not the best sink. Although I did not get that from your description. In addition, instead of PrintStream you can have a look at the code of the SimpleStringEncoder in

Re: [EXTERNAL] Re: Inconsistent checkpoint durations vs state size

2020-06-05 Thread Arvid Heise
Hi Chris, could you also try what happens when you turn incremental checkpoints off? Incremental checkpoints may create many small files which are a bad fit for HDFS. You could also evaluate other storage options (net drive, S3) if you find incremental checkpoints to be better. On Tue, Jun 2, 20
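
One way to try that suggestion in code (it can equally be set via state.backend.incremental in flink-conf.yaml); the HDFS path and job name are placeholders.

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FullCheckpointJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // second constructor argument = enableIncrementalCheckpointing; false = full checkpoints
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", false));
            // ... rest of the job ...
            env.execute("full-checkpoint-test");
        }
    }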

Re: Flink s3 streaming performance

2020-06-05 Thread Arvid Heise
Hi Venkata, are the many small files intended, or are they rather an issue of committing on checkpointing? If so, then FLINK-11499 [1] should help you. The design is close to done; unfortunately the implementation will not make it into 1.11. In any case, I'd look at the parameter fs.s3a.connection.maximum, as

Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

2020-06-05 Thread Arvid Heise
A common approach is to use a dead letter queue, which is an extra output for bad input. So the result of the read operation would look like Tuple2<TBase, byte[]> (or use Either in Scala) and return the parsed TBase on success, or else put in the invalid record byte[]. Then in your DAG, split the handling of the
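
For illustration, a sketch of the same dead-letter-queue pattern expressed with a side output instead of Tuple2/Either: good records go downstream, unparseable byte[] records go to a separate output. MyEvent, parse(), and rawBytes are stand-ins for the actual Thrift type, deserializer, and input stream.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    final OutputTag<byte[]> deadLetters = new OutputTag<byte[]>("dead-letters") {};

    SingleOutputStreamOperator<MyEvent> parsed = rawBytes
        .process(new ProcessFunction<byte[], MyEvent>() {
            @Override
            public void processElement(byte[] value, Context ctx, Collector<MyEvent> out) {
                try {
                    out.collect(parse(value));          // happy path
                } catch (Exception e) {
                    ctx.output(deadLetters, value);     // keep the raw record for later inspection
                }
            }
        });

    DataStream<byte[]> badRecords = parsed.getSideOutput(deadLetters);
    // badRecords can then be written to a separate topic/bucket for analysis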

Re: Native K8S not creating TMs

2020-06-05 Thread kb
Thanks Yang for the suggestion, I have tried it and I'm still getting the same exception. Is it possible it's due to the null pod name? Operation: [create] for kind: [Pod] with name: [null] in namespace: [default] failed. Best, kevin -- Sent from: http://apache-flink-user-mailing-list-archiv

Re: Auto adjusting watermarks?

2020-06-05 Thread Arvid Heise
Hi Theo, The general idea is interesting. I'd probably start with some initial out-of-orderness bound and, after collecting X elements, switch to the histogram. It sounds very valid to snapshot it. I'd probably use a union state to also support rescaling in a meaningful way. However, tbh for a producti
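
A very rough sketch of that idea against the Flink 1.10-style watermark API: start from a fixed bound and, once enough elements have been observed, derive the bound from recent event-time delays. The ring buffer, percentile, and starting bound are all invented, and state snapshotting is left out.

    import java.util.Arrays;
    import javax.annotation.Nullable;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class AdaptiveWatermarks<T> implements AssignerWithPeriodicWatermarks<T> {
        private final long[] delays = new long[1024];   // recent observed delays (ring buffer)
        private long count = 0;
        private long maxTimestamp = Long.MIN_VALUE;
        private long bound = 5_000;                     // initial out-of-orderness in ms

        @Override
        public long extractTimestamp(T element, long previousElementTimestamp) {
            long ts = previousElementTimestamp;         // or read the timestamp from the element
            maxTimestamp = Math.max(maxTimestamp, ts);
            delays[(int) (count++ % delays.length)] = maxTimestamp - ts;
            return ts;
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            if (count >= delays.length) {               // enough data: switch to observed delays
                long[] sorted = Arrays.copyOf(delays, delays.length);
                Arrays.sort(sorted);
                bound = sorted[(int) (sorted.length * 0.99)];   // e.g. the 99th percentile delay
            }
            return new Watermark(maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - bound);
        }
    }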

Re: Run command after Batch is finished

2020-06-05 Thread Jeff Zhang
You can try JobListener, which you can register with the ExecutionEnvironment. https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java Mark Davis wrote on Sat, Jun 6, 2020 at 12:00 AM: > Hi there, > > I am running a Batch job with several outputs. > Is t
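
A minimal sketch of that suggestion (Flink 1.10+); releaseDistributedLock() is a placeholder for whatever cleanup the batch job needs.

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.core.execution.JobListener;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.registerJobListener(new JobListener() {
        @Override
        public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
            // called right after the job has been submitted
        }

        @Override
        public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
            releaseDistributedLock();   // runs once the job has finished (or failed)
        }
    });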

Run command after Batch is finished

2020-06-05 Thread Mark Davis
Hi there, I am running a Batch job with several outputs. Is there a way to run some code (e.g. release a distributed lock) after all outputs are finished? Currently I do this in a try-finally block around the ExecutionEnvironment.execute() call, but I have to switch to the detached execution mode -

Re: Creating TableSchema from the Avro Schema

2020-06-05 Thread Dawid Wysakowicz
First of all, to give some backstory on the deprecation: we do not want to depend on TypeInformation anymore for the types in the Table API, as it binds the on-wire representation to the logical types of the SQL API. The goal is to use the DataType exclusively in the Table API (including for the

Re: [External Sender] Re: Flink sql nested elements

2020-06-05 Thread Ramana Uppala
Hi Leonard, We are using Flink 1.10 version and I can not share the complete schema but it looks like below in Hive Catalog, flink.generic.table.schema.1.data-type ROW<`col1` VARCHAR(2147483647), `postalAddress` ROW<`addressLine1` VARCHAR(2147483647), `addressLine2` VARCHAR(2147483647), `addressL

Re: [External Sender] Re: Avro Array type validation error

2020-06-05 Thread Ramana Uppala
Hi Dawid, We are using a custom connector that is very similar to the Flink Kafka Connector and instantiating the TableSchema using a custom class which maps Avro types to Flink's DataTypes using TableSchema.Builder. For the Array type, we have the below mapping: case ARRAY: return DataTypes.A
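
For context, roughly what such a mapping can look like for the ARRAY branch (the recursion and field name are illustrative, not the poster's actual code):

    import org.apache.avro.Schema;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.types.DataType;

    static DataType toDataType(Schema avro) {
        switch (avro.getType()) {
            case STRING:  return DataTypes.STRING();
            case INT:     return DataTypes.INT();
            case ARRAY:   return DataTypes.ARRAY(toDataType(avro.getElementType()));
            default:      throw new UnsupportedOperationException(avro.getType().toString());
        }
    }

    // an Avro array<string> field then ends up in the schema as:
    TableSchema schema = TableSchema.builder()
        .field("tags", DataTypes.ARRAY(DataTypes.STRING()))
        .build();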

UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Satyam Shekhar
Hello, I am using Flink as the query engine to build an alerting/monitoring application. One of the use cases in our product requires continuously tracking and charting the output of an aggregate only SQL query, for example, select sum(revenue) from lineorder. A desirable property from the output

Re: FlinkKinesisProducer sends data to only 1 shard. Is it because I don't have "AggregationEnabled" set to false ?

2020-06-05 Thread Vijay Balakrishnan
Hi, Resolved the issue by using a Custom Partitioner and setting RequestTimeout properties. kinesisProducer.setCustomPartitioner(new SerializableCustomPartitioner()); private static final class SerializableCustomPartitioner extends KinesisPartitioner> { private static final long serialVersio
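
A possible shape of the class the snippet above truncates (the generic parameter was lost in the archive, so Tuple2<String, String> and the key choice are assumptions):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;

    private static final class SerializableCustomPartitioner
            extends KinesisPartitioner<Tuple2<String, String>> {
        private static final long serialVersionUID = 1L;

        @Override
        public String getPartitionId(Tuple2<String, String> element) {
            return element.f0;   // spread records across shards by this key
        }
    }

    // kinesisProducer.setCustomPartitioner(new SerializableCustomPartitioner());

Without a custom partitioner (or with aggregation funneling records to one key), all data can end up on a single shard, which matches the behavior described in the subject.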

Re: Avro Array type validation error

2020-06-05 Thread Dawid Wysakowicz
Hi Ramana, What connector do you use or how do you instantiate the TableSource? Also, which catalog do you use and how do you register your table in that catalog? The problem is that the conversion from TypeInformation to DataType produces legacy types (because they cannot be mapped exactly 1-1 to the