Hello All,
I am receiving a set of events in Avro format on different topics. I want
to consume these and write to s3 in parquet format.
I have written the job below, which creates a different stream for each
event and fetches its schema from the Confluent Schema Registry to create a
parquet sink for a
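A minimal sketch of such a per-topic pipeline, assuming Flink's
ConfluentRegistryAvroDeserializationSchema and ParquetAvroWriters (topic
name, registry URL, bucket path and the schema string are placeholders):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class AvroTopicToParquet {
    public static void main(String[] args) throws Exception {
        String topic = "some-topic";                 // placeholder
        String schemaJson = args[0];                 // schema string fetched from the registry
        Schema schema = new Schema.Parser().parse(schemaJson);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "broker:9092");  // placeholder
        kafkaProps.setProperty("group.id", "avro-to-parquet");       // placeholder

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);   // bulk formats roll part files on checkpoint

        FlinkKafkaConsumer<GenericRecord> source = new FlinkKafkaConsumer<>(
                topic,
                ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://schema-registry:8081"),
                kafkaProps);

        StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(new Path("s3a://my-bucket/" + topic + "/"),   // placeholder
                        ParquetAvroWriters.forGenericRecord(schema))
                .build();

        env.addSource(source).addSink(sink);
        env.execute(topic + "-to-parquet");
    }
}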
Hello All,
I want to do some data quality analysis on stream data, for example (a rough
sketch for points 1 and 2 follows the list):
1. Fill rate in a particular column
2. How many events are going to the error queue due to Avro schema
validation failing?
3. Different statistical measures of a column.
4. Alert if a particular threshold is breached (like if f
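For 1 and 2, something along these lines is what I have in mind (the Event
type and its accessors below are made-up placeholders):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class QualityCheckFunction extends ProcessFunction<Event, Event> {

    // Side output for events that failed (e.g. Avro) schema validation.
    public static final OutputTag<Event> ERROR_QUEUE = new OutputTag<Event>("error-queue") {};

    private transient Counter total;
    private transient Counter nonNull;

    @Override
    public void open(Configuration parameters) {
        total = getRuntimeContext().getMetricGroup().counter("column_total");
        nonNull = getRuntimeContext().getMetricGroup().counter("column_non_null");
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) {
        if (!event.isSchemaValid()) {
            ctx.output(ERROR_QUEUE, event);   // 2. route/count events going to the error queue
            return;
        }
        total.inc();
        if (event.getColumn() != null) {      // 1. fill rate = column_non_null / column_total
            nonNull.inc();
        }
        out.collect(event);
    }
}

The two counters would be exposed through Flink's metric system, and the
side output can feed an error-queue sink.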
Hi Chris
From the given exception, it seems there is something wrong with the
FileSystem; one possible reason is the one Arvid gave (incremental
checkpoints may generate too many small files).
You can turn off incremental checkpoints or try to increase the config
`state.backend.fs.memory-threshold` to see if things
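For example (a flink-conf.yaml fragment; the values are just examples, and
note the memory threshold is capped at 1 MB):

state.backend.incremental: false
state.backend.fs.memory-threshold: 1048576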
Hi Kostas and Arvid,
Thanks for your suggestions.
The small files were already created and I am trying to roll a few of them
into a big file while sinking. But due to the custom bucket assigner, it is
hard to get more files within the same prefix in the specified checkpointing
time.
For example:
/prefix1/
Guowei,
I had a different Flink app that was using 10 or 15s intervals – it had
similar behavior, but not nearly as bad as the 2s-interval pipeline. Both
have much longer checkpoint intervals now.
Here is the state config:
state.backend: rocksdb
state.checkpoints.dir: {{ .Values.
Instead of changing the query, I used to embed the query in a larger
context for similar work.
So if you get an arbitrary query X which produces exactly one result (e.g.
X = select sum(revenue) from lineorder group by 1) then you can craft a
query where you add a dummy pk to the result.
Table or
Hey Arvid,
Thanks for the reply.
As you suggested, rewriting the query to add a dummy output and a group-by
clause - "select 1, sum(revenue) from lineorder group by 1" - does add a
unique key column to the output, and the pipeline succeeds.
However, the application may get arbitrary SQL from the
Hi Satyam,
you are right, there seems to be a disconnect between javadoc and
implementation. Jark probably knows more.
In your case, couldn't you just add a dummy column containing a constant
key?
select 'revenue' AS name, sum(revenue) from lineorder
and then set the dummy field as PK?
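If the query really is arbitrary, the same trick could be applied
generically around the incoming statement; a sketch (helper name made up):

// Wraps an arbitrary single-result aggregate query so that the output
// carries a constant column that can serve as the dummy key.
private static String wrapWithDummyKey(String query) {
    return "SELECT 'result' AS pk, t.* FROM (" + query + ") AS t";
}

// wrapWithDummyKey("SELECT SUM(revenue) FROM lineorder")
//   -> SELECT 'result' AS pk, t.* FROM (SELECT SUM(revenue) FROM lineorder) AS t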
On Fri,
Hi,
could you check if this SO thread [1] helps you already?
[1]
https://stackoverflow.com/questions/53735318/flink-how-to-solve-error-this-job-is-not-stoppable
On Thu, Jun 4, 2020 at 7:43 PM M Singh wrote:
> Hi:
>
> I am running a job which consumes data from Kinesis and sends data to
> anothe
Hi Sudan,
it seems to be unsupported directly.
You can have a hacky workaround by replicating apply [1] in your code and
adjusting the last line to call your CoGroupWindowFunction.
[1]
https://github.com/apache/flink/blob/aedb4068408cfcad6f258526b00fcbff7f40fb82/flink-streaming-java/src/main/java/or
If you are running in K8s, you could also directly use its ingress layer.
That's especially convenient if you have managed to connect it to your
company's SSO.
On Tue, Jun 2, 2020 at 9:38 PM Robert Metzger wrote:
> Hi David,
>
> I guess you could also "just" put a nginx in front of Flink's
Hi Prasanna,
auto.create.topics.enable is only recommended for development clusters and
not for production use cases (as one programming error could potentially
flood the whole broker with a large number of topics). I have experienced
first-hand the mess it makes.
I'd suggest finding a supplemental
Hi Arnaud,
just to add to that: the overhead of this additional map is negligible if
you enable object reuse [1].
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
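i.e. something like:

// Enable object reuse on the execution config (see [1]); only do this if
// your functions do not cache or modify incoming records across calls.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();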
On Tue, Jun 2, 2020 at 10:34 AM Robert Metzger wrote:
> I'm not 100% sure about this answer,
Hi Yi,
one option is to use Avro, where you define one global Avro schema as the
source of truth. Then you add aliases [1] to this schema for each source
where the fields are named differently. You use the same schema to read the
Avro messages from Kafka and Avro automatically converts the data wi
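For illustration, a field-level alias in such a reader schema could look
like this (record and alias names made up):

{
  "type": "record",
  "name": "Order",
  "fields": [
    { "name": "revenue", "type": "double", "aliases": ["rev", "sales_amount"] }
  ]
}

With this as the reader schema, a writer field named "rev" or "sales_amount"
is mapped onto "revenue".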
Hi all,
@Venkata, Do you have many small files being created, as Arvid suggested?
If so, then I tend to agree that S3 is probably not the best sink, although
I did not get that from your description.
In addition, instead of PrintStream you can have a look at the code of the
SimpleStringEncoder in
Hi Chris,
could you also try what happens when you turn incremental checkpoints off?
Incremental checkpoints may create many small files which are a bad fit for
HDFS. You could also evaluate other storage options (net drive, S3) if you
find incremental checkpoints to be better.
On Tue, Jun 2, 20
Hi Venkata,
are the many small files intended, or is it rather an issue of us
committing on checkpointing? If so, then FLINK-11499 [1] should help you.
The design is close to done; unfortunately the implementation will not make
it into 1.11.
In any case, I'd look at the parameter fs.s3a.connection.maximum, as
A common approach is to use a dead letter queue, which is an extra output
for bad input.
So the result of the read operation would look like Tuple2<TBase, byte[]>
(or use Either in Scala) and carry the parsed TBase on success, or else the
raw byte[] of the invalid record.
Then in your DAG, split the handling of the
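As a rough sketch (MyThriftEvent stands in for the concrete TBase type and
parse() for your deserializer, both placeholders):

// f0 carries the parsed record on success, f1 keeps the raw bytes on failure.
DataStream<Tuple2<MyThriftEvent, byte[]>> parsed = rawBytes
        .map(new MapFunction<byte[], Tuple2<MyThriftEvent, byte[]>>() {
            @Override
            public Tuple2<MyThriftEvent, byte[]> map(byte[] bytes) {
                try {
                    return Tuple2.of(parse(bytes), null);
                } catch (Exception e) {
                    return Tuple2.of(null, bytes);
                }
            }
        });

// Downstream, split the handling: filter on f0 != null for the valid
// records and on f1 != null for the dead letter output.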
Thanks Yang for the suggestion, I have tried it and I'm still getting the
same exception. Is it possible it's due to the null pod name? Operation:
[create] for kind: [Pod] with name: [null] in namespace: [default]
failed.
Best,
kevin
--
Sent from: http://apache-flink-user-mailing-list-archiv
Hi Theo,
The general idea is interesting. I'd probably start with some initial
out-of-orderness bound, and after collecting X elements, switch to the
histogram. It sounds very valid to snapshot it. I'd probably use a union
state to also support rescaling in a meaningful way.
However, tbh for a producti
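A sketch of the union-state part (using a ProcessFunction just for
illustration; Event and Histogram are hypothetical types):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class LatenessTracker extends ProcessFunction<Event, Event>
        implements CheckpointedFunction {

    private transient ListState<Histogram> unionState;
    private Histogram histogram = new Histogram();

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Union state: after restore/rescale every instance receives all
        // snapshotted histograms and can rebuild the full picture by merging.
        unionState = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("lateness-histogram", Histogram.class));
        for (Histogram h : unionState.get()) {
            histogram.merge(h);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        unionState.clear();
        unionState.add(histogram);
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) {
        histogram.add(event.lateness());   // hypothetical: observed out-of-orderness
        out.collect(event);
    }
}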
You can try JobListener, which you can register with the
ExecutionEnvironment.
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
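e.g. (the lock release is a placeholder for whatever cleanup is needed):

env.registerJobListener(new JobListener() {
    @Override
    public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
        // nothing to do here
    }

    @Override
    public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
        // Called once the job has finished, successfully or not.
        releaseDistributedLock();   // placeholder
    }
});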
Mark Davis wrote on Sat, Jun 6, 2020 at 12:00 AM:
> Hi there,
>
> I am running a Batch job with several outputs.
> Is t
Hi there,
I am running a Batch job with several outputs.
Is there a way to run some code (e.g. release a distributed lock) after all
outputs are finished?
Currently I do this in a try-finally block around the
ExecutionEnvironment.execute() call, but I have to switch to the detached
execution mode -
First of all, to give a back story for the deprecation: we do not want to
depend on TypeInformation anymore for the types in the Table API, as it
binds the on-wire representation to the logical types of the SQL API. The
goal is to use DataType exclusively in the Table API (including for the
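For illustration, a row type expressed with DataType looks like:

// org.apache.flink.table.api.DataTypes / org.apache.flink.table.types.DataType
DataType rowType = DataTypes.ROW(
        DataTypes.FIELD("col1", DataTypes.STRING()),
        DataTypes.FIELD("revenue", DataTypes.DECIMAL(10, 2)));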
Hi Leonard,
We are using Flink version 1.10 and I cannot share the complete schema,
but it looks like the below in the Hive Catalog:
flink.generic.table.schema.1.data-type ROW<`col1` VARCHAR(2147483647),
`postalAddress` ROW<`addressLine1` VARCHAR(2147483647), `addressLine2`
VARCHAR(2147483647), `addressL
Hi Dawid,
We are using a custom connector that is very similar to the Flink Kafka
connector, instantiating the TableSchema using a custom class which maps
Avro types to Flink's DataTypes via TableSchema.Builder.
For the Array type, we have the below mapping:
case ARRAY:
return
DataTypes.A
Hello,
I am using Flink as the query engine to build an alerting/monitoring
application. One of the use cases in our product requires continuously
tracking and charting the output of an aggregate-only SQL query,
for example, select sum(revenue) from lineorder. A desirable property from
the output
Hi,
Resolved the issue by using a Custom Partitioner and setting RequestTimeout
properties.
kinesisProducer.setCustomPartitioner(new SerializableCustomPartitioner());
private static final class SerializableCustomPartitioner extends
KinesisPartitioner> {
private static final long serialVersio
Hi Ramana,
What connector do you use or how do you instantiate the TableSource?
Also which catalog do you use and how do you register your table in that
catalog?
The problem is that conversion from TypeInformation to DataType produces
legacy types (because they cannot be mapped exactly 1-1 to the