StreamingFileSink bulk formats - small files

2022-03-03 Thread Kamil ty
Hello all, in multiple jobs I'm saving data using the DataStream API with StreamingFileSink and various bulk formats (Avro, Parquet). As bulk formats require a rolling policy that extends CheckpointRollingPolicy, I have created a policy that additionally rolls on file size. Unfortunately, for so…

Re: Example with JSONKeyValueDeserializationSchema?

2022-03-03 Thread Kamil ty
> …onSchema .setDeserializer(new JSONKeyValueDeserializationSchema(false)) > What am I doing wrong? As per the documentation, JSONKeyValueDeserializationSchema returns an ObjectNode. > Regards, Hans-Peter > On Fri, 14 Jan 2022 at 20:3…

Re: Example with JSONKeyValueDeserializationSchema?

2022-01-14 Thread Kamil ty
Hello Hans, as far as I know the JSONKeyValueDeserializationSchema returns a Jackson ObjectNode. Below I have included an example based on the Flink stable documentation (the schema has to be wrapped, since KafkaSource.setDeserializer expects a KafkaRecordDeserializationSchema):

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setDeserializer(KafkaRecordDeserializationSchema.of(
        new JSONKeyValueDeserializationSchema(false)))
    .build();

Flink per-job cluster HBaseSinkFunction fails before starting - Configuration issue

2022-01-14 Thread Kamil ty
Hello all, I have a Flink job that is using the HBaseSinkFunction as specified here: flink/flink-connectors/flink-connector-hbase-2.2 at master · a0x8o/flink (github.com)

Re: Running a flink job with Yarn per-job mode from application code.

2021-12-09 Thread Kamil ty
…I would also recommend using the REST API to operate on an existing job. See [1] for the APIs. > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/ > Kamil ty wrote on Wed, 8 Dec 2021 at 21:15: >> Thank you. I guess I will try this solution. One thing I…
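As a concrete illustration of that suggestion, here is a minimal sketch of driving the REST API from Python; the JobManager address and port are assumptions, while the /jobs endpoints are documented in [1] above.

import requests

# Hypothetical JobManager REST endpoint; adjust host/port to your cluster.
BASE = "http://localhost:8081"

# List all jobs known to the cluster (GET /jobs).
jobs = requests.get(BASE + "/jobs").json()["jobs"]
for job in jobs:
    print(job["id"], job["status"])

# Cancel a job by id (PATCH /jobs/:jobid cancels the job).
if jobs:
    requests.patch(BASE + "/jobs/" + jobs[0]["id"])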

Running a flink job with Yarn per-job mode from application code.

2021-12-07 Thread Kamil ty
Hello all, I'm looking for a way to submit a YARN job from another Flink job's application code. I can see that you can access a cluster and submit jobs with a RestClusterClient, but it seems YARN per-job mode is not supported with it. Any suggestions would be appreciated. Best regards, Kamil

Pyflink/Flink Java parquet streaming file sink for a dynamic schema stream

2021-12-02 Thread Kamil ty
Hello, I'm wondering if there is a possibility to create a Parquet streaming file sink in PyFlink (in the Table API) or in Java Flink (in the DataStream API). To give an example of the expected behaviour: each element of the stream is going to contain a JSON string. I want to save this stream to Parquet…
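For the fixed-schema part of this, a PyFlink Table API filesystem sink with the Parquet format would look roughly like the sketch below; the table name, path, and columns are illustrative, and a truly dynamic schema would still need one such sink per schema.

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

# Illustrative schema and path; Parquet needs the schema known up front.
t_env.execute_sql("""
    CREATE TABLE parquet_sink (
        user_id BIGINT,
        payload STRING,
        dt STRING
    ) PARTITIONED BY (dt) WITH (
        'connector' = 'filesystem',
        'path' = 'hdfs:///tmp/output',
        'format' = 'parquet'
    )
""")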

Pyflink 1.13.2 convert datastream into table BIG_INT type

2021-11-28 Thread Kamil ty
Hello, I'm trying to convert a data stream into a table using table_env.from_data_stream(ds). The ds contains some fields with the Types.BIG_INT() type. Those fields seem to be converted to RAW('java.math.BigInteger', '...'). This is seen as an error by Flink, which is resulting in table query and si…
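One possible workaround (an assumption, not confirmed in the thread): declare the affected fields as Types.LONG(), which maps to the SQL BIGINT type, instead of Types.BIG_INT(), which maps to java.math.BigInteger and ends up as RAW. A sketch:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Types.LONG() maps to a Java long, which from_data_stream derives as BIGINT.
ds = env.from_collection(
    [(1, "a"), (2, "b")],
    type_info=Types.ROW([Types.LONG(), Types.STRING()]))

table = t_env.from_data_stream(ds)
table.print_schema()  # expected: BIGINT, STRING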

Re: Flink CLI - pass command line arguments to a pyflink job

2021-11-23 Thread Kamil ty
…Matthias Pohl wrote: >> Hi Kamil, afaik, the parameter passing should work as normal by just appending them to the Flink job submission, similar to the Java job submission: ``` >> $ ./flink run --help Action "run" compiles and runs a program.…

Re: Table API Filesystem connector - disable interval rolling policy

2021-11-22 Thread Kamil ty
…cts. I'm gonna add Francesco to this thread. >> Maybe he has better ideas on how to answer your question. >> Best, Matthias >> On Mon, Nov 22, 2021 at 10:32 AM Kamil ty wrote: >>> Hey all, I wanted to know if…

Flink CLI - pass command line arguments to a pyflink job

2021-11-22 Thread Kamil ty
Hey, looking at the examples at Command-Line Interface | Apache Flink, I don't see an example of passing command line arguments to a PyFlink job when deploying the job to a remote cluster with the Flink CLI. Is this suppo…
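For reference, a minimal sketch of the pattern described in the reply earlier in this digest: arguments appended after the script on the flink run command line arrive in sys.argv, so standard argparse works. The flag names and values below are made up for illustration.

# job.py
import argparse
import sys

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

# Invocation sketch (cluster/target flags depend on your setup):
#   ./bin/flink run --python job.py --input-topic events --group my-group
parser = argparse.ArgumentParser()
parser.add_argument("--input-topic", required=True)
parser.add_argument("--group", default="default-group")
params = parser.parse_args(sys.argv[1:])

env = StreamExecutionEnvironment.get_execution_environment()
env.from_collection(
    ["topic=" + params.input_topic, "group=" + params.group],
    type_info=Types.STRING()).print()
env.execute("argv-demo")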

Table API Filesystem connector - disable interval rolling policy

2021-11-22 Thread Kamil ty
Hey all, I wanted to know if there is a way to disable the interval rolling policy in the Table API filesystem connector. From the Flink docs: FileSystem | Apache Flink. The key to change the…
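As far as I can tell there is no explicit off switch for interval-based rolling, but raising 'sink.rolling-policy.rollover-interval' far beyond the job's lifetime has much the same effect; a sketch with illustrative values:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

# Rolling then happens only on file size
# (bulk formats additionally roll on every checkpoint).
t_env.execute_sql("""
    CREATE TABLE fs_sink (
        id BIGINT,
        data STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/tmp/out',
        'format' = 'json',
        'sink.rolling-policy.file-size' = '128MB',
        'sink.rolling-policy.rollover-interval' = '365 d'
    )
""")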

Re: Deserialize generic kafka json message in pyflink. Single kafka topic, multiple message schemas (debezium).

2021-11-22 Thread Kamil ty
…expects a DeserializationSchema instead of JsonRowDeserialization, and so I guess you could try SimpleStringSchema. > Regards, Dian > On Sat, Nov 20, 2021 at 5:55 AM Kamil ty wrote: >> Hello all, I'm working on a PyFlink job that's supposed…

Deserialize generic kafka json message in pyflink. Single kafka topic, multiple message schemas (debezium).

2021-11-19 Thread Kamil ty
Hello all, I'm working on a PyFlink job that's supposed to consume JSON messages from Kafka and save them to a partitioned Avro file sink. I'm having difficulties finding a solution on how to process the messages, because there is only one Kafka topic for multiple message schemas. As PyFlink's Flin…
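Following the SimpleStringSchema suggestion from the reply above, here is a sketch of one way to split such a topic: deserialize to raw strings, then tag each record with a discriminator field before filtering per schema. The topic, broker, and the source.table field path are assumptions about the Debezium payload layout.

import json

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()

consumer = FlinkKafkaConsumer(
    topics="debezium-topic",
    deserialization_schema=SimpleStringSchema(),
    properties={"bootstrap.servers": "broker:9092", "group.id": "my-group"})

# Tag each raw JSON record with its originating table so the stream can
# be split per schema afterwards.
tagged = env.add_source(consumer).map(
    lambda raw: (json.loads(raw).get("source", {}).get("table", "unknown"), raw),
    output_type=Types.TUPLE([Types.STRING(), Types.STRING()]))

orders = tagged.filter(lambda t: t[0] == "orders")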

Re: Pyflink PyPi build - scala 2.12 compatibility

2021-11-10 Thread Kamil ty
…`flink run` command refers to. If you want to work with Scala 2.12, it should refer to a custom Flink distribution built for 2.12. > Regards, Dian > [1] https://github.com/apache/flink/blob/master/flink-python/pyflink/find_flink_home.py#L46 > On Wed, Nov 10, 2021 a…

Pyflink PyPi build - scala 2.12 compatibility

2021-11-09 Thread Kamil ty
Hello, just wanted to verify if the default build of PyFlink available from PyPI is compatible with Flink built for Scala 2.12. I have noticed that the PyPI PyFlink version comes with apache-flink-libraries targeted for Scala 2.11 only, and I was wondering if this might be the cause of some issues…

Pyflink data stream API to Table API conversion with multiple sinks.

2021-10-08 Thread Kamil ty
Hello, in my PyFlink job I have the following flow:
1. Use the Table API to get messages from Kafka.
2. Convert the table to a data stream.
3. Convert the data stream back to the Table API.
4. Use a statement set to write the data to two filesystem sinks (Avro and Parquet); see the sketch below.
I'm able to run the job and everything…
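A minimal sketch of step 4, assuming t_env and result_table already exist from the earlier steps and that both sink tables are registered under the illustrative names used here:

# One statement set runs both inserts as a single job.
stmt_set = t_env.create_statement_set()
stmt_set.add_insert("avro_sink", result_table)
stmt_set.add_insert("parquet_sink", result_table)
stmt_set.execute().wait()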

Re: Pyflink job data stream to table conversion declareManagedMemory exception

2021-10-08 Thread Kamil ty
…Dian > On Tue, Oct 5, 2021 at 11:34 PM Nicolaus Weidner <nicolaus.weid...@ververica.com> wrote: >> Hi Kamil, >> On Tue, Oct 5, 2021 at 9:03 AM Kamil ty wrote: >>> Hello, I'm trying to run a…

Pyflink job data stream to table conversion declareManagedMemory exception

2021-10-05 Thread Kamil ty
Hello, I'm trying to run a PyFlink job in cluster mode (with YARN). My job contains source and sink definitions using the Table API, which are converted to a data stream and back. Unfortunately I'm getting an unusual exception at: table = t_env.from_data_stream(ds, 'user_id, first_name, last_name'). T…
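One thing worth checking (an assumption on my part, not from the thread): recent PyFlink versions expect from_data_stream to receive field expressions rather than a single comma-separated string, along these lines:

from pyflink.table.expressions import col

# assuming ds is an existing DataStream with these row fields
table = t_env.from_data_stream(
    ds, col("user_id"), col("first_name"), col("last_name"))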

Flink CEP in PyFlink

2021-09-07 Thread Kamil ty
Hello all, I would like to use Flink CEP for my development requirements. Is Flink CEP supported in PyFlink? If not, are there any available workarounds? Kind regards, Kamil
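The CEP library itself has no Python API as far as I know, but SQL's MATCH_RECOGNIZE is reachable from the PyFlink Table API and covers many CEP-style patterns. A sketch, assuming a registered table named events with a time attribute event_time (all names illustrative):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

# 'events' is assumed to be registered already (user_id, action, event_time).
matches = t_env.sql_query("""
    SELECT *
    FROM events
        MATCH_RECOGNIZE (
            PARTITION BY user_id
            ORDER BY event_time
            MEASURES A.event_time AS start_ts, B.event_time AS end_ts
            ONE ROW PER MATCH
            PATTERN (A B)
            DEFINE
                A AS A.action = 'login',
                B AS B.action = 'purchase'
        )
""")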

PyFlink StreamingFileSink bulk-encoded format (Avro)

2021-08-17 Thread Kamil ty
Hello, I'm trying to save my data stream to an Avro file on HDFS. In the Flink documentation I can only see explanations for Java/Scala. However, I can't seem to find a way to do it in PyFlink. Is this currently possible in PyFlink? Kind regards, Kamil
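One workaround, assuming the stream can be bridged into the Table API: write Avro through the filesystem connector instead of a DataStream sink. The path and schema below are illustrative.

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(60000)  # bulk part files are finalized on checkpoints
t_env = StreamTableEnvironment.create(env)

t_env.execute_sql("""
    CREATE TABLE avro_sink (
        id BIGINT,
        data STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'hdfs:///tmp/avro-out',
        'format' = 'avro'
    )
""")

# assuming ds is an existing DataStream with matching fields:
# t_env.from_data_stream(ds).execute_insert("avro_sink")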