Hi Austin,
In the end I added the following target override for Scala:
```
maven_install(
    artifacts = [
        # testing
        maven.artifact(
            group = "com.google.truth",
            artifact = "truth",
            version = "1.0.1",
        ),
    ] + flink_artifacts(
Hi Rion,
Do you mean you are running the tests directly in an IDE like IntelliJ IDEA when
you say "multiple tests run in sequence"?
If a test succeeds when run separately but fails when run in sequence, then
it seems the other tests are still affecting the failing test.
For the
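If that is the case, one common cause is shared static state that one test leaves behind for the next. A minimal, purely illustrative sketch (the object and test names are made up) of that pattern and how to reset it between tests:
```
import org.junit.{Before, Test}
import scala.collection.mutable

// Hypothetical shared buffer used as a test sink; state that one test leaves
// behind here is a typical reason a test passes alone but fails in a sequence.
object CollectedResults {
  val values: mutable.Buffer[String] = mutable.Buffer.empty
}

class MyPipelineTest {

  @Before
  def resetSharedState(): Unit = {
    // Clear anything static (sinks, counters, temp dirs) before every test
    // so earlier tests in the run cannot leak into this one.
    CollectedResults.values.clear()
  }

  @Test
  def producesExpectedOutput(): Unit = {
    // ... run the pipeline under test and assert on CollectedResults.values ...
  }
}
```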
Hi Marco,
I think Flink does not need 500GB for the source: the source should
be able to read from S3 in a streaming fashion (namely, open the file,
create an input stream, and fetch data as required).
But it might indeed need disk space for intermediate data
between operators and the sort operat
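As a rough sketch of what I mean by reading in a streaming fashion (the bucket path is made up, and this assumes an S3 filesystem plugin such as flink-s3-fs-hadoop is on the classpath):
```
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// The source opens each file and reads it as an input stream, fetching records
// as they are needed, so the whole 500GB never has to sit in memory at once.
val lines: DataStream[String] = env.readTextFile("s3://my-bucket/time-series/")
```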
Thanks Ingo! I'll look at moving to rolling my own operator and using
ConnectedStreams.transform with it.
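A rough sketch of what I'm thinking of, dropping to the Java API for the transform call; the operator and the element types here are just placeholders:
```
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, TwoInputStreamOperator}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord

// Placeholder operator that simply tags each element with the input it came from.
class TaggingOperator extends AbstractStreamOperator[String]
    with TwoInputStreamOperator[String, Int, String] {

  override def processElement1(element: StreamRecord[String]): Unit =
    output.collect(new StreamRecord("left: " + element.getValue, element.getTimestamp))

  override def processElement2(element: StreamRecord[Int]): Unit =
    output.collect(new StreamRecord("right: " + element.getValue, element.getTimestamp))
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val left = env.fromElements("a", "b")
val right = env.fromElements(1, 2)

// ConnectedStreams.transform attaches the custom operator to the two inputs.
val tagged = left.javaStream
  .connect(right.javaStream)
  .transform("tagging-operator", Types.STRING, new TaggingOperator)
```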
On Tue, May 18, 2021 at 3:18 AM Ingo Bürk wrote:
> Hi Jin,
>
> 1) As far as I know the order is only guaranteed for events from the same
> partition. If you want events across partitions to
Are there any plans to rework some of the metric name formulations
(getMetricIdentifier or getLogicalScope)? Currently, the label keys and/or
label values are concatenated into the metric name; this information is
redundant and makes the metric names longer.
Would it make sense to remove the ta
Hi everyone,
I'd love to learn more about how different companies approach specifying
Flink parallelism. I'm specifically interested in real, production
workloads.
I can see a few common patterns:
- Rely on default parallelism, scale by changing parallelism for the whole
pipeline. I guess it onl
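For concreteness, a small sketch of the patterns I have in mind (Scala DataStream API; the operators and numbers are made up):
```
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Pattern 1: a single default parallelism for the whole pipeline
// (could also come from parallelism.default or -p at submission time).
env.setParallelism(8)

// Pattern 2: override parallelism per operator where the profile differs,
// e.g. a heavy transformation vs. a sink with a limited connection pool.
val source = env.fromElements("a", "b", "c")
val mapped = source.map(_.toUpperCase).setParallelism(16)
mapped.print().setParallelism(2)
```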
> On May 19, 2021, at 7:26 AM, Yun Gao wrote:
>
> Hi Marco,
>
> For the remaining issues,
>
> 1. For the aggregation, the 500GB of files are not required to fit into
> memory.
> Roughly speaking, for keyed().window().reduce(), the input records would
> first be sorted according to the
Hi Marco,
For the remaining issues,
1. For the aggregation, the 500GB of files are not required to fit into
memory.
Roughly speaking, for keyed().window().reduce(), the input records would
first be sorted according to the key (time_series.name) via external sorts,
which only consumes
a fix
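Schematically, the kind of pipeline I mean (the record type and field names are only an illustration, using time_series.name as the key):
```
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Illustrative record type; only the key field (name) matters for the argument.
case class Point(name: String, timestamp: Long, value: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val points: DataStream[Point] = env.fromElements(
  Point("cpu", 1000L, 0.5),
  Point("cpu", 2000L, 0.7))

// Records are grouped by key; in batch execution the runtime sorts the input
// by key (spilling to disk via external sort), so only one key's window
// contents need to be held at a time, not the whole 500GB input.
val reduced: DataStream[Point] = points
  .keyBy(_.name)
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .reduce((a, b) => if (a.value >= b.value) a else b)
```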
Thanks Dian. It worked for me
Regards,
Zerah
On Wed, May 19, 2021, 5:14 PM Dian Fu wrote:
> Hi Zerah,
>
> You could try to replace
> ```
> value_schema = avro.schema.parse()
> ```
>
> with the following code:
> ```
> JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
> value_schema
Hi,
recently we started seeing the following faulty behaviour in the Flink
Stateful Functions HTTP communication towards external Python workers.
This is only occurring when the system is under heavy load.
The Java application sends HTTP messages to an external Python
function, but the ext
Hi Zerah,
You could try to replace
```
value_schema = avro.schema.parse()
```
with the following code:
```
JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
value_schema = JSchemaParser().parse(value_schema_str)
```
The reason is that ```value_schema = avro.schema.parse()``` will produce a Python
schema object, whereas a Java object of type org.apache.avro.Schema is expected here.
Hi, vtygoss
You can check out the official demo[1]
```
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
val settings = EnvironmentSettings
.newInstance()
//.inStreamingMode()
.inBatchMode()
.build()
val tEnv = TableEnvironment.create(settings)
```
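With a batch TableEnvironment like that, the bounded insert can be submitted and waited on, e.g. (the table names here are hypothetical):
```
// Submits the bounded INSERT as a batch job and blocks until it finishes,
// after which the application (and its YARN containers) can shut down.
tEnv.executeSql("INSERT INTO kafka_sink SELECT * FROM bounded_source").await()
```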
Regards,
Hi,
I have the below use case:
insert bounded data into a dynamic table (upsert-kafka) using Flink 1.12 on YARN,
but the YARN application is still running when the insert job has finished, and the
YARN containers are not released.
I tried to use BatchTableEnvironment, but “Primary key and unique key are not
supporte
Thanks Xingbo! The workaround will probably work for now, at least it
avoids having to refer to index values in the rest of the function.
Cheers,
Sumeet
On Wed, May 19, 2021 at 3:02 PM Xingbo Huang wrote:
> Hi Sumeet,
>
> Due to the limitation of the original PyFlink serializers design, there
Hi Sumeet,
Due to a limitation of the original PyFlink serializer design, there is
no way to pass attribute names to Row in row-based operations. In
release-1.14, I am reworking the serializer implementations [1].
Once that is complete, accessing attribute names of `Row` in row-based
operati
Hi Dian,
The type of value_schema is <class 'avro.schema.RecordSchema'>.
I only have a JSON schema string and the schema registry URL. Please find the
snippet below:
import avro.schema
value_schema_str = """
{
"namespace": "com.nextgen.customer",
"type": "record",
"name": "employee"
Hi,
According to the documentation for PyFlink Table row based operations [1],
typical usage is as follows:
@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
def split(x: Row) -> Row:
    for s in x[1].split(","):
        yield x[0], s

table.flat_map(split)
Is there any way that row fie
Hi Zerah,
What’s the type of value_schema? It should be a Java object of type Schema.
From the exception, it seems that it’s a class instead of an object. Is this true?
Regards,
Dian
> On May 19, 2021, at 3:41 PM, Zerah J wrote:
>
> Hi Dian,
>
> Thanks for your suggestion.
>
> I tried to invoke ConfluentRe
Hi Gary,
Not sure whether you've seen my question in the Jira issue: would you be able
to share the overall JobManager/TaskManager logs with us? That would help
us understand a bit more about why no TaskManagerLocation was set.
Let's move any further correspondence into FLINK-22688 [1]
Best
Hi Dian,
Thanks for your suggestion.
I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric
method from Python, but it's not working. Kindly check the code snippet
below:
class MyAvroRowDeserializationSchema(DeserializationSchema):
def __init__(self, record_class: str = N