Re: In Flink SQL for kafka avro based table , is there support for FORWARD_TRANSITIVE schema change?

2021-04-14 Thread Agnelo Dcosta
Hi Arvid, > writer schema encoded if you are using no schema registry? On the producer side we are using Node.js with the https://registry.npmjs.org/avsc/-/avsc-5.5.3.tgz and https://registry.npmjs.org/sinek/-/sinek-9.1.0.tgz libraries to publish messages. We specify the Avro schema file to encode message

Re: JSON source for pyflink stream

2021-04-14 Thread Klemens Muthmann
Hi, We are loading our JSON from a Mongo database. But we also found no readily available way to stream JSON data into a Flink pipeline. I guess this would be hard to implement, since you have to know details about the JSON structure to do this. So I guess your best bet would be to implement you

Re: [BULK]Re: [SURVEY] Remove Mesos support

2021-04-14 Thread Matthias Pohl
Thanks for everyone's feedback. I'm gonna initiate a vote in a separate thread. On Mon, Mar 29, 2021 at 9:18 AM Robert Metzger wrote: > +1 > > > > On Mon, Mar 29, 2021 at 5:44 AM Yangze Guo wrote: > > > +1 > > > > Best, > > Yangze Guo > > > > On Mon, Mar 29, 2021 at 11:31 AM Xintong Song > > w

Re: In Flink SQL for kafka avro based table , is there support for FORWARD_TRANSITIVE schema change?

2021-04-14 Thread Arvid Heise
For any schema change to be gracefully supported, you need to know both schemas (old + new) on the reader side (= Flink). I'm curious how Flink should know the old schema, as you only provide the new schema, right? Usually, you use the schema registry, such that each record has its own schema attached
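
To make Arvid's point concrete, here is a minimal sketch (Scala, using Avro's standard GenericDatumReader; the schemas and payload names are placeholders) of Avro schema resolution, which needs both the writer and the reader schema:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Avro schema resolution: decode bytes written with the OLD (writer) schema
// into records shaped by the NEW (reader) schema. Without the writer schema,
// the raw bytes cannot be interpreted.
def decode(payload: Array[Byte], writerSchema: Schema, readerSchema: Schema): GenericRecord = {
  val reader = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
  val decoder = DecoderFactory.get().binaryDecoder(payload, null)
  reader.read(null, decoder)
}
```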

Re: Flink 1.11.4?

2021-04-14 Thread Roman Khachatryan
Hi Yuval, I'd expect 1.13 to be available in 2-3 weeks (there are no exact estimates). Regards, Roman On Tue, Apr 13, 2021 at 12:08 PM Yuval Itzchakov wrote: > > Roman, is there an ETA on 1.13? > > On Mon, Apr 12, 2021, 16:17 Roman Khachatryan wrote: >> >> Hi Maciek, >> >> There are no specifi

Get consumed Kafka offsets from Flink kafka source

2021-04-14 Thread bat man
Hi All, Is there any way I can inspect/query the checkpointed data? The scenario is like this: we have a high volume of data coming in the data stream pipeline for which Kafka is the source; in case it fails because of bad data, I want to analyse the data which caused the issue. It could be that some data s

Re: Flink docker 1.11.3 actually runs 1.11.2

2021-04-14 Thread Chesnay Schepler
Works properly for me. I think your suspicion about the .env is correct; it is probably not considered when checking whether something has changed, so docker just re-uses the previous image. On 4/13/2021 9:51 PM, Flavio Pompermaier wrote: Hi Chesnay, my tests were done using docker-compose (wi

Call run() of another SourceFunction inside run()?

2021-04-14 Thread Schneider, Jochen
Hi! To work around FLINK-2491, which causes checkpointing issues for us, I am trying to chain SourceFunctions so that the first one never quits. The basic idea is as follows: class WrappingSourceFunction(innerSourceFunction: SourceFunction[Inner])

Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
I am running Flink 1.12.2, and I was trying to up the parallelism of my Flink SQL job to see what happened. However, once I did that, my results became nondeterministic. This happens whether I set the table.exec.resource.default-parallelism config option or I set the default local parallelism t

Re: Extract/Interpret embedded byte data from a record

2021-04-14 Thread Piotr Nowojski
Hi, One thing that you can do is to read this record using Avro, keeping `Result` as `bytes`, and in a subsequent mapping function you could change the record type and deserialize the result. In the Data Stream API: source.map(new MapFunction { ...} ) Best, Piotrek Wed, Apr 14, 2021 at 03:17 Sumeet Ma
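
A minimal sketch of that suggestion in the Scala DataStream API (the `Envelope` type and `parseResult` helper below are hypothetical stand-ins for the actual record type and inner deserializer):

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala._

// Hypothetical envelope record whose `result` field was read as Avro `bytes`.
case class Envelope(id: String, result: Array[Byte])
case class Parsed(id: String, value: String)

// Placeholder for whatever deserializer the embedded payload actually needs.
def parseResult(bytes: Array[Byte]): String = new String(bytes, "UTF-8")

// Keep the raw bytes through the source, then interpret them in a map().
def extract(source: DataStream[Envelope]): DataStream[Parsed] =
  source.map(new MapFunction[Envelope, Parsed] {
    override def map(e: Envelope): Parsed = Parsed(e.id, parseResult(e.result))
  })
```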

flink1.12.2 "Failed to execute job"

2021-04-14 Thread 太平洋
After submitting the job, I received a 'Failed to execute job' error, and the time between initialization and scheduling lasted 214s. What happened during this period? version: flink: 1.12.2 deployment: k8s standalone logs: 2021-04-14 12:47:58,547 WARN org.apache.flink.streaming.connectors.kafka.F

PyFlink Vectorized UDF throws NullPointerException

2021-04-14 Thread Yik San Chan
The question is cross-posted on Stack Overflow https://stackoverflow.com/questions/67092978/pyflink-vectorized-udf-throws-nullpointerexception . I have an ML model that takes two numpy.ndarray - `users` and `items` - and returns a numpy.ndarray `predictions`. In normal Python code, I would do: ``

Re: Get consumed Kafka offsets from Flink kafka source

2021-04-14 Thread Piotr Nowojski
Hi, Depending on how you configured your FlinkKafkaSource, you can make the source commit consumed offsets back to Kafka. So one way to examine them would be to check those offsets in Kafka (I don't know how, but I'm pretty sure there is a way to do it). Secondly, if you want to examine Flin
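
A sketch of that first option (Scala, Flink 1.12; the topic and connection properties are placeholders), assuming checkpointing is enabled on the job:

```scala
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val props = new Properties()
props.setProperty("bootstrap.servers", "broker:9092") // placeholder
props.setProperty("group.id", "my-group")             // placeholder

val consumer = new FlinkKafkaConsumer[String]("events", new SimpleStringSchema(), props)
// Commit the checkpointed offsets back to Kafka on each completed checkpoint,
// so they can be inspected afterwards with Kafka's consumer-group tooling.
consumer.setCommitOffsetsOnCheckpoints(true)
```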

Re: Call run() of another SourceFunction inside run()?

2021-04-14 Thread Piotr Nowojski
Hi, I think it should work. At least off the top of my head I do not see any reason why it shouldn't. Just make sure that you are proxying all relevant methods, not only those defined in `SourceFunction`. For example `FlinkKafkaConsumer` is implementing/extending: `RichParallelS
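
A sketch of the chaining idea in Scala (delegation only, not a complete proxy):

```scala
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

// Delegates run() to the inner source, then idles instead of returning, so the
// outer source never finishes. Per Piotr's note, a real wrapper must also
// forward the RichFunction methods (open/close/setRuntimeContext) whenever the
// inner source is a rich function.
class WrappingSourceFunction[T](inner: SourceFunction[T]) extends RichSourceFunction[T] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
    inner.run(ctx) // the inner source emits through the same context
    while (running) Thread.sleep(100) // keep the source alive after the inner one quits
  }

  override def cancel(): Unit = {
    running = false
    inner.cancel()
  }
}
```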

Re: flink1.12.2 "Failed to execute job"

2021-04-14 Thread Piotr Nowojski
Hey, could you provide full logs from both task managers and job managers? Piotrek Wed, Apr 14, 2021 at 15:43 太平洋 <495635...@qq.com> wrote: > After submit job, I received 'Failed to execute job' error. And the time > between initialization and scheduling last 214s. What has happened during >

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Piotr Nowojski
Hi, Yes, it looks like your query is non-deterministic because of `FIRST_VALUE` used inside `GROUP BY`. If you have many different parallel sources, each time you run your query your first value might be different. If that's the case, you could try to confirm it with an even smaller query: SE
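
Piotrek's suggested query is cut off above; a minimal check in that spirit (hypothetical table and column names) might look like:

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = TableEnvironment.create(settings)

// With parallelism > 1, which row reaches the aggregation first within each
// group is a race, so FIRST_VALUE may legitimately differ between runs.
tableEnv.executeSql(
  """
    |SELECT id, FIRST_VALUE(attr) AS first_attr
    |FROM source_table
    |GROUP BY id
    |""".stripMargin)
```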

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
Piotrek, I was actually originally using a group function that WAS deterministic (but it was a custom UDF I made), and chose something built in here. By non-deterministic, I mean that the number of records coming out is not consistent. Since the FIRST_VALUE here is on an attribute that is not part

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Timo Walther
Hi Dylan, streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); is currently not supported by the Table & SQL API. For now, val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() determines the mode. Thus, I would remove the line again. If you want to u
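
Timo's advice as a sketch (Scala, Flink 1.12 with the Blink planner):

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// In Flink 1.12 the Table API execution mode is chosen via EnvironmentSettings,
// not via StreamExecutionEnvironment's runtime mode.
val batchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val batchTableEnv = TableEnvironment.create(batchSettings)
```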

Re: JSON source for pyflink stream

2021-04-14 Thread Dian Fu
Hi Giacomo, All the connectors supported in the Table & SQL API can be used in the PyFlink Table API, so you could use the filesystem connector in the PyFlink Table API. AFAIK, the filesystem connector supports newline-delimited JSON in Flink 1.12. You could refer to [1] for more details.
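
A sketch of such a table definition (hypothetical schema and path); the same DDL can be run from PyFlink via `t_env.execute_sql`, and is shown here through the Scala Table API:

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val tableEnv = TableEnvironment.create(
  EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build())

// In Flink 1.12 the filesystem connector's 'json' format reads
// newline-delimited JSON files from the given path.
tableEnv.executeSql(
  """
    |CREATE TABLE json_source (
    |  id BIGINT,
    |  payload STRING
    |) WITH (
    |  'connector' = 'filesystem',
    |  'path' = 'file:///data/input',
    |  'format' = 'json'
    |)
    |""".stripMargin)
```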

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
I replaced the FIRST_VALUE with MAX to ensure that the results should be identical even in their content, and my problem still remains: I end up with a nondeterministic count of records being emitted into the sink when the parallelism is over 1, and that count is about 20-25% short (and not co

Re: flink1.12.2 "Failed to execute job"

2021-04-14 Thread Piotr Nowojski
Hi, I haven't found anything strange in the logs (I've received logs in a separate message). It looks like the problem is that split enumeration is taking a long time, and currently this is being done in the Job Manager's main thread, blocking other things from executing. For the time being I thin

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Piotr Nowojski
Hi Dylan, But if you are running your query in streaming mode, aren't you counting retractions from the FULL JOIN? AFAIK in streaming mode, in a FULL JOIN, when the first record comes in it will be immediately emitted with NULLs (not matched, as the other table is empty). Later, if a matching record i
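
One way to observe those retractions directly (a sketch; the joined tables are hypothetical) is to convert the join result into a retract stream:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

// Each element is (isInsert, row): a FULL JOIN first emits a NULL-padded row,
// later retracts it (false) and re-emits the matched row when the other side
// arrives, so counting raw inserts overshoots the final row count.
def inspect(tableEnv: StreamTableEnvironment): Unit = {
  val joined = tableEnv.sqlQuery("SELECT * FROM t1 FULL JOIN t2 ON t1.id = t2.id")
  tableEnv.toRetractStream[Row](joined).print()
}
```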

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
Piotrek, I am looking at the count of records present in the sink table in Postgres after the entire job completes, not the number of inserts/retracts. I can see as the job runs that records are added and removed from the “sink” table. With parallelism set to 1, it always comes out to the same

Re: PyFlink Vectorized UDF throws NullPointerException

2021-04-14 Thread Dian Fu
Hi Yik San, 1) There are two kinds of Python UDFs in PyFlink: - General Python UDFs, which process input elements on a row basis. That is, they process one row at a time. - Pandas UDFs, which process input elements on a batch basis. So you are correct that you need to use a Pandas UDF for your require

Re: In Flink SQL for kafka avro based table , is there support for FORWARD_TRANSITIVE schema change?

2021-04-14 Thread Agnelo Dcosta
Hi Arvid, thanks for the reply. We are following the 1.12 documentation here: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro.html#data-type-mapping *Currently, the Avro schema is always derived from table schema. Explicitly defining an Avro schema is

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
Timo, Here is the plan (hopefully I properly cleansed it of company proprietary info without garbling it) Dylan == Abstract Syntax Tree == LogicalSink(table=[default_catalog.default_database.sink], fields=[id, attr, attr_mapped]) +- LogicalProject(id=[CASE(IS NOT NULL($0), $0, $2)], attr=[CASE

Flink Hadoop config on docker-compose

2021-04-14 Thread Flavio Pompermaier
Hi everybody, I'm trying to set up reading from HDFS using docker-compose and Flink 1.11.3. If I pass 'env.hadoop.conf.dir' and 'env.yarn.conf.dir' using FLINK_PROPERTIES (under environment section of the docker-compose service) I see in the logs the following line: "Could not find Hadoop configur
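
One thing worth trying here (an assumption, not a verified fix for this setup): mount the Hadoop configuration into the containers and point HADOOP_CONF_DIR at it, which Flink also consults when locating the Hadoop configuration:

```yaml
# docker-compose sketch (image tag and paths are placeholders)
services:
  jobmanager:
    image: flink:1.11.3
    environment:
      - HADOOP_CONF_DIR=/opt/hadoop/conf
    volumes:
      - ./hadoop-conf:/opt/hadoop/conf
```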

Re: In Flink SQL for kafka avro based table , is there support for FORWARD_TRANSITIVE schema change?

2021-04-14 Thread Arvid Heise
Hi Agnelo, if you reprocess all data and delete all old records with the old schema, then you have to add the schema to the DDL and it will work. If you have records with both the old and the new schema in your topic, you need to attach the schema information to the records. Avro records themselves do not have an

Aw: Re: JSON source for pyflink stream

2021-04-14 Thread G . G . M . 5611
Thanks to everyone for the tips. It helps a lot. I'll try the Table API first and if that doesn't succeed I'll do as Klemens says. Cheers, Giacomo Sent: Wednesday, April 14, 2021 at 16:18 From: "Dian Fu" To: "Klemens Muthmann" Cc: "Yik San Chan", g.g.m.5...@web.de, "user" Subject:

Re: Nondeterministic results with SQL job when parallelism is > 1

2021-04-14 Thread Dylan Forciea
On a side note: I changed to use batch mode per your suggestion, Timo, and my job ran much faster and with deterministic counts with parallelism turned on. So I'll probably utilize that for now. However, it would still be nice to dig down into why streaming isn't working in case I need that

Re: Get consumed Kafka offsets from Flink kafka source

2021-04-14 Thread bat man
Thanks Piotrek for the references. Cheers. Hemant On Wed, Apr 14, 2021 at 7:18 PM Piotr Nowojski wrote: > Hi, > > Depending how you configured your FlinkKafkaSource, but you can make the > source to commit consumed offsets back to Kafka. So one way to examine > them, would be to check those off

Flink 1.11 FlinkKafkaConsumer not propagating watermarks

2021-04-14 Thread Edward Bingham
Hi everyone, I'm seeing some strange behavior from FlinkKafkaConsumer. I wrote up some Flink processors using Flink 1.12, and tried to get them working on Amazon EMR. However Amazon EMR only supports Flink 1.11.2 at the moment. When I went to downgrade, I found, inexplicably, that watermarks were
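
One workaround sketch for 1.11 (whether it addresses this particular regression is an assumption) is to attach the watermark strategy to the consumer itself, so watermarks are generated per Kafka partition inside the source:

```scala
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaConsumerBase}

// Example out-of-orderness bound of 5s; the timestamp extractor is supplied
// by the caller, since the event type here is generic.
def withWatermarks[T](consumer: FlinkKafkaConsumer[T], extractTs: T => Long): FlinkKafkaConsumerBase[T] =
  consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[T](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[T] {
        override def extractTimestamp(element: T, recordTs: Long): Long = extractTs(element)
      }))
```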

Re: Extract/Interpret embedded byte data from a record

2021-04-14 Thread Sumeet Malhotra
Thanks Piotrek! I forgot to mention that I'm using PyFlink and mostly Table APIs. The documentation ( https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#row-based-operations) suggests that Map() function is not currently supported in Python. So, what do you think w

WindowFunction is stuck until next message is processed although Watermark with idle timeout is applied.

2021-04-14 Thread Sung Gon Yi
Hello, I have a question about watermarks with an idle timeout. I made an example of it: https://github.com/skonmeme/rare_stream/blob/main/src/main/scala/com/skonuniverse/flink/RareStreamWithIdealTimeout.scala
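
For reference, the idle-timeout mechanism in question is `WatermarkStrategy.withIdleness`; a minimal sketch (the event type and durations are placeholders):

```scala
import java.time.Duration
import org.apache.flink.api.common.eventtime.WatermarkStrategy

case class MyEvent(ts: Long) // placeholder event type

// An input that emits nothing for 30s is marked idle and excluded from the
// watermark calculation, so watermarks (and hence windows) can still advance
// from the remaining active inputs.
val strategy: WatermarkStrategy[MyEvent] =
  WatermarkStrategy
    .forBoundedOutOfOrderness[MyEvent](Duration.ofSeconds(5))
    .withIdleness(Duration.ofSeconds(30))
```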

Re: Extract/Interpret embedded byte data from a record

2021-04-14 Thread Sumeet Malhotra
Additional observation: From the Flink repo, the file "flink-python/pyflink/table/table.py" seems to support map(), flat_map() and other row based operations although the 1.12 documentation doesn't reflect that. Is that correct? From the code, it appears that these operations are supported in Pytho

Re: flink1.12.2 "Failed to execute job"

2021-04-14 Thread Becket Qin
Hi, Piotr is correct. The cause of this issue is likely because the instantiation of the SplitEnumerator is done in the JM main thread. FLINK-22282 has been created to address this issue. Thanks, Jiangjie (Becket) Qin On Wed, Apr 14, 2021 at 10:32 PM Piotr Nowojski wrote: > Hi, > > I haven't

Re: PyFlink Vectorized UDF throws NullPointerException

2021-04-14 Thread Yik San Chan
Hi Dian, Thanks for the reminder. Yes, the original udf implementation does not qualify the input and output type requirement. After adding a unit test, I was able to find what's wrong, and fix my UDF implementation. Here is the new implementation FYI. @udf(result_type=DataTypes.DOUBLE(), func_ty

Re: PyFlink Vectorized UDF throws NullPointerException

2021-04-14 Thread Dian Fu
Great! Thanks for letting me know~ > On Apr 15, 2021, at 11:01 AM, Yik San Chan wrote: > > Hi Dian, > > Thanks for the reminder. Yes, the original udf implementation does not > qualify the input and output type requirement. After adding a unit test, I > was able to find what's wrong, and fix my UDF impleme

Re: flink1.12.2 "Failed to execute job"

2021-04-14 Thread 太平洋
Thanks. My program reads hundreds of small files from S3 via SQL. What happens in the instantiation of the SplitEnumerator? What can I do to reduce the time now? -- Original message -- From:

Re: Extract/Interpret embedded byte data from a record

2021-04-14 Thread Xingbo Huang
Hi Sumeet, Python row-based operations will be supported in release 1.13. I guess you are looking at the code of the master branch. Since you are using the Python Table API, you can use a Python UDF to parse your data. For the details of Python UDFs, you can refer to the doc[1]. [1] https://ci.a

Re: flink1.12.2 "Failed to execute job"

2021-04-14 Thread Arvid Heise
Hi, I guess you can work around the current limitation by increasing client.timeout. [1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#client-timeout On Thu, Apr 15, 2021 at 7:06 AM 太平洋 <495635...@qq.com> wrote: > Thanks. My Program read hundreds of
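
A sketch of that setting (the value below is only an example; the default is "60 s"):

```yaml
# flink-conf.yaml: raise the client timeout so long-running split enumeration
# does not time out on the client side.
client.timeout: 600 s
```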