Re: Setup of Scala/Flink project using Bazel

2021-05-19 Thread Salva Alcántara
Hi Austin, In the end I added the following target override for Scala: ``` maven_install( artifacts = [ # testing maven.artifact( group = "com.google.truth", artifact = "truth", version = "1.0.1", ), ] + flink_artifacts(
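The preview cuts the snippet off mid-call; the shape of such an override, following the rules_jvm_external pattern, is roughly the fragment below. This is a sketch only: `flink_artifacts()` is assumed to be a repo-local helper (not a rules_jvm_external built-in) that returns the Flink Maven coordinates, and the version pins are illustrative.

```starlark
# WORKSPACE sketch -- assumes rules_jvm_external is already loaded and that
# flink_artifacts() is a helper defined elsewhere in the repo.
maven_install(
    artifacts = [
        # testing
        maven.artifact(
            group = "com.google.truth",
            artifact = "truth",
            version = "1.0.1",
        ),
    ] + flink_artifacts(),
    repositories = ["https://repo1.maven.org/maven2"],
)
```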

Re: Guidance for Integration Tests with External Technologies

2021-05-19 Thread Yun Gao
Hi Rion, Do you mean you are running the tests directly in the IDE like Idea for "multiple tests run in sequence"? If the tests succeed when run separately but fail when run in sequence, then it seems other tests may still be affecting the failed ones. For the

Re: Re: Questions Flink DataStream in BATCH execution mode scalability advice

2021-05-19 Thread Yun Gao
Hi Marco, I think Flink does not need 500GB for the source; the source should be able to read from S3 in a streaming pattern (namely open the file, create an input stream and fetch data as required). But it might indeed need disk space for intermediate data between operators and the sort operat
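The streaming-read pattern Yun Gao describes — open the file and pull data only as needed, so memory stays bounded regardless of file size — can be sketched in plain Python (illustrative only, not the Flink source API):

```python
import io

def stream_records(fobj, chunk_size=4):
    """Yield data on demand instead of loading the whole file into memory."""
    while True:
        chunk = fobj.read(chunk_size)   # fetch only the next piece
        if not chunk:
            break
        yield chunk

# Memory use is bounded by chunk_size, not by the total input size.
src = io.StringIO("abcdefgh")
assert list(stream_records(src)) == ["abcd", "efgh"]
```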

Re: two questions about flink stream processing: kafka sources and TimerService

2021-05-19 Thread Jin Yi
thanks ingo! i'll look at moving to rolling my own operator and using ConnectedStreams.transform with it. On Tue, May 18, 2021 at 3:18 AM Ingo Bürk wrote: > Hi Jin, > > 1) As far as I know the order is only guaranteed for events from the same > partition. If you want events across partitions to

Re: Prometheus Reporter Enhancement

2021-05-19 Thread Mason Chen
Are there any plans to rework some of the metric name formulations (getMetricIdentifier or getLogicalScope)? Currently, the label keys and/or label values are concatenated into the metric name, which is redundant and makes the metric names longer. Would it make sense to remove the ta

Parallelism in Production: Best Practices

2021-05-19 Thread Yaroslav Tkachenko
Hi everyone, I'd love to learn more about how different companies approach specifying Flink parallelism. I'm specifically interested in real, production workloads. I can see a few common patterns: - Rely on default parallelism, scale by changing parallelism for the whole pipeline. I guess it onl
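Beyond the patterns Yaroslav lists, one common back-of-the-envelope approach (not from this thread, stated here as an assumption) is capacity planning: measure single-subtask throughput, then size parallelism from peak load plus headroom. A minimal sketch:

```python
import math

def required_parallelism(peak_records_per_sec, per_subtask_records_per_sec, headroom=1.5):
    """Rough sizing: peak load times a headroom factor for spikes and recovery,
    divided by the measured throughput of a single subtask."""
    return math.ceil(peak_records_per_sec * headroom / per_subtask_records_per_sec)

# e.g. 120k records/s peak, 10k records/s per subtask, 1.5x headroom -> 18
print(required_parallelism(120_000, 10_000))
```

The per-subtask number is an assumption you must measure for your own pipeline; it varies widely with state size, serialization cost, and sink back-pressure.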

Re: Questions Flink DataStream in BATCH execution mode scalability advice

2021-05-19 Thread Marco Villalobos
> On May 19, 2021, at 7:26 AM, Yun Gao wrote: > > Hi Marco, > > For the remaining issues, > > 1. For the aggregation, the 500GB of files are not required to fit into > memory. > Roughly speaking, for the keyed().window().reduce(), the input records would > first > be sorted according to the

Re: Questions Flink DataStream in BATCH execution mode scalability advice

2021-05-19 Thread Yun Gao
Hi Marco, For the remaining issues, 1. For the aggregation, the 500GB of files are not required to fit into memory. Roughly speaking, for the keyed().window().reduce(), the input records would first be sorted according to the key (time_series.name) via external sorts, which only consumes a fix
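The sort-then-reduce execution Yun Gao describes can be modeled in a few lines of plain Python: records are first sorted by key (Flink uses an external, spilling sort, so the full input never has to fit in memory), then reduced one key group at a time, holding only the current group's accumulator.

```python
from itertools import groupby
from operator import itemgetter

# Toy model of keyBy(...).window(...).reduce(...) in BATCH execution mode.
records = [("cpu", 3.0), ("mem", 1.0), ("cpu", 2.0), ("mem", 5.0)]

by_key = sorted(records, key=itemgetter(0))   # stand-in for Flink's external sort
totals = {k: sum(v for _, v in grp)           # one key group reduced at a time
          for k, grp in groupby(by_key, key=itemgetter(0))}
print(totals)  # {'cpu': 5.0, 'mem': 6.0}
```

In the real runtime the sort spills to disk, which is why the job needs disk space rather than 500GB of memory.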

[no subject]

2021-05-19 Thread Wenyi Xu

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Zerah J
Thanks Dian. It worked for me Regards, Zerah On Wed, May 19, 2021, 5:14 PM Dian Fu wrote: > Hi Zerah, > > You could try to replace > ``` > value_schema = avro.schema.parse() > ``` > > with the following code: > ``` > JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser > value_schema

[Statefun] Truncated Messages in Python workers

2021-05-19 Thread Jan Brusch
Hi, recently we started seeing the following faulty behaviour in the Flink Stateful Functions HTTP communication towards external Python workers. This is only occurring when the system is under heavy load. The Java Application will send HTTP Messages to an external Python Function but the ext
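Truncation under load typically shows up as a request body shorter than the declared Content-Length. A defensive check on the worker side can at least surface the problem instead of deserializing a partial protobuf (plain-Python sketch, not the Statefun SDK):

```python
import io

def read_body(headers, stream):
    """Read exactly Content-Length bytes and fail loudly on truncation."""
    expected = int(headers.get("Content-Length", 0))
    body = stream.read(expected)
    if len(body) != expected:
        raise IOError(f"truncated request: got {len(body)} of {expected} bytes")
    return body

ok = read_body({"Content-Length": "5"}, io.BytesIO(b"hello"))
assert ok == b"hello"
```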

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Dian Fu
Hi Zerah, You could try to replace ``` value_schema = avro.schema.parse() ``` with the following code: ``` JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser value_schema = JSchemaParser().parse(value_schema_str) ``` The reason is that ```value_schema = avro.schema.parse() ``` will

Re:

2021-05-19 Thread Jake
Hi, vtygoss You can check out the official demo[1] ``` import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} val settings = EnvironmentSettings .newInstance() //.inStreamingMode() .inBatchMode() .build() val tEnv = TableEnvironment.create(settings) ``` Regar

[no subject]

2021-05-19 Thread vtygoss
Hi, I have the following use case: insert bounded data into a dynamic table (upsert-kafka) using Flink 1.12 on yarn. But the yarn application is still running when the insert job finishes, and the yarn container is not released. I tried to use BatchTableEnvironment, but “Primary key and unique key are not supporte

Re: Access Row fields by attribute name rather than by index in PyFlink TableFunction

2021-05-19 Thread Sumeet Malhotra
Thanks Xingbo! The workaround will probably work for now, at least it avoids having to refer to index values in the rest of the function. Cheers, Sumeet On Wed, May 19, 2021 at 3:02 PM Xingbo Huang wrote: > Hi Sumeet, > > Due to the limitation of the original PyFlink serializers design, there

Re: Access Row fields by attribute name rather than by index in PyFlink TableFunction

2021-05-19 Thread Xingbo Huang
Hi Sumeet, Due to the limitation of the original PyFlink serializers design, there is no way to pass attribute names to Row in row-based operations. In release-1.14, I am reconstructing the implementations of serializers[1]. After completion, accessing attribute names of `Row` in row-based operati

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Zerah J
Hi Dian, Type of value_schema is <*class 'avro.schema.RecordSchema*'> I have only a Json schema string and schema registry url. Please find below snippet : import avro.schema value_schema_str = """ { "namespace": "com.nextgen.customer", "type": "record", "name": "employee"

Access Row fields by attribute name rather than by index in PyFlink TableFunction

2021-05-19 Thread Sumeet Malhotra
Hi, According to the documentation for PyFlink Table row based operations [1], typical usage is as follows: @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()]) def split(x: Row) -> Row: for s in x[1].split(","): yield x[0], s table.flat_map(split) Is there any way that row fie
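The gap Sumeet is pointing at — index-based access being opaque compared with name-based access — can be illustrated in plain Python with a namedtuple (this is an analogy, not the PyFlink `Row` API):

```python
from collections import namedtuple

# Index-addressable, as in today's udtf: works, but `1` says nothing about intent.
row = ("widget", "a,b,c")
parts = row[1].split(",")

# Name-addressable, as with a namedtuple: self-documenting and robust to
# column reordering.
Named = namedtuple("Named", ["name", "tags"])
nrow = Named("widget", "a,b,c")
assert nrow.tags.split(",") == ["a", "b", "c"]
```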

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Dian Fu
Hi Zerah, What’s the type of value_schema? It should be a Java object of type Schema. From the exception, it seems that it’s a class instead of an object. Is this true? Regards, Dian > 2021年5月19日 下午3:41,Zerah J 写道: > > Hi Dian, > > Thanks for your suggestion. > > I tried to invoke ConfluentRe
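Dian's class-versus-object distinction is the crux here, and it holds in plain Python just as it does for the Java `Schema.Parser` reached through the py4j gateway: referencing a class is not the same as instantiating it, and only the instance has its methods usable as bound methods.

```python
class Parser:
    """Stand-in for a parser class reached via a gateway."""
    def parse(self, s):
        return {"schema": s}

cls_ref = Parser        # a class object -- passing this where an object is
                        # expected produces "it's a class"-style errors
instance = Parser()     # calling the class (note the parentheses) yields an object
assert instance.parse("rec") == {"schema": "rec"}
```

This is why Dian's fix writes `JSchemaParser().parse(...)` rather than `JSchemaParser.parse(...)`.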

Re: Root Exception can not be shown on Web UI in Flink 1.13.0

2021-05-19 Thread Matthias Pohl
Hi Gary, Not sure whether you've seen my question in the Jira issue: Would you be able to share the overall JobManager/TaskManager logs with us? That would help us understand the context a bit more on why no TaskManagerLocation was set. Let's move any further correspondence into FLINK-22688 [1] Best

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Zerah J
Hi Dian, Thanks for your suggestion. I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric method from Python, but it's not working. Kindly check the code snippet below: class MyAvroRowDeserializationSchema(DeserializationSchema): def __init__(self, record_class: str = N