Re: [DISCUSS] FLIP-379: Dynamic source parallelism inference for batch jobs

2023-11-08 Thread Leonard Xu
Thanks Xia and Zhu Zhu for kicking off this discussion.

Dynamic source parallelism inference is a useful feature for the batch story.
I have some comments on the current design.

1. How can users disable parallelism inference if they want to use a fixed source
parallelism? Currently they can configure a fixed parallelism in the table layer, as
you explained above.

2. Could you explain the priority between the static parallelism set from the table
layer and the proposed dynamic source parallelism? Also, changing the default value of
`table.exec.hive.infer-source-parallelism` as a sub-task does not resolve all
cases, because other sources can set their own parallelism too.

3. The current design only works for batch jobs. The workflow for a streaming job
would look like: (1) infer the parallelism for a streaming source like Kafka, (2) stop
the job with a savepoint, (3) apply the new parallelism to the job, (4) schedule the
streaming job from the savepoint. That is a totally different workflow, and the latter
lacks a lot of infrastructure in Flink, right? So, could we consider the boundedness
info when designing the interface? Both FileSource and the Hive source offer streaming
read ability. Imagine this case: a streaming Hive source should not apply dynamic
source parallelism, even if it implements the feature, because it is serving a
streaming job.
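
To make this concern concrete, here is a hypothetical sketch of how a source could take
boundedness into account when the dynamic inference hook is invoked. The interface and
method names below are illustrative placeholders, not the API proposed in the FLIP:

    import org.apache.flink.api.connector.source.Boundedness;

    // Hypothetical sketch only; DynamicSourceParallelism is a placeholder, not FLIP-379's interface.
    interface DynamicSourceParallelism {
        int inferParallelism(int maxParallelism);
    }

    class ExampleSource implements DynamicSourceParallelism {
        private final Boundedness boundedness;
        private final int configuredParallelism;

        ExampleSource(Boundedness boundedness, int configuredParallelism) {
            this.boundedness = boundedness;
            this.configuredParallelism = configuredParallelism;
        }

        @Override
        public int inferParallelism(int maxParallelism) {
            if (boundedness == Boundedness.CONTINUOUS_UNBOUNDED) {
                // Streaming read (e.g. a streaming Hive source): keep the configured value,
                // because re-scaling a running streaming job needs stop-with-savepoint infra.
                return configuredParallelism;
            }
            // Bounded (batch) read: infer from runtime information such as the split count
            // (omitted here), capped at the scheduler's max parallelism.
            return Math.min(maxParallelism, configuredParallelism);
        }
    }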

Best,
Leonard


> On Nov 1, 2023, at 6:21 PM, Xia Sun  wrote:
> 
> Thanks Lijie for the comments!
> 1. For Hive source, dynamic parallelism inference in batch scenarios is a
> superset of static parallelism inference. As a follow-up task, we can
> consider changing the default value of
> 'table.exec.hive.infer-source-parallelism' to false.
> 
> 2. I think that both dynamic parallelism inference and static parallelism
> inference have their own use cases. Currently, for streaming sources and
> other sources that are not sensitive to dynamic information, the benefits
> of dynamic parallelism inference may not be significant. In such cases, we
> can continue to use static parallelism inference.
> 
> Thanks,
> Xia
> 
> Lijie Wang  wrote on Wed, Nov 1, 2023 at 14:52:
> 
>> Hi Xia,
>> 
>> Thanks for driving this FLIP, +1 for the proposal.
>> 
>> I have 2 questions about the relationship between static inference and
>> dynamic inference:
>> 
>> 1. AFAIK, currently the hive table source enable static inference by
>> default. In this case, which one (static vs dynamic) will take effect ? I
>> think it would be better if we can point this out in FLIP
>> 
>> 2. As you mentioned above, dynamic inference is the most ideal way, so do
>> we have plan to deprecate the static inference in the future?
>> 
>> Best,
>> Lijie
>> 
>> Zhu Zhu  wrote on Tue, Oct 31, 2023 at 20:19:
>> 
>>> Thanks for opening the FLIP and kicking off this discussion, Xia!
>>> The proposed changes make up an important missing part of the dynamic
>>> parallelism inference of adaptive batch scheduler.
>>> 
>>> Besides that, it is also one good step towards supporting dynamic
>>> parallelism inference for streaming sources, e.g. allowing Kafka
>>> sources to determine its parallelism automatically based on the
>>> number of partitions.
>>> 
>>> +1 for the proposal.
>>> 
>>> Thanks,
>>> Zhu
>>> 
>>> Xia Sun  wrote on Tue, Oct 31, 2023 at 16:01:
>>> 
 Hi everyone,
 I would like to start a discussion on FLIP-379: Dynamic source
>>> parallelism
 inference for batch jobs[1].
 
 In general, there are three main ways to set source parallelism for
>> batch
 jobs:
 (1) User-defined source parallelism.
 (2) Connector static parallelism inference.
 (3) Dynamic parallelism inference.
 
 Compared to manually setting parallelism, automatic parallelism
>> inference
 is easier to use and can better adapt to varying data volumes each day.
 However, static parallelism inference cannot leverage runtime
>>> information,
 resulting in inaccurate parallelism inference. Therefore, for batch
>> jobs,
 dynamic parallelism inference is the most ideal, but currently, the
>>> support
 for adaptive batch scheduler is not very comprehensive.
 
 Therefore, we aim to introduce a general interface that enables the
 adaptive batch scheduler to dynamically infer the source parallelism at
 runtime. Please refer to the FLIP[1] document for more details about
>> the
 proposed design and implementation.
 
 I also thank Zhu Zhu and LiJie Wang for their suggestions during the
 pre-discussion.
 Looking forward to your feedback and suggestions, thanks.
 
 [1]
 
 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-379%3A+Dynamic+source+parallelism+inference+for+batch+jobs
 
 Best regards,
 Xia
 
>>> 
>> 



Re: [DISCUSS] Release Flink 1.16.3

2023-11-08 Thread ConradJam
+1

Sergey Nuyanzin  wrote on Wed, Nov 8, 2023 at 13:08:

> +1 for the final release
> and thanks for the efforts
>
> On Wed, Nov 8, 2023 at 4:09 AM Leonard Xu  wrote:
>
> > Thanks Rui for driving this.
> >
> > +1 to release 1.16.3 and make it the final bugfix release of the 1.16
> > series.
> >
> > Best,
> > Leonard
> >
>
>
> --
> Best regards,
> Sergey
>


-- 
Best

ConradJam


Re: [DISCUSS] FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-08 Thread Piotr Nowojski
Hi Zakelly,

Thanks for the comments. The quick answer to both of your questions is that this
should probably be left as future work. For more detailed answers, please take a look below :)

> Does it mean the inclusion and subdivision relationships of spans defined
> by "parent_id" are not supported? I think it is a very necessary feature
> for the trace.

Yes exactly, that is the current limitation. This could be solved somehow
one way or another in the future.

Support for reporting multi-span traces all at once - for example,
`CheckpointStatsTracker` running on the JM could, upon checkpoint completion,
create the whole structure of parent spans in one place, to have for
example one span per subtask. This would be a relatively easy follow-up.

However, if we would like to create true distributed traces, with spans
reported from many different components, potentially both on the JM and TMs,
the problem is a bit deeper. The issue in that case is how to actually fill
out `parent_id` and `trace_id`. Passing some context entity around as a Java
object would be unfeasible - that would require too many changes in too many
places. I think the only realistic way to do it would be to have a
deterministic generator of `parent_id` and `trace_id` values.

For example, we could create the parent trace/span of the checkpoint on the JM
and set those ids to something like `jobId#attemptId#checkpointId`. Each subtask
could then re-generate those ids, and a subtask's checkpoint span would have an
id of `jobId#attemptId#checkpointId#subTaskId`. Note that this is just an
example, as distributed spans most likely do not make sense for checkpointing,
since we can generate those spans much more easily on the JM anyway.
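
A rough sketch of what such a deterministic id generator could look like (the class
and method names here are illustrative only, not part of the proposed interfaces):

    // Illustrative only: deterministic trace/span ids derived from job metadata,
    // so the JM and TMs can agree on ids without passing a context object around.
    public final class CheckpointSpanIds {

        /** Trace/parent span id generated on the JM, e.g. "jobId#attemptId#checkpointId". */
        public static String parentSpanId(String jobId, int attemptId, long checkpointId) {
            return jobId + "#" + attemptId + "#" + checkpointId;
        }

        /** Child span id re-generated independently on each TM for its subtask. */
        public static String subtaskSpanId(
                String jobId, int attemptId, long checkpointId, int subtaskIndex) {
            return parentSpanId(jobId, attemptId, checkpointId) + "#" + subtaskIndex;
        }

        public static void main(String[] args) {
            System.out.println(parentSpanId("jobA", 1, 42));      // jobA#1#42
            System.out.println(subtaskSpanId("jobA", 1, 42, 3));  // jobA#1#42#3
        }
    }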

> In addition to checkpoint and recovery, I believe the trace would also be
> valuable for performance tuning. If Flink can trace and visualize the time
> cost of each operator and stage for a sampled record, users would be able
> to easily determine the end-to-end latency and identify performance issues
> for optimization. Looking forward to seeing these in the future.

I'm not sure if I understand the proposal - I don't know how traces could
be used for this purpose. Traces are perfect for one-off events (like
checkpointing, recovery, etc.), not for continuous monitoring (like processing
records) - that's what metrics are for. Creating a trace and span(s) per
record would be prohibitively expensive.

Unless you mean in batch/bounded jobs? Then yes, we could create a bounded
job trace, with spans
for every stage/task/subtask.

Best,
Piotrek


On Wed, Nov 8, 2023 at 05:30, Zakelly Lan  wrote:

> Hi Piotr,
>
> Happy to see the trace! Thanks for this proposal.
>
> One minor question: It is mentioned in the interface of Span:
>
> Currently we don't support traces with multiple spans. Each span is
> > self-contained and represents things like a checkpoint or recovery.
>
>
> Does it mean the inclusion and subdivision relationships of spans defined
> by "parent_id" are not supported? I think it is a very necessary feature
> for the trace.
>
> In addition to checkpoint and recovery, I believe the trace would also be
> valuable for performance tuning. If Flink can trace and visualize the time
> cost of each operator and stage for a sampled record, users would be able
> to easily determine the end-to-end latency and identify performance issues
> for optimization. Looking forward to seeing these in the future.
>
> Best,
> Zakelly
>
>
> On Tue, Nov 7, 2023 at 6:27 PM Piotr Nowojski 
> wrote:
>
> > Hi Rui,
> >
> > Thanks for the comments!
> >
> > > 1. I see the trace just supports Span? Does it support trace events?
> > > I'm not sure whether tracing events is reasonable for TraceReporter.
> > > If it supports, flink can report checkpoint and checkpoint path
> > proactively.
> > > Currently, checkpoint lists or the latest checkpoint can only be
> fetched
> > > by external components or platforms. And report is more timely and
> > > efficient than fetch.
> >
> > No, currently the `TraceReporter` that I'm introducing supports only single
> > span traces. So currently neither events on their own nor events inside
> > spans are supported. This is done just for the sake of simplicity, to test
> > out the basic functionality. But I think those currently missing features
> > should be added at some point in the future.
> >
> > About structured logging (basically events?) I vaguely remember some
> > discussions about
> > that. It might be a much larger topic, so I would prefer to leave it out
> of
> > the scope of this
> > FLIP.
> >
> > > 2. This FLIP just monitors the checkpoint and task recovery, right?
> >
> > Yes, it only adds single span traces for checkpointing and
> > recovery/initialisation - one
> > span per whole job per either recovery/initialization process or per each
> > checkpoint.
> >
> > > Could we add more operations in this FLIP? In our production, we
> > > added a lot of trace reporters for job starts and scheduler operation.
> >

Re: [VOTE] FLIP-376: Add DISTRIBUTED BY clause

2023-11-08 Thread Leonard Xu
+1(binding)

Best,
Leonard

> On Nov 8, 2023, at 1:05 PM, Sergey Nuyanzin  wrote:
> 
> +1 (binding)
> 
> On Wed, Nov 8, 2023 at 6:02 AM Zhanghao Chen 
> wrote:
> 
>> +1 (non-binding)
>> 
>> Best,
>> Zhanghao Chen
>> 
>> From: Timo Walther 
>> Sent: Monday, November 6, 2023 19:38
>> To: dev 
>> Subject: [VOTE] FLIP-376: Add DISTRIBUTED BY clause
>> 
>> Hi everyone,
>> 
>> I'd like to start a vote on FLIP-376: Add DISTRIBUTED BY clause[1] which
>> has been discussed in this thread [2].
>> 
>> The vote will be open for at least 72 hours unless there is an objection
>> or not enough votes.
>> 
>> [1]
>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
>> [2] https://lists.apache.org/thread/z1p4f38x9ohv29y4nlq8g1wdxwfjwxd1
>> 
>> Cheers,
>> Timo
>> 
> 
> 
> -- 
> Best regards,
> Sergey



[DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-08 Thread Hongshun Wang
Hi devs,

I would like to start a discussion on FLIP-389: Annotate
SingleThreadFetcherManager and FutureCompletingBlockingQueue as
PublicEvolving [1].

Though the SingleThreadFetcherManager is annotated as Internal, it actually
acts as a de-facto public API, which is widely used in many connector
projects: flink-cdc-connector, flink-connector-mongodb, and so on.

Moreover, even the constructor of SingleThreadMultiplexSourceReaderBase
(which is PublicEvolving) includes parameters of type SingleThreadFetcherManager
and FutureCompletingBlockingQueue. That means that SingleThreadFetcherManager
and FutureCompletingBlockingQueue have already been exposed to users for a
long time and are widely used.

Considering that all source implementations are already using them de facto, why
not annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue
as PublicEvolving, so that developers will modify them more carefully to avoid
any potential issues? As shown in FLINK-31324 [2], FLINK-28853 [3] changed
the default constructor of SingleThreadFetcherManager. However, it affected many
downstream projects. In the end, the former constructor was added back and
marked as Deprecated.

In conclusion, the goal of this FLIP is to annotate
SingleThreadFetcherManager (including its parent classes) and
FutureCompletingBlockingQueue as PublicEvolving.
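
To illustrate the kind of coupling described above, here is a small self-contained toy
example. Only the @Internal/@PublicEvolving annotations are Flink's real stability
annotations; the Queue/Reader classes are hypothetical stand-ins, not the actual
connector-base classes:

    import org.apache.flink.annotation.Internal;
    import org.apache.flink.annotation.PublicEvolving;

    // Toy stand-ins (NOT the real connector-base classes) showing how an @Internal
    // class that appears in a @PublicEvolving constructor becomes de-facto public API.
    @Internal
    class ElementQueue<T> {
        // internal implementation detail
    }

    @PublicEvolving
    class ReaderBase<T> {
        private final ElementQueue<T> queue;

        // Every connector that subclasses ReaderBase has to reference ElementQueue
        // directly here, so changing ElementQueue's constructor breaks those
        // connectors even though ElementQueue is annotated @Internal.
        ReaderBase(ElementQueue<T> queue) {
            this.queue = queue;
        }
    }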

Looking forward to hearing from you.


[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498

[2] https://issues.apache.org/jira/browse/FLINK-31324

[3] https://issues.apache.org/jira/browse/FLINK-28853


[jira] [Created] (FLINK-33482) Flink benchmark regression check in the new machines

2023-11-08 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-33482:
---

 Summary: Flink benchmark regression check in the new machines
 Key: FLINK-33482
 URL: https://issues.apache.org/jira/browse/FLINK-33482
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks
Reporter: Zakelly Lan
Assignee: Zakelly Lan


After FLINK-33052, the codespeed and benchmark servers are hosted on Aliyun. We
need to modify the benchmark daily monitoring scripts for the new environment.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] FLIP-390: Support System out and err to be redirected to LOG or discarded

2023-11-08 Thread Rui Fan
Hi all!

I would like to start a discussion of FLIP-390: Support System out and err
to be redirected to LOG or discarded [1].

In various production environments, whether cloud-native or on physical
machines, the disk space that a Flink TaskManager can use is limited.

In general, Flink users shouldn't use `System.out.println` in production;
however, this may happen when the number of Flink jobs and job developers
is very large. A Flink job may use System.out to output a large amount of
data to the taskmanager.out file. This file does not roll and will keep
growing, until eventually the disk space limit of the TM is reached.

We can support redirecting System.out and System.err to LOG, or discarding
them; the LOG can roll and won't grow forever.
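
For illustration, here is a minimal sketch of what such a redirection could look like.
This is not the FLIP's actual SystemOutRedirectToLog implementation, and the final
option names and enum values are left to the FLIP discussion:

    import java.io.OutputStream;
    import java.io.PrintStream;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative sketch only; not the class proposed in FLIP-390.
    public final class SystemOutRedirectExample {
        private static final Logger LOG =
                LoggerFactory.getLogger(SystemOutRedirectExample.class);

        /** Redirect System.out so each printed line ends up in the rolling log file. */
        public static void redirectSystemOutToLog() {
            System.setOut(
                    new PrintStream(
                            new OutputStream() {
                                private final StringBuilder line = new StringBuilder();

                                @Override
                                public void write(int b) {
                                    if (b == '\n') {
                                        LOG.info("{}", line);
                                        line.setLength(0);
                                    } else if (b != '\r') {
                                        // Naive byte-to-char handling, good enough for a sketch.
                                        line.append((char) b);
                                    }
                                }
                            },
                            true));
        }

        public static void main(String[] args) {
            redirectSystemOutToLog();
            System.out.println("this line goes to the logger instead of taskmanager.out");
        }
    }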

This feature is useful for SREs who maintain Flink environments: they can
redirect System.out to LOG by default. Although the root cause of this problem
is that the user's code is not standardized, for an SRE, pushing users to modify
their code one by one is usually a very time-consuming operation. It's also
useful for job stability when System.out is accidentally used.

Looking forward to your feedback, thanks~

[1] https://cwiki.apache.org/confluence/x/4guZE

Best,
Rui


Re: [DISCUSS] FLIP-389: Annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue as PublicEvolving

2023-11-08 Thread Leonard Xu
Thanks Hongshun for starting this discussion. 

+1 from my side.

IIRC, @Jiangjie(Becket) also mentioned this in FLINK-31324 comment[1].

Best,
Leonard

[1] 
https://issues.apache.org/jira/browse/FLINK-31324?focusedCommentId=17696756&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17696756



> On Nov 8, 2023, at 5:42 PM, Hongshun Wang  wrote:
> 
> Hi devs,
> 
> I would like to start a discussion on FLIP-389: Annotate
> SingleThreadFetcherManager and FutureCompletingBlockingQueue as
> PublicEvolving.[
> 
> 1].
> 
> Though the SingleThreadFetcherManager is annotated as Internal, it actually
> acts as some-degree public API, which is widely used in many connector
> projects: flink-cdc-connector
> 
> , flink-connector-mongodb
> 
> and
> soon.
> 
> Moreover, even the constructor of SingleThreadMultiplexSourceReaderBase
> (which is PublicEvolving) includes the params of SingleThreadFetcherManager
> and FutureCompletingBlockingQueue.  That means that the
> SingleThreadFetcherManager  and FutureCompletingBlockingQueue have already
> been exposed to users for a long time and are widely used.
> 
> Considering that all source implementations are using them de facto, why
> not annotate SingleThreadFetcherManager and FutureCompletingBlockingQueue
> as PublicEvolving so that developers will modify it more carefully to avoid
> any potential issues.  As shown in FLINK-31324[2], FLINK-28853[3] used
> to change the default constructor of SingleThreadFetcherManager. However,
> it influenced a lot. Finally, the former constructor was added back and
> marked as Deprecated.
> 
> In conclusion, the goal of this FLIP is to annotate
> SingleThreadFetcherManager(includes its parent class) and
> FutureCompletingBlockingQueue as PublicEvolving.
> 
> Looking forward to hearing from you.
> 
> 
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465498
> 
> [2] https://issues.apache.org/jira/browse/FLINK-31324
> 
> [3] https://issues.apache.org/jira/browse/FLINK-28853



Re: [DISCUSS] Connector releases for Flink 1.18

2023-11-08 Thread Etienne Chauchot

Hi,

Thanks for starting the discussion. I already started testing the Cassandra
connector on Flink 1.18. The error in the nightly build linked here is a
known issue tracked by FLINK-32353. To fix it, changes are needed in
connector-parent and in the CI utils, which are tracked by FLINK-32563.


Best

Etienne

On 07/11/2023 at 02:20, mystic lama wrote:

Hi,

I looked into both connectors; they are both failing at ArchUnit. I tried a
few things, but with my limited experience I couldn't make much progress.
I don't want to block the release. If anyone knows what needs to be done,
please move forward with the fixes.

Hopefully I'll be able to help more in the future.

Thanks
Ash


On Fri, 3 Nov 2023 at 08:35, mystic lama  wrote:


Hi,

I can look into the Pulsar and Cassandra fixes. I agree it's an ArchUnit issue
based on the build logs. I don't have much experience with it, but I was able
to reproduce it locally.

I can send out PRs over the weekend. Do we have JIRAs for this? If not, I
can create some to track it.

Thanks
@sh

On Fri, 3 Nov 2023 at 02:20, Martijn Visser
wrote:


Hi Danny,

Thanks a lot for starting the discussion on this topic! I know that
Pulsar is failing because of ArchUnit, and I expect the same issue
for Cassandra (I know that Etienne was working on this). Happy
to help.

Best regards,

Martijn

On Thu, Nov 2, 2023 at 9:08 PM Danny Cranmer
wrote:

Hey all.

Now that Flink 1.18 is released, we need to do some connector releases for
integration parity. We can use this thread to start the discussions for
each connector release and spawn separate vote threads. Kafka is done [1]
(thanks Gordon) and the AWS connectors are in process [2]; I'd appreciate
help with votes on that one.

Opensearch: Flink 1.18 nightly build passing [3]. I volunteer to be release
manager for this one. I will consolidate 1.0.2 [4] and 1.1.0 [5] into a
single release as 1.1.0.
MongoDB: Flink 1.18 nightly build passing [6]. I volunteer to be release
manager for this one. I will work with Jiabao to get FLINK-33257 merged
into 1.1.0 and release that [7].
GCP Pub Sub: Flink 1.18 nightly build passing [8]. I volunteer to be
release manager for this one. Looks like 3.0.2 is ready to go [9], we
should proceed with this.

ElasticSearch: Flink 1.18 nightly build passing [10]. There is a good stack
of changes ready for 3.1.0 [11]; I suggest we release that.
JDBC: Flink 1.18 nightly build passing [12]. There is a good stack of
changes ready for 3.2.0 [13]; I suggest we release that.
RabbitMQ: Flink 1.18 nightly build passing [14]. There are no changes ready
for 3.0.2 [15], recommend we do a minimal 3.0.1-1.18.

Pulsar: The nightly CI is failing [16], needs a deeper look
Cassandra: The nightly CI is failing [17], needs a deeper look

Once I have completed Opensearch/MongoDB/GCP I will pick up others, but
hope others can help out.

Thanks,
Danny

[1] https://lists.apache.org/thread/0lvrm9hl3hnn1fpr74k68lsm22my8xh7
[2] https://lists.apache.org/thread/ko6nrtfsykkz9c9k9392jfj4l9f7qg11
[3] https://github.com/apache/flink-connector-opensearch/actions
[4] https://issues.apache.org/jira/projects/FLINK/versions/12353142
[5] https://issues.apache.org/jira/projects/FLINK/versions/12353141
[6] https://github.com/apache/flink-connector-mongodb/actions
[7] https://issues.apache.org/jira/projects/FLINK/versions/12353483
[8] https://github.com/apache/flink-connector-gcp-pubsub/actions
[9] https://issues.apache.org/jira/projects/FLINK/versions/12353144
[10] https://github.com/apache/flink-connector-elasticsearch/actions
[11] https://issues.apache.org/jira/projects/FLINK/versions/12352520
[12] https://github.com/apache/flink-connector-jdbc/actions
[13] https://issues.apache.org/jira/projects/FLINK/versions/12353143
[14] https://github.com/apache/flink-connector-rabbitmq/actions
[15] https://issues.apache.org/jira/projects/FLINK/versions/12353145
[16] https://github.com/apache/flink-connector-pulsar/actions
[17] https://github.com/apache/flink-connector-cassandra/actions

[jira] [Created] (FLINK-33483) Why is “UNDEFINED” defined in the Flink task status?

2023-11-08 Thread Xin Chen (Jira)
Xin Chen created FLINK-33483:


 Summary: Why is “UNDEFINED” defined in the Flink task status?
 Key: FLINK-33483
 URL: https://issues.apache.org/jira/browse/FLINK-33483
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / RPC, Runtime / Task
Affects Versions: 1.12.2
Reporter: Xin Chen


In Flink on YARN mode, if an unknown status appears in the Flink log, the JM
(JobManager) will report the task status as undefined. The YARN page will display
the state as FINISHED, but the final status is *UNDEFINED*. From a business
perspective, it is unknown whether the task has failed or succeeded, and whether to
retry, which has a certain impact. Why was UNDEFINED designed? Usually, this
situation occurs due to ZK (ZooKeeper) disconnection or a JM abnormality, etc.
Since an abnormality is present, why not use FAILED?

 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33484) Flink Kafka Connector Offset Lag Issue with Transactional Data and Read Committed Isolation Level

2023-11-08 Thread Darcy Lin (Jira)
Darcy Lin created FLINK-33484:
-

 Summary: Flink Kafka Connector Offset Lag Issue with Transactional 
Data and Read Committed Isolation Level
 Key: FLINK-33484
 URL: https://issues.apache.org/jira/browse/FLINK-33484
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.17.1
 Environment: Flink 1.17.1

kafka 2.5.1
Reporter: Darcy Lin


We have encountered an issue with the Flink Kafka connector when consuming 
transactional data from Kafka with the {{isolation.level}} set to 
{{read_committed}} ({{setProperty("isolation.level", "read_committed")}}). 
The problem is that even when all the data from a topic is consumed, the offset 
lag is not 0, but 1. However, when using the Kafka Java client to consume the 
same data, this issue does not occur.

We suspect that this issue arises due to the way the Flink Kafka connector 
calculates the offset. The problem seems to be in the 
{{KafkaRecordEmitter.java}} file, specifically in the {{emitRecord}} method. 
When saving the offset, the method calls 
{{splitState.setCurrentOffset(consumerRecord.offset() + 1);}}. While this 
statement works correctly in a regular Kafka scenario, it might not be accurate 
when the {{read_committed}} mode is used. We believe that it should be 
{{splitState.setCurrentOffset(consumerRecord.offset() + 2);}}, as 
transactional data in Kafka occupies an additional offset to store the 
transaction marker.
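
For illustration, here is a small worked example with hypothetical offsets, under the 
assumption that the transaction commit marker occupies one offset and that the lag is 
computed against the offset following the marker:

{code:java}
public class ReadCommittedLagExample {
    public static void main(String[] args) {
        // Hypothetical numbers: a transaction writes user records at offsets 10..14,
        // the commit marker occupies offset 15, so the end offset used for lag is 16.
        long lastRecordOffset = 14;               // last user record emitted by the reader
        long endOffset = 16;                      // next offset after the transaction marker
        long storedOffset = lastRecordOffset + 1; // what emitRecord stores today -> 15
        System.out.println("lag = " + (endOffset - storedOffset));                   // 1
        System.out.println("lag with +2 = " + (endOffset - (lastRecordOffset + 2))); // 0
    }
}
{code}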

We request the Flink team to investigate this issue and provide us with 
guidance on how to resolve it.

Thank you for your attention and support.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33485) Optimize the EXISTS sub-query by Metadata RowCount

2023-11-08 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-33485:
---

 Summary: Optimize the EXISTS sub-query by Metadata RowCount
 Key: FLINK-33485
 URL: https://issues.apache.org/jira/browse/FLINK-33485
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.18.0
Reporter: Sergey Nuyanzin


If the sub-query is guaranteed to produce at least one row, just return TRUE. 
If the sub-query is guaranteed to produce no row, just return FALSE.

Inspired by CALCITE-5117; however, since there is {{FlinkSubQueryRemoveRule}}, 
it should be adapted accordingly.

examples
{code:sql}
SELECT * FROM T2 WHERE EXISTS (SELECT SUM(a1), COUNT(*) FROM T1 WHERE 1=2)
{code}

Aggregation functions always return one row even for an empty table, so we could 
just replace this query with 
{code:sql}
SELECT * FROM T2 
{code}

another example
{code:sql}
SELECT * FROM MyTable WHERE NOT EXISTS (SELECT a FROM MyTable LIMIT 0)
{code}

{{LIMIT 0}} means no rows, so it could be optimized to

{code:sql}
SELECT * FROM MyTable
{code}





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-08 Thread Jinzhong Li
Hi Piotr,

Thanks for driving this proposal!   I strongly agree that the existing
metric APIs are not suitable for monitoring restore/checkpoint behavior!

I think TM-level recovery/checkpointing traces will be necessary in the
future. In our production environment, we sometimes encounter very long job
recovery times (30min+) due to heavy disk traffic on several subtasks. A
TM-level recovery trace is helpful for troubleshooting such issues.

Best
Jinzhong

On Wed, Nov 8, 2023 at 5:09 PM Piotr Nowojski  wrote:

> Hi Zakelly,
>
> Thanks for the comments. Quick answer for both of your questions would be
> that it probably should be
> left as a future work. For more detailed answers please take a look below
> :)
>
> > Does it mean the inclusion and subdivision relationships of spans defined
> > by "parent_id" are not supported? I think it is a very necessary feature
> > for the trace.
>
> Yes exactly, that is the current limitation. This could be solved somehow
> one way or another in the future.
>
> Support for reporting multi span traces all at once - for example
> `CheckpointStatsTracker` running JM,
> could upon checkpoint completion create in one place the whole structure of
> parent spans, to have for
> example one span per each subtask. This would be a relatively easy follow
> up.
>
> However, if we would like to create true distributed traces, with spans
> reported from many different
> components, potentially both on JM and TM, the problem is a bit deeper. The
> issue in that case is how
> to actually fill out `parrent_id` and `trace_id`? Passing some context
> entity as a java object would be
> unfeasible. That would require too many changes in too many places. I think
> the only realistic way
> to do it, would be to have a deterministic generator of `parten_id` and
> `trace_id` values.
>
> For example we could create the parent trace/span of the checkpoint on JM,
> and set those ids to
> something like: `jobId#attemptId#checkpointId`. Each subtask then could
> re-generate those ids
> and subtasks' checkpoint span would have an id of
> `jobId#attemptId#checkpointId#subTaskId`.
> Note that this is just an example, as most likely distributed spans for
> checkpointing do not make
> sense, as we can generate them much easier on the JM anyway.
>
> > In addition to checkpoint and recovery, I believe the trace would also be
> > valuable for performance tuning. If Flink can trace and visualize the
> time
> > cost of each operator and stage for a sampled record, users would be able
> > to easily determine the end-to-end latency and identify performance
> issues
> > for optimization. Looking forward to seeing these in the future.
>
> I'm not sure if I understand the proposal - I don't know how traces could
> be used for this purpose?
> Traces are perfect for one of events (like checkpointing, recovery, etc),
> not for continuous monitoring
> (like processing records). That's what metrics are. Creating trace and
> span(s) per each record would
> be prohibitively expensive.
>
> Unless you mean in batch/bounded jobs? Then yes, we could create a bounded
> job trace, with spans
> for every stage/task/subtask.
>
> Best,
> Piotrek
>
>
> On Wed, Nov 8, 2023 at 05:30, Zakelly Lan  wrote:
>
> > Hi Piotr,
> >
> > Happy to see the trace! Thanks for this proposal.
> >
> > One minor question: It is mentioned in the interface of Span:
> >
> > Currently we don't support traces with multiple spans. Each span is
> > > self-contained and represents things like a checkpoint or recovery.
> >
> >
> > Does it mean the inclusion and subdivision relationships of spans defined
> > by "parent_id" are not supported? I think it is a very necessary feature
> > for the trace.
> >
> > In addition to checkpoint and recovery, I believe the trace would also be
> > valuable for performance tuning. If Flink can trace and visualize the
> time
> > cost of each operator and stage for a sampled record, users would be able
> > to easily determine the end-to-end latency and identify performance
> issues
> > for optimization. Looking forward to seeing these in the future.
> >
> > Best,
> > Zakelly
> >
> >
> > On Tue, Nov 7, 2023 at 6:27 PM Piotr Nowojski 
> > wrote:
> >
> > > Hi Rui,
> > >
> > > Thanks for the comments!
> > >
> > > > 1. I see the trace just supports Span? Does it support trace events?
> > > > I'm not sure whether tracing events is reasonable for TraceReporter.
> > > > If it supports, flink can report checkpoint and checkpoint path
> > > proactively.
> > > > Currently, checkpoint lists or the latest checkpoint can only be
> > fetched
> > > > by external components or platforms. And report is more timely and
> > > > efficient than fetch.
> > >
> > > No, currently the `TraceReporter` that I'm introducing supports only
> > single
> > > span traces.
> > > So currently neither events on their own, nor events inside spans are
> not
> > > supported.
> > > This is done just for the sake of simplicity, and test out the basic
> > > functional

Re: [VOTE] Release flink-connector-gcp-pubsub v3.0.2, release candidate #1

2023-11-08 Thread Danny Cranmer
Hey Leonard,

Thanks for helping to verify the release. The 1.16.0 Flink version in the
pom is a miss; ideally it should be updated to 1.18.0, and additionally it
should have been updated to 1.17.x previously. I would not consider it a
hard blocker, since the Maven build overrides this variable based on the
provided -Dflink.version and the 1.17/1.18 binaries are valid. However, it
is not ideal that the default version in the source is outdated. Given that
we are yet to receive any binding votes, I am happy to spin an rc2, however I
am a bit busy this week at Flink Forward. I will consider this vote
open for now unless you make your -1 binding.

Thanks,
Danny

On Tue, Nov 7, 2023 at 8:02 PM Leonard Xu  wrote:

> Thanks Danny for driving this.  I'm considering -1, please correct me if I
> understand wrong.
>
> >> * The sources can be compiled and unit tests pass with flink.version
> 1.17.1
> >> and flink.version 1.18.0
> >>
> >> * Nexus has two staged artifact ids for 3.0.2-1.17 and 3.0.2-1.18
> >> - flink-connector-gcp-pubsub (.jar, -javadoc.jar, -sources.jar and .pom)
> >> - flink-connector-gcp-pubsub-parent (only .pom)
>
>
> This release aims to support Flink 1.17 and the newly released Flink 1.18, but
> why is the version in the pom file [1] still 1.16.0? IIUC, it should be
> 1.17.0 according to the process [2].
>
> Best,
> Leonard
>
> [1]
> https://github.com/apache/flink-connector-gcp-pubsub/blob/v3.0.2-rc1/pom.xml#L51
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development
>
>
> > On Nov 7, 2023, at 12:03 PM, Samrat Deb  wrote:
> >
> > +1(non-binding)
> >
> > - Checked release notes
> > - Verified checksums and signatures
> > - Verified no binaries in release
> > - Build connector from source
> >
> > Bests,
> > Samrat
> >
> > On Mon, 6 Nov 2023 at 8:20 PM, Ryan Skraba  >
> > wrote:
> >
> >> Hello! +1 (non-binding)
> >>
> >> One note: the parent pom still has 1.16.0 for the Maven property of
> >> flink.version for both 1.17 and 1.18 releases.
> >>
> >> I've validated the source for the RC1:
> >> flink-connector-gcp-pubsub-3.0.2-src.tgz at r65060
> >> * The sha512 checksum is OK.
> >> * The source file is signed correctly.
> >> * The signature 0F79F2AFB2351BC29678544591F9C1EC125FD8DB is found in the
> >> KEYS file, and on https://keyserver.ubuntu.com/
> >> * The source file is consistent with the GitHub tag v3.0.2-rc1, which
> >> corresponds to commit 4c6be836e6c0f36ef5711f12d7b935254e7d248d
> >> - The files explicitly excluded by create_pristine_sources (such as
> >> .gitignore and the submodule tools/releasing/shared) are not present.
> >> * Has a LICENSE file and a NOTICE file
> >> * Does not contain any compiled binaries.
> >>
> >>
> >>
> >> I did a simple smoke test on an emulated Pub/Sub with the 1.18 version.
> >>
> >> All my best, Ryan Skraba
> >>
>
>


Re: [DISCUSS] FLIP-390: Support System out and err to be redirected to LOG or discarded

2023-11-08 Thread Piotr Nowojski
Hi Rui,

Thanks for the proposal.

+1 I don't have any major comments :)

One nit. In `SystemOutRedirectToLog` in this code:

   System.arraycopy(buf, count - LINE_SEPARATOR_LENGTH, bytes, 0,
LINE_SEPARATOR_LENGTH);
return Arrays.equals(LINE_SEPARATOR_BYTES, bytes)

Is there a reason why you are suggesting to copy out bytes from `buf` to
`bytes`,
instead of using `Arrays.equals(int[] a, int aFromIndex, int aToIndex,
int[] b, int bFromIndex, int bToIndex)`?
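
For reference, a minimal sketch of the range-based comparison, assuming the buffer and
separator are byte arrays and using illustrative field names (the byte[] overload of
Arrays.equals with index ranges exists since Java 9):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    // Minimal sketch, not the FLIP's actual SystemOutRedirectToLog code: compares the
    // tail of the buffer against the line separator without an intermediate copy.
    public class LineSeparatorCheckExample {
        private static final byte[] LINE_SEPARATOR_BYTES =
                System.lineSeparator().getBytes(StandardCharsets.UTF_8);
        private static final int LINE_SEPARATOR_LENGTH = LINE_SEPARATOR_BYTES.length;

        static boolean endsWithLineSeparator(byte[] buf, int count) {
            return count >= LINE_SEPARATOR_LENGTH
                    && Arrays.equals(
                            buf, count - LINE_SEPARATOR_LENGTH, count,
                            LINE_SEPARATOR_BYTES, 0, LINE_SEPARATOR_LENGTH);
        }

        public static void main(String[] args) {
            byte[] buf = "hello\n".getBytes(StandardCharsets.UTF_8);
            // Prints true on platforms where the line separator is "\n".
            System.out.println(endsWithLineSeparator(buf, buf.length));
        }
    }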

Best,
Piotrek

On Wed, Nov 8, 2023 at 11:53, Rui Fan <1996fan...@gmail.com> wrote:

> Hi all!
>
> I would like to start a discussion of FLIP-390: Support System out and err
> to be redirected to LOG or discarded[1].
>
> In various production environments, either cloud native or physical
> machines, the disk space that Flink TaskManager can use is limited.
>
> In general, the flink users shouldn't use the `System.out.println` in
> production,
> however this may happen when the number of Flink jobs and job developers
> is very large. Flink job may use System.out to output a large amount of
> data
> to the taskmanager.out file. This file will not roll, it will always
> increment.
> Eventually the upper limit of what the TM can be used for is reached.
>
> We can support System out and err to be redirected to LOG or discarded,
> the LOG can roll and won't increment forever.
>
> This feature is useful for SREs who maintain Flink environments, they can
> redirect System.out to LOG by default. Although the cause of this problem
> is
> that the user's code is not standardized, for SRE, pushing users to modify
> the code one by one is usually a very time-consuming operation. It's also
> useful for job stability where System.out is accidentally used.
>
> Looking forward to your feedback, thanks~
>
> [1] https://cwiki.apache.org/confluence/x/4guZE
>
> Best,
> Rui
>


Flink 1.17 vs 1.18

2023-11-08 Thread Amir Hossein Sharifzadeh
Hello Flink dev team,

This question might have been answered already but I want to ask again
after the official release 1.18.

What are the major differences between Flink 1.17 and 1.18? Is 1.18 faster
for streaming processing?

Thank you very much.

Best,
Amir


[jira] [Created] (FLINK-33486) Pulsar Client Send Timeout Terminates TaskManager

2023-11-08 Thread Jason Kania (Jira)
Jason Kania created FLINK-33486:
---

 Summary: Pulsar Client Send Timeout Terminates TaskManager
 Key: FLINK-33486
 URL: https://issues.apache.org/jira/browse/FLINK-33486
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.17.1
Reporter: Jason Kania


Currently, when the Pulsar producer encounters a timeout while attempting to 
send data, it generates an unhandled TimeoutException. This is not a reasonable 
way to handle the timeout. The situation should be handled gracefully, either 
through additional parameters that put control of the action at the discretion 
of the user, or through some callback mechanism that the user can write code 
against. Unfortunately, right now this causes a termination of the task manager, 
which then leads to other issues.

Increasing the timeout period to avoid the issue is not really an option for 
ensuring proper handling in the event that the situation does occur.

The exception is as follows:

org.apache.flink.util.FlinkRuntimeException: Failed to send data to Pulsar: 
persistent://public/default/myproducer-partition-0
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.throwSendingException(PulsarWriter.java:182)
 ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
        at 
org.apache.flink.connector.pulsar.sink.writer.PulsarWriter.lambda$write$0(PulsarWriter.java:172)
 ~[flink-connector-pulsar-4.0.0-1.17.jar:4.0.0-1.17]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) 
~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) 
~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
 ~[flink-dist-1.17.1.jar:1.17.1]
        at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) 
[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) 
[flink-dist-1.17.1.jar:1.17.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
[flink-dist-1.17.1.jar:1.17.1]
        at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: 
The producer myproducer- f4b1580b-1ea8-4c21-9d0b-da4d12ca6f93 can not send 
message to the topic persistent://public/default/myproducer-partition-0 within 
given timeout
        at 
org.apache.pulsar.client.impl.ProducerImpl.run(ProducerImpl.java:1993) 
~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.run(HashedWheelTimer.java:715)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.concurrent.ImmediateExecutor.execute(ImmediateExecutor.java:34)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:703)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:790)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:503)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        at 
org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 ~[pulsar-client-all-2.11.2.jar:2.11.2]
        ... 1 more

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33487) Add the new Snowflake connector to supported list

2023-11-08 Thread Mohsen Rezaei (Jira)
Mohsen Rezaei created FLINK-33487:
-

 Summary: Add the new Snowflake connector to supported list
 Key: FLINK-33487
 URL: https://issues.apache.org/jira/browse/FLINK-33487
 Project: Flink
  Issue Type: New Feature
  Components: Documentation
Affects Versions: 1.17.1, 1.18.0
Reporter: Mohsen Rezaei
 Fix For: 1.17.2, 1.18.1


Code was contributed in FLINK-32737.

Add this new connector to the list of supported ones in the documentation with 
a corresponding sub-page with the details of the sink:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/overview/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-390: Support System out and err to be redirected to LOG or discarded

2023-11-08 Thread Archit Goyal
Hi Rui,

Thanks for the proposal.

The proposed solution of supporting System.out and err being redirected to LOG 
or discarded, and introducing an enum and two options to manage this, seems 
reasonable.

+1

Thanks,
Archit Goyal


From: Piotr Nowojski 
Date: Wednesday, November 8, 2023 at 7:38 AM
To: dev@flink.apache.org 
Subject: Re: [DISCUSS] FLIP-390: Support System out and err to be redirected to 
LOG or discarded
Hi Rui,

Thanks for the proposal.

+1 I don't have any major comments :)

One nit. In `SystemOutRedirectToLog` in this code:

   System.arraycopy(buf, count - LINE_SEPARATOR_LENGTH, bytes, 0,
LINE_SEPARATOR_LENGTH);
return Arrays.equals(LINE_SEPARATOR_BYTES, bytes)

Is there a reason why you are suggesting to copy out bytes from `buf` to
`bytes`,
instead of using `Arrays.equals(int[] a, int aFromIndex, int aToIndex,
int[] b, int bFromIndex, int bToIndex)`?

Best,
Piotrek

On Wed, Nov 8, 2023 at 11:53, Rui Fan <1996fan...@gmail.com> wrote:

> Hi all!
>
> I would like to start a discussion of FLIP-390: Support System out and err
> to be redirected to LOG or discarded[1].
>
> In various production environments, either cloud native or physical
> machines, the disk space that Flink TaskManager can use is limited.
>
> In general, the flink users shouldn't use the `System.out.println` in
> production,
> however this may happen when the number of Flink jobs and job developers
> is very large. Flink job may use System.out to output a large amount of
> data
> to the taskmanager.out file. This file will not roll, it will always
> increment.
> Eventually the upper limit of what the TM can be used for is reached.
>
> We can support System out and err to be redirected to LOG or discarded,
> the LOG can roll and won't increment forever.
>
> This feature is useful for SREs who maintain Flink environments, they can
> redirect System.out to LOG by default. Although the cause of this problem
> is
> that the user's code is not standardized, for SRE, pushing users to modify
> the code one by one is usually a very time-consuming operation. It's also
> useful for job stability where System.out is accidentally used.
>
> Looking forward to your feedback, thanks~
>
> [1] https://cwiki.apache.org/confluence/x/4guZE
>
> Best,
> Rui
>


[jira] [Created] (FLINK-33488) Implement restore tests for Deduplicate node

2023-11-08 Thread Jim Hughes (Jira)
Jim Hughes created FLINK-33488:
--

 Summary: Implement restore tests for Deduplicate node
 Key: FLINK-33488
 URL: https://issues.apache.org/jira/browse/FLINK-33488
 Project: Flink
  Issue Type: Sub-task
Reporter: Jim Hughes






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Flink 1.17 vs 1.18

2023-11-08 Thread Yunfeng Zhou
Hi Amir,

You can find the major differences in Flink 1.18's release note[1],
which has also described the performance improvements in 1.18.

[1] 
https://flink.apache.org/2023/10/24/announcing-the-release-of-apache-flink-1.18/

Best,
Yunfeng Zhou

On Wed, Nov 8, 2023 at 11:50 PM Amir Hossein Sharifzadeh
 wrote:
>
> Hello Flink dev team,
>
> This question might have been answered already but I want to ask again
> after the official release 1.18.
>
> What are the major differences between Flink 1.17 and 1.18? Is 1.18 faster
> for streaming processing?
>
> Thank you very much.
>
> Best,
> Amir


Re: [DISCUSS] FLIP-390: Support System out and err to be redirected to LOG or discarded

2023-11-08 Thread Feng Jin
Hi, Rui.

Thank you for initiating this proposal.

I have a question regarding redirecting stdout and stderr to LOG:

Will they be written to the taskManager.log file by default or the
taskManager.out file?
If we can make taskmanager.out splittable and rolling, would it be easier
for users to use this feature?

Best,
Feng

On Thu, Nov 9, 2023 at 3:15 AM Archit Goyal 
wrote:

> Hi Rui,
>
> Thanks for the proposal.
>
> The proposed solution of supporting System out and err to be redirected to
> LOG or discarded and introducing an enum and two options to manage this,
> seems reasonable.
>
> +1
>
> Thanks,
> Archit Goyal
>
>
> From: Piotr Nowojski 
> Date: Wednesday, November 8, 2023 at 7:38 AM
> To: dev@flink.apache.org 
> Subject: Re: [DISCUSS] FLIP-390: Support System out and err to be
> redirected to LOG or discarded
> Hi Rui,
>
> Thanks for the proposal.
>
> +1 I don't have any major comments :)
>
> One nit. In `SystemOutRedirectToLog` in this code:
>
>System.arraycopy(buf, count - LINE_SEPARATOR_LENGTH, bytes, 0,
> LINE_SEPARATOR_LENGTH);
> return Arrays.equals(LINE_SEPARATOR_BYTES, bytes)
>
> Is there a reason why you are suggesting to copy out bytes from `buf` to
> `bytes`,
> instead of using `Arrays.equals(int[] a, int aFromIndex, int aToIndex,
> int[] b, int bFromIndex, int bToIndex)`?
>
> Best,
> Piotrek
>
> On Wed, Nov 8, 2023 at 11:53, Rui Fan <1996fan...@gmail.com> wrote:
>
> > Hi all!
> >
> > I would like to start a discussion of FLIP-390: Support System out and
> err
> > to be redirected to LOG or discarded[1].
> >
> > In various production environments, either cloud native or physical
> > machines, the disk space that Flink TaskManager can use is limited.
> >
> > In general, the flink users shouldn't use the `System.out.println` in
> > production,
> > however this may happen when the number of Flink jobs and job developers
> > is very large. Flink job may use System.out to output a large amount of
> > data
> > to the taskmanager.out file. This file will not roll, it will always
> > increment.
> > Eventually the upper limit of what the TM can be used for is reached.
> >
> > We can support System out and err to be redirected to LOG or discarded,
> > the LOG can roll and won't increment forever.
> >
> > This feature is useful for SREs who maintain Flink environments, they can
> > redirect System.out to LOG by default. Although the cause of this problem
> > is
> > that the user's code is not standardized, for SRE, pushing users to
> modify
> > the code one by one is usually a very time-consuming operation. It's also
> > useful for job stability where System.out is accidentally used.
> >
> > Looking forward to your feedback, thanks~
> >
> > [1] https://cwiki.apache.org/confluence/x/4guZE
> >
> > Best,
> > Rui
> >
>


Re: [VOTE] Release flink-connector-gcp-pubsub v3.0.2, release candidate #1

2023-11-08 Thread Leonard Xu
Thanks Danny for the reply.

-1 (binding)

Let's fix the outdated version in the source code and spin a new rc2.

I'd like to open a PR to fix it, and I hope everything goes well on your
Flink Forward trip.

Best,
Leonard



> Thanks for helping to verify the release. The 1.16.0 Flink version in the
> pom is a miss, ideally it should be updated to 1.18.0, additionally this
> should have been updated to 1.17.x previously. I would not consider it a
> hard blocker since the Maven build overrides this variable based on the
> provided -Dflink.version and the 1.17/1.18 binaries are valid. However it
> is non ideal that the default version in the source is outdated. Given that
> we are yet to receive any binding votes I am happy to spin an rc2 however
> am a bit busy this week at Flink Forward. I will consider this vote
> open for now unless you make your -1 binding.
> 
> Thanks,
> Danny
> 
> On Tue, Nov 7, 2023 at 8:02 PM Leonard Xu  wrote:
> 
>> Thanks Danny for driving this.  I'm considering -1, please correct me if I
>> understand wrong.
>> 
 * The sources can be compiled and unit tests pass with flink.version
>> 1.17.1
 and flink.version 1.18.0
 
 * Nexus has two staged artifact ids for 3.0.2-1.17 and 3.0.2-1.18
 - flink-connector-gcp-pubsub (.jar, -javadoc.jar, -sources.jar and .pom)
 - flink-connector-gcp-pubsub-parent (only .pom)
>> 
>> 
>> This release aims to support Flink 1.17 and new released Flink 1.18,but
>> why is the version in pom file [1] still 1.16.0 ?  IIUC, it should be
>> 1.17.0  according the process [2].
>> 
>> Best,
>> Leonard
>> 
>> [1]
>> https://github.com/apache/flink-connector-gcp-pubsub/blob/v3.0.2-rc1/pom.xml#L51
>> [2]
>> https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development
>> 
>> 
>>> On Nov 7, 2023, at 12:03 PM, Samrat Deb  wrote:
>>> 
>>> +1(non-binding)
>>> 
>>> - Checked release notes
>>> - Verified checksums and signatures
>>> - Verified no binaries in release
>>> - Build connector from source
>>> 
>>> Bests,
>>> Samrat
>>> 
>>> On Mon, 6 Nov 2023 at 8:20 PM, Ryan Skraba >> 
>>> wrote:
>>> 
 Hello! +1 (non-binding)
 
 One note: the parent pom still has 1.16.0 for the Maven property of
 flink.version for both 1.17 and 1.18 releases.
 
 I've validated the source for the RC1:
 flink-connector-gcp-pubsub-3.0.2-src.tgz at r65060
 * The sha512 checksum is OK.
 * The source file is signed correctly.
 * The signature 0F79F2AFB2351BC29678544591F9C1EC125FD8DB is found in the
 KEYS file, and on https://keyserver.ubuntu.com/
 * The source file is consistent with the GitHub tag v3.0.2-rc1, which
 corresponds to commit 4c6be836e6c0f36ef5711f12d7b935254e7d248d
 - The files explicitly excluded by create_pristine_sources (such as
 .gitignore and the submodule tools/releasing/shared) are not present.
 * Has a LICENSE file and a NOTICE file
 * Does not contain any compiled binaries.
 
 
 
 I did a simple smoke test on an emulated Pub/Sub with the 1.18 version.
 
 All my best, Ryan Skraba
 
>> 
>> 



Re: [DISCUSS] FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-08 Thread Zakelly Lan
Hi Piotr,

Thanks for your detailed explanation! I can see the challenge of
implementing traces with multiple spans and agree to leave it as future
work. I personally prefer the idea of generating multi-span traces for
checkpoints on the JM only.

> I'm not sure if I understand the proposal - I don't know how traces could
> be used for this purpose?
> Traces are perfect for one of events (like checkpointing, recovery, etc),
> not for continuous monitoring
> (like processing records). That's what metrics are. Creating trace and
> span(s) per each record would
> be prohibitively expensive.

My original thought was to show how much time a sampled record spends in
each operator in stream processing. By saying 'sampled', I mean we wouldn't
generate a trace for every record, due to the high cost involved. Instead,
we could trace only ONE record from the source when the user requests it
(via the REST API or Web UI) or when triggered periodically at a very low
frequency. However, after re-thinking my idea, I realized it's hard to define
the whole lifecycle of a record, since it is transformed into different forms
across operators. We could discuss this in the future after the basic trace
support is implemented in Flink.

> Unless you mean in batch/bounded jobs? Then yes, we could create a bounded
> job trace, with spans
> for every stage/task/subtask.

Oh yes, batch jobs could definitely leverage the trace.

Best,
Zakelly


On Wed, Nov 8, 2023 at 9:18 PM Jinzhong Li  wrote:

> Hi Piotr,
>
> Thanks for driving this proposal!   I strongly agree that the existing
> metric APIs are not suitable for monitoring restore/checkpoint behavior!
>
> I think the TM-level recovery/checkpointing traces are necessary in the
> future. In our production environment, we sometimes encounter that job
> recovery time is very long (30min+), due to several subTask heavy disk
> traffic. The TM-level recovery trace is helpful for troubleshooting such
> issues.
>
> Best
> Jinzhong
>
> On Wed, Nov 8, 2023 at 5:09 PM Piotr Nowojski 
> wrote:
>
> > Hi Zakelly,
> >
> > Thanks for the comments. Quick answer for both of your questions would be
> > that it probably should be
> > left as a future work. For more detailed answers please take a look below
> > :)
> >
> > > Does it mean the inclusion and subdivision relationships of spans
> defined
> > > by "parent_id" are not supported? I think it is a very necessary
> feature
> > > for the trace.
> >
> > Yes exactly, that is the current limitation. This could be solved somehow
> > one way or another in the future.
> >
> > Support for reporting multi span traces all at once - for example
> > `CheckpointStatsTracker` running JM,
> > could upon checkpoint completion create in one place the whole structure
> of
> > parent spans, to have for
> > example one span per each subtask. This would be a relatively easy follow
> > up.
> >
> > However, if we would like to create true distributed traces, with spans
> > reported from many different
> > components, potentially both on JM and TM, the problem is a bit deeper.
> The
> > issue in that case is how
> > to actually fill out `parrent_id` and `trace_id`? Passing some context
> > entity as a java object would be
> > unfeasible. That would require too many changes in too many places. I
> think
> > the only realistic way
> > to do it, would be to have a deterministic generator of `parten_id` and
> > `trace_id` values.
> >
> > For example we could create the parent trace/span of the checkpoint on
> JM,
> > and set those ids to
> > something like: `jobId#attemptId#checkpointId`. Each subtask then could
> > re-generate those ids
> > and subtasks' checkpoint span would have an id of
> > `jobId#attemptId#checkpointId#subTaskId`.
> > Note that this is just an example, as most likely distributed spans for
> > checkpointing do not make
> > sense, as we can generate them much easier on the JM anyway.
> >
> > > In addition to checkpoint and recovery, I believe the trace would also
> be
> > > valuable for performance tuning. If Flink can trace and visualize the
> > time
> > > cost of each operator and stage for a sampled record, users would be
> able
> > > to easily determine the end-to-end latency and identify performance
> > issues
> > > for optimization. Looking forward to seeing these in the future.
> >
> > I'm not sure if I understand the proposal - I don't know how traces could
> > be used for this purpose?
> > Traces are perfect for one of events (like checkpointing, recovery, etc),
> > not for continuous monitoring
> > (like processing records). That's what metrics are. Creating trace and
> > span(s) per each record would
> > be prohibitively expensive.
> >
> > Unless you mean in batch/bounded jobs? Then yes, we could create a
> bounded
> > job trace, with spans
> > for every stage/task/subtask.
> >
> > Best,
> > Piotrek
> >
> >
> > On Wed, Nov 8, 2023 at 05:30, Zakelly Lan  wrote:
> >
> > > Hi Piotr,
> > >
> > > Happy to see the trace! Thanks for this proposal.
> > >
> > > One mi

Re: [DISCUSS] FLIP-390: Support System out and err to be redirected to LOG or discarded

2023-11-08 Thread Hangxiang Yu
Hi, Rui.
Thanks for the proposal. It sounds reasonable.
I have some questions, PTAL:
1. I have a similar concern as Feng. Will we redirect to another log file
rather than taskManager.log? taskManager.log contains a lot of important
information, such as initialization logs, and it would be rolled quickly if
we redirect out and err there.
2. Since we redirect in LOG mode, could we also log the subtask info? It may
help us debug at a finer granularity.

On Thu, Nov 9, 2023 at 9:47 AM Feng Jin  wrote:

> Hi, Rui.
>
> Thank you for initiating this proposal.
>
> I have a question regarding redirecting stdout and stderr to LOG:
>
> Will they be written to the taskManager.log file by default or the
> taskManager.out file?
> If we can make taskmanager.out splittable and rolling, would it be easier
> for users to use this feature?
>
> Best,
> Feng
>
> On Thu, Nov 9, 2023 at 3:15 AM Archit Goyal 
> wrote:
>
> > Hi Rui,
> >
> > Thanks for the proposal.
> >
> > The proposed solution of supporting System out and err to be redirected
> to
> > LOG or discarded and introducing an enum and two options to manage this,
> > seems reasonable.
> >
> > +1
> >
> > Thanks,
> > Archit Goyal
> >
> >
> > From: Piotr Nowojski 
> > Date: Wednesday, November 8, 2023 at 7:38 AM
> > To: dev@flink.apache.org 
> > Subject: Re: [DISCUSS] FLIP-390: Support System out and err to be
> > redirected to LOG or discarded
> > Hi Rui,
> >
> > Thanks for the proposal.
> >
> > +1 I don't have any major comments :)
> >
> > One nit. In `SystemOutRedirectToLog` in this code:
> >
> >System.arraycopy(buf, count - LINE_SEPARATOR_LENGTH, bytes, 0,
> > LINE_SEPARATOR_LENGTH);
> > return Arrays.equals(LINE_SEPARATOR_BYTES, bytes)
> >
> > Is there a reason why you are suggesting to copy out bytes from `buf` to
> > `bytes`,
> > instead of using `Arrays.equals(int[] a, int aFromIndex, int aToIndex,
> > int[] b, int bFromIndex, int bToIndex)`?
> >
> > Best,
> > Piotrek
> >
> > śr., 8 lis 2023 o 11:53 Rui Fan <1996fan...@gmail.com> napisał(a):
> >
> > > Hi all!
> > >
> > > I would like to start a discussion of FLIP-390: Support System out and
> > err
> > > to be redirected to LOG or discarded[1].
> > >
> > > In various production environments, either cloud native or physical
> > > machines, the disk space that Flink TaskManager can use is limited.
> > >
> > > In general, the flink users shouldn't use the `System.out.println` in
> > > production,
> > > however this may happen when the number of Flink jobs and job
> developers
> > > is very large. Flink job may use System.out to output a large amount of
> > > data
> > > to the taskmanager.out file. This file will not roll, it will always
> > > increment.
> > > Eventually the upper limit of what the TM can be used for is reached.
> > >
> > > We can support System out and err to be redirected to LOG or discarded,
> > > the LOG can roll and won't increment forever.
> > >
> > > This feature is useful for SREs who maintain Flink environments, they
> can
> > > redirect System.out to LOG by default. Although the cause of this
> problem
> > > is
> > > that the user's code is not standardized, for SRE, pushing users to
> > modify
> > > the code one by one is usually a very time-consuming operation. It's
> also
> > > useful for job stability where System.out is accidentally used.
> > >
> > > Looking forward to your feedback, thanks~
> > >
> > > [1]
> >
> > > https://cwiki.apache.org/confluence/x/4guZE
> > >
> > > Best,
> > > Rui
> > >
> >
>


-- 
Best,
Hangxiang.


Re: [DISCUSS] Release 1.17.2

2023-11-08 Thread Yun Tang
Hi All,

Thank you for your feedback!

As there are no other concerns or objections, and I am currently not aware of 
any unresolved blockers, I will kick off the release process and start 
preparing for the RC1 version from today.


Best
Yun Tang

From: Leonard Xu 
Sent: Wednesday, November 8, 2023 11:10
To: dev@flink.apache.org 
Cc: rui fan <1996fan...@gmail.com>; yuchen.e...@gmail.com 

Subject: Re: [DISCUSS] Release 1.17.2

Thanks Yun for picking this.

+1

Best,
Leonard


> 2023年11月8日 上午9:23,Jingsong Li  写道:
>
> +1 thanks Yun
>
> 1.17.2 is really important.
>
> Best,
> Jingsong
>
> On Tue, Nov 7, 2023 at 9:32 AM Danny Cranmer  wrote:
>>
>> +1, thanks for picking this up.
>>
>> I am happy to help out with the bits you need PMC support for.
>>
>> Thanks,
>> Danny
>>
>> On Tue, Nov 7, 2023 at 4:03 AM Yun Tang  wrote:
>>
>>> Hi @casel.chen
>>>
>>> It seems FLINK-33365 is more related to JDBC connector than the Flink
>>> runtime, and the discussion will focus on the release of Flink-1.17.2.
>>>
>>>
>>> Best
>>> Yun Tang
>>> 
>>> From: casel.chen 
>>> Sent: Tuesday, November 7, 2023 16:04
>>> To: dev@flink.apache.org 
>>> Cc: rui fan <1996fan...@gmail.com>; yuchen.e...@gmail.com <
>>> yuchen.e...@gmail.com>
>>> Subject: Re:Re: [DISCUSS] Release 1.17.2
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> https://issues.apache.org/jira/browse/FLINK-33365 fixed or not in release
>>> 1.17.2?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> At 2023-11-07 09:47:29, "liu ron"  wrote:
 +1

 Best,
 Ron

 Jing Ge  于2023年11月6日周一 22:07写道:

> +1
> Thanks for your effort!
>
> Best regards,
> Jing
>
> On Mon, Nov 6, 2023 at 1:15 AM Konstantin Knauf 
>>> wrote:
>
>> Thank you for picking it up! +1
>>
>> Cheers,
>>
>> Konstantin
>>
>> Am Mo., 6. Nov. 2023 um 03:48 Uhr schrieb Yun Tang >>> :
>>
>>> Hi all,
>>>
>>> I would like to discuss creating a new 1.17 patch release (1.17.2).
>>> The
>>> last 1.17 release is near half a year old, and since then, 79
>>> tickets
>> have
>>> been closed [1], of which 15 are blocker/critical [2]. Some
>>> of them are quite important, such as FLINK-32758 [3], FLINK-32296
>>> [4],
>>> FLINK-32548 [5]
>>> and FLINK-33010[6].
>>>
>>> In addition to this, FLINK-33149 [7] is important to bump
>>> snappy-java
> to
>>> 1.1.10.4.
>>> Although FLINK-33149 is unresolved, it was done in 1.17.2.
>>>
>>> I am not aware of any unresolved blockers and there are no
>>> in-progress
>>> tickets [8]. Please let me know if there are any issues you'd like
>>> to
> be
>>> included in this release but still not merged.
>>>
>>> If the community agrees to create this new patch release, I could
>>> volunteer as the release manager with Yu Chen.
>>>
>>> Since there will be another flink-1.16.3 release request during the
> same
>>> time, we will work with Rui Fan since many issues will be fixed in
>>> both
>>> releases.
>>>
>>> [1]
>>>
>>
>
>>> https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%201.17.2%20%20and%20resolution%20%20!%3D%20%20Unresolved%20order%20by%20priority%20DESC
>>> [2]
>>>
>>
>
>>> https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%201.17.2%20and%20resolution%20%20!%3D%20Unresolved%20%20and%20priority%20in%20(Blocker%2C%20Critical)%20ORDER%20by%20priority%20%20DESC
>>> [3] https://issues.apache.org/jira/browse/FLINK-32758
>>> [4] https://issues.apache.org/jira/browse/FLINK-32296
>>> [5] https://issues.apache.org/jira/browse/FLINK-32548
>>> [6] https://issues.apache.org/jira/browse/FLINK-33010
>>> [7] https://issues.apache.org/jira/browse/FLINK-33149
>>> [8] https://issues.apache.org/jira/projects/FLINK/versions/12353260
>>>
>>> Best
>>> Yun Tang
>>>
>>
>>
>> --
>> https://twitter.com/snntrable
>> https://github.com/knaufk
>>
>
>>>



Re: [DISCUSS] Release Flink 1.16.3

2023-11-08 Thread Rui Fan
Hi All,

Thank you for your feedback!

As there are no other concerns or objections, and currently
I am not aware of any unresolved blockers, I will kick off the
release process and start preparing for the RC1 version from today.

Best,
Rui

On Wed, Nov 8, 2023 at 4:23 PM ConradJam  wrote:

> +1
>
> Sergey Nuyanzin  于2023年11月8日周三 13:08写道:
>
> > +1 for the final release
> > and thanks for the efforts
> >
> > On Wed, Nov 8, 2023 at 4:09 AM Leonard Xu  wrote:
> >
> > > Thanks Rui for driving this.
> > >
> > > +1 to release 1.16.3 and make it as the  final bugix release of 1.16
> > > series.
> > >
> > > Best,
> > > Leonard
> > >
> >
> >
> > --
> > Best regards,
> > Sergey
> >
>
>
> --
> Best
>
> ConradJam
>


[jira] [Created] (FLINK-33489) LISTAGG with generating partial-final agg will cause wrong result

2023-11-08 Thread xuyang (Jira)
xuyang created FLINK-33489:
--

 Summary: LISTAGG with generating partial-final agg will cause wrong result
 Key: FLINK-33489
 URL: https://issues.apache.org/jira/browse/FLINK-33489
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.18.0, 1.17.0, 1.16.0, 1.15.0, 1.14.0, 1.13.0, 1.12.0, 
1.11.0, 1.10.0, 1.9.0
Reporter: xuyang


Adding the following test case to SplitAggregateITCase will reproduce this bug:

 
{code:java}
@Test
def testListAggWithDistinctMultiArgs(): Unit = {
  val t1 = tEnv.sqlQuery(s"""
|SELECT
|  a,
|  LISTAGG(DISTINCT c, '#')
|FROM T
|GROUP BY a
 """.stripMargin)

  val sink = new TestingRetractSink
  t1.toRetractStream[Row].addSink(sink)
  env.execute()

  val expected = Map[String, List[String]](
"1" -> List("Hello 0", "Hello 1"),
"2" -> List("Hello 0", "Hello 1", "Hello 2", "Hello 3", "Hello 4"),
"3" -> List("Hello 0", "Hello 1"),
"4" -> List("Hello 1", "Hello 2", "Hello 3")
  )
  val actualData = sink.getRetractResults.sorted
  println(actualData)
} {code}
The `actualData` is `List(1,Hello 0,Hello 1, 2,Hello 2,Hello 4,Hello 3,Hello 
1,Hello 0, 3,Hello 1,Hello 0, 4,Hello 2,Hello 3,Hello 1)`, and the delimiter 
`#` will be ignored.

Let's look at its plan:
{code:java}
LegacySink(name=[DataStreamTableSink], fields=[a, EXPR$1])
+- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
LISTAGG_RETRACT($f3_0) AS $f1])
   +- Exchange(distribution=[hash[a]])
      +- GroupAggregate(groupBy=[a, $f3, $f4], partialFinalType=[PARTIAL], 
select=[a, $f3, $f4, LISTAGG(DISTINCT c, $f2) AS $f3_0])
         +- Exchange(distribution=[hash[a, $f3, $f4]])
            +- Calc(select=[a, c, _UTF-16LE'#' AS $f2, MOD(HASH_CODE(c), 1024) 
AS $f3, MOD(HASH_CODE(_UTF-16LE'#'), 1024) AS $f4])
               +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
                  +- DataStreamScan(table=[[default_catalog, default_database, 
T]], fields=[a, b, c]) {code}
The final `GroupAggregate` is missing the delimiter argument, so the default 
delimiter `,` is used instead.

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-08 Thread Hangxiang Yu
Hi, Piotr.
Thanks for the proposal.
Just as we discussed in FLINK-23411, +1 for supporting trace/span to
monitor metrics like checkpoint and recovery.

We could also do many things based on this mechanism:
1. more fine-grained metrics about checkpoint and recovery. For example,
some stage info about unaligned checkpoints or generic incremental checkpoints
(changelog), which is a bit difficult to report in the current metric system.
2. users could also report their self-defined operator metrics to their own
distributed tracing system, which may be traced together with other jobs or
systems.

Look forward to this feature!


On Thu, Nov 9, 2023 at 11:40 AM Zakelly Lan  wrote:

> Hi Piotr,
>
> Thanks for your detailed explanation! I could see the challenge of
> implementing traces with multiple spans and agree to put it in the future
> work. I personally prefer the idea of generating multi span traces for
> checkpoints on the JM only.
>
> > I'm not sure if I understand the proposal - I don't know how traces could
> > be used for this purpose?
> > Traces are perfect for one-off events (like checkpointing, recovery, etc),
> > not for continuous monitoring
> > (like processing records). That's what metrics are. Creating trace and
> > span(s) per each record would
> > be prohibitively expensive.
>
> My original thought was to show how much time a sampled record is processed
> within each operator in stream processing. By saying 'sampled', I mean we
> won't generate a trace for every record due to the high cost involved.
> Instead, we could only trace ONE record from source when the user requests
> it (via REST API or Web UI) or when triggered periodically at a very low
> frequency. However after re-thinking my idea, I realized it's hard to
> define the whole lifecycle of a record since it is transformed into
> different forms among operators. We could discuss this in future after the
> basic trace is implemented in Flink.
>
> > Unless you mean in batch/bounded jobs? Then yes, we could create a
> bounded
> > job trace, with spans
> > for every stage/task/subtask.
>
> Oh yes, batch jobs could definitely leverage the trace.
>
> Best,
> Zakelly
>
>
> On Wed, Nov 8, 2023 at 9:18 PM Jinzhong Li 
> wrote:
>
> > Hi Piotr,
> >
> > Thanks for driving this proposal!   I strongly agree that the existing
> > metric APIs are not suitable for monitoring restore/checkpoint behavior!
> >
> > I think the TM-level recovery/checkpointing traces are necessary in the
> > future. In our production environment, we sometimes encounter that job
> > recovery time is very long (30min+), due to several subTask heavy disk
> > traffic. The TM-level recovery trace is helpful for troubleshooting such
> > issues.
> >
> > Best
> > Jinzhong
> >
> > On Wed, Nov 8, 2023 at 5:09 PM Piotr Nowojski 
> > wrote:
> >
> > > Hi Zakelly,
> > >
> > > Thanks for the comments. Quick answer for both of your questions would
> be
> > > that it probably should be
> > > left as a future work. For more detailed answers please take a look
> below
> > > :)
> > >
> > > > Does it mean the inclusion and subdivision relationships of spans
> > defined
> > > > by "parent_id" are not supported? I think it is a very necessary
> > feature
> > > > for the trace.
> > >
> > > Yes exactly, that is the current limitation. This could be solved
> somehow
> > > one way or another in the future.
> > >
> > > Support for reporting multi span traces all at once - for example
> > > `CheckpointStatsTracker` running JM,
> > > could upon checkpoint completion create in one place the whole
> structure
> > of
> > > parent spans, to have for
> > > example one span per each subtask. This would be a relatively easy
> follow
> > > up.
> > >
> > > However, if we would like to create true distributed traces, with spans
> > > reported from many different
> > > components, potentially both on JM and TM, the problem is a bit deeper.
> > The
> > > issue in that case is how
> > > to actually fill out `parent_id` and `trace_id`? Passing some context
> > > entity as a java object would be
> > > unfeasible. That would require too many changes in too many places. I
> > think
> > > the only realistic way
> > > to do it, would be to have a deterministic generator of `parent_id` and
> > > `trace_id` values.
> > >
> > > For example we could create the parent trace/span of the checkpoint on
> > JM,
> > > and set those ids to
> > > something like: `jobId#attemptId#checkpointId`. Each subtask then could
> > > re-generate those ids
> > > and subtasks' checkpoint span would have an id of
> > > `jobId#attemptId#checkpointId#subTaskId`.
> > > Note that this is just an example, as most likely distributed spans for
> > > checkpointing do not make
> > > sense, as we can generate them much easier on the JM anyway.
> > >
> > > > In addition to checkpoint and recovery, I believe the trace would
> also
> > be
> > > > valuable for performance tuning. If Flink can trace and visualize the
> > > time
> > > > cost of each operator and stage 

Re: [DISCUSS] FLIP-384: Introduce TraceReporter and use it to create checkpointing and recovery traces

2023-11-08 Thread Rui Fan
Hi Piotr:

Thanks for your reply!

> About structured logging (basically events?) I vaguely remember some
> discussions about that. It might be a much larger topic, so I would
prefer
> to leave it out of the scope of this FLIP.

Makes sense to me!

> I think those could be indeed useful. If you would like to contribute to
them
> in the future, I would be happy to review the FLIP for it :)

Thank you, after this FLIP, I or my colleagues can pick it up!

Best,
Rui

On Thu, Nov 9, 2023 at 11:39 AM Zakelly Lan  wrote:

> Hi Piotr,
>
> Thanks for your detailed explanation! I could see the challenge of
> implementing traces with multiple spans and agree to put it in the future
> work. I personally prefer the idea of generating multi span traces for
> checkpoints on the JM only.
>
> > I'm not sure if I understand the proposal - I don't know how traces could
> > be used for this purpose?
> > Traces are perfect for one-off events (like checkpointing, recovery, etc),
> > not for continuous monitoring
> > (like processing records). That's what metrics are. Creating trace and
> > span(s) per each record would
> > be prohibitively expensive.
>
> My original thought was to show how much time a sampled record is processed
> within each operator in stream processing. By saying 'sampled', I mean we
> won't generate a trace for every record due to the high cost involved.
> Instead, we could only trace ONE record from source when the user requests
> it (via REST API or Web UI) or when triggered periodically at a very low
> frequency. However after re-thinking my idea, I realized it's hard to
> define the whole lifecycle of a record since it is transformed into
> different forms among operators. We could discuss this in future after the
> basic trace is implemented in Flink.
>
> > Unless you mean in batch/bounded jobs? Then yes, we could create a
> bounded
> > job trace, with spans
> > for every stage/task/subtask.
>
> Oh yes, batch jobs could definitely leverage the trace.
>
> Best,
> Zakelly
>
>
> On Wed, Nov 8, 2023 at 9:18 PM Jinzhong Li 
> wrote:
>
> > Hi Piotr,
> >
> > Thanks for driving this proposal!   I strongly agree that the existing
> > metric APIs are not suitable for monitoring restore/checkpoint behavior!
> >
> > I think the TM-level recovery/checkpointing traces are necessary in the
> > future. In our production environment, we sometimes encounter that job
> > recovery time is very long (30min+), due to several subTask heavy disk
> > traffic. The TM-level recovery trace is helpful for troubleshooting such
> > issues.
> >
> > Best
> > Jinzhong
> >
> > On Wed, Nov 8, 2023 at 5:09 PM Piotr Nowojski 
> > wrote:
> >
> > > Hi Zakelly,
> > >
> > > Thanks for the comments. Quick answer for both of your questions would
> be
> > > that it probably should be
> > > left as a future work. For more detailed answers please take a look
> below
> > > :)
> > >
> > > > Does it mean the inclusion and subdivision relationships of spans
> > defined
> > > > by "parent_id" are not supported? I think it is a very necessary
> > feature
> > > > for the trace.
> > >
> > > Yes exactly, that is the current limitation. This could be solved
> somehow
> > > one way or another in the future.
> > >
> > > Support for reporting multi span traces all at once - for example
> > > `CheckpointStatsTracker` running JM,
> > > could upon checkpoint completion create in one place the whole
> structure
> > of
> > > parent spans, to have for
> > > example one span per each subtask. This would be a relatively easy
> follow
> > > up.
> > >
> > > However, if we would like to create true distributed traces, with spans
> > > reported from many different
> > > components, potentially both on JM and TM, the problem is a bit deeper.
> > The
> > > issue in that case is how
> > > to actually fill out `parent_id` and `trace_id`? Passing some context
> > > entity as a java object would be
> > > unfeasible. That would require too many changes in too many places. I
> > think
> > > the only realistic way
> > > to do it, would be to have a deterministic generator of `parent_id` and
> > > `trace_id` values.
> > >
> > > For example we could create the parent trace/span of the checkpoint on
> > JM,
> > > and set those ids to
> > > something like: `jobId#attemptId#checkpointId`. Each subtask then could
> > > re-generate those ids
> > > and subtasks' checkpoint span would have an id of
> > > `jobId#attemptId#checkpointId#subTaskId`.
> > > Note that this is just an example, as most likely distributed spans for
> > > checkpointing do not make
> > > sense, as we can generate them much easier on the JM anyway.
> > >
> > > > In addition to checkpoint and recovery, I believe the trace would
> also
> > be
> > > > valuable for performance tuning. If Flink can trace and visualize the
> > > time
> > > > cost of each operator and stage for a sampled record, users would be
> > able
> > > > to easily determine the end-to-end latency and identify performance
> > > issues
> > > > for op

[DISCUSS] Release flink-connector-pulsar 4.1.0

2023-11-08 Thread tison
Hi,

I'd propose to cut a new release for flink-connector-pulsar 4.1.0[1].

From the last release (4.0.0), we mainly achieved:

1. Implement table connector (integrated with Flink SQL)
2. Drop the requirement for using adminURL
3. Support JDK 11

I can help in driving the release but perhaps we need some more PMC
members' attention and help.

What do you think?

Best,
tison.

[1] https://github.com/apache/flink-connector-pulsar


Re: [DISCUSS] FLIP-390: Support System out and err to be redirected to LOG or discarded

2023-11-08 Thread Rui Fan
Hi Piotr, Archit, Feng and Hangxiang:

Thanks a lot for your feedback!

Following are my comments; please correct me if I misunderstood anything!

To Piotr:

> Is there a reason why you are suggesting to copy out bytes from `buf` to
`bytes`,
> instead of using `Arrays.equals(int[] a, int aFromIndex, int aToIndex,
int[] b, int bFromIndex, int bToIndex)`?

I see Java 8 doesn't have `Arrays.equals(int[] a, int aFromIndex, int
aToIndex, int[] b, int bFromIndex, int bToIndex)`, while Java 11 has it.
Do you have any other suggestions for Java 8?

Also, this code doesn't run in production. As the Javadoc of
System.lineSeparator() says:

> On UNIX systems, it returns {@code "\n"}; on Microsoft
> Windows systems it returns {@code "\r\n"}.

So Mac and Linux just return one character, and we will compare
one byte directly.
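
For reference, a plain loop over the tail of `buf` would also avoid the copy
on Java 8. Just a rough sketch (not the FLIP's actual code), with
LINE_SEPARATOR_BYTES defined as I'd assume it is:

    // Sketch only; assumes java.nio.charset.StandardCharsets is imported.
    private static final byte[] LINE_SEPARATOR_BYTES =
            System.lineSeparator().getBytes(StandardCharsets.UTF_8);

    private static boolean endsWithLineSeparator(byte[] buf, int count) {
        if (count < LINE_SEPARATOR_BYTES.length) {
            return false;
        }
        int offset = count - LINE_SEPARATOR_BYTES.length;
        for (int i = 0; i < LINE_SEPARATOR_BYTES.length; i++) {
            if (buf[offset + i] != LINE_SEPARATOR_BYTES[i]) {
                return false;
            }
        }
        return true;
    }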



To Feng:

> Will they be written to the taskManager.log file by default
> or the taskManager.out file?

I prefer LOG as the default value for taskmanager.system-out.mode.
It's useful for job stability and doesn't have a significant impact on
users. Also, we have been using this feature in production for
more than a year, and it works well.

However, I wrote DEFAULT as the default value for
taskmanager.system-out.mode, because when the community introduces
new options, the default value usually keeps the original behavior.

Looking forward to hearing more thoughts from the community about this
default value.

> If we can make taskmanager.out splittable and rolling, would it be
> easier for users to use this feature?

Making taskmanager.out splittable and rolling is a good choice!
I have some concerns about it:

1. Users may actually want to use LOG.info in their code and just
  accidentally use System.out.println. With redirection, they will
  still find those logs directly in taskmanager.log.
2. I'm not sure whether the rolling strategy is easy to implement.
  If we do it, we need to define a series of Flink options similar
  to the log options, such as: fileMax (how many files should be retained),
  fileSize (the max size of each file), fileNamePattern (the file name
  suffix).
3. Checking the file size periodically: all logs are written by the logging
  framework, which can check the log file size after each write. However,
  System.out is written directly, so Flink must start a thread to check the
  size of taskmanager.out periodically. If the interval is too short, most
  checks are unnecessary; if it's too long, the file size cannot be
  controlled properly.

Redirecting it to LOG.info may be a reasonable and easy choice.
The user didn't really want to log to taskmanager.out, it just
happened by accident.
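
To illustrate the idea, here is a rough sketch of line-based redirection (this
is not the FLIP's actual SystemOutRedirectToLog implementation, just an
illustration of the mechanism): wrap System.out in a PrintStream that forwards
every completed line to the logging framework, which already knows how to
roll files:

    // Rough, ASCII-oriented sketch; charset handling is omitted on purpose.
    import java.io.OutputStream;
    import java.io.PrintStream;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class SystemOutToLogSketch {
        private static final Logger LOG =
                LoggerFactory.getLogger(SystemOutToLogSketch.class);

        public static void install() {
            System.setOut(new PrintStream(new LineForwardingStream(), true));
        }

        private static final class LineForwardingStream extends OutputStream {
            private final StringBuilder current = new StringBuilder();

            @Override
            public void write(int b) {
                char c = (char) b;
                if (c == '\n') {
                    // A completed line goes to the log file, which can roll.
                    LOG.info(current.toString());
                    current.setLength(0);
                } else if (c != '\r') {
                    current.append(c);
                }
            }
        }
    }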



To Hangxiang:

> 1. I have a similar concern as Feng. Will we redirect to another log file
> not taskManager.log ?

Please see my last comment, thanks!

> taskManager.log contains lots of important information like init log. It
> will be rolled quickly if we redirect out and error here.

IIUC, this issue isn't caused only by System.out; it can also happen if users
call LOG.info a lot. As I mentioned before, the user didn't really want
to log to taskmanager.out, it just happened by accident.
So even if users change System.out to LOG.info, it can still happen.

> 2. Since we have redirected to LOG mode, Could we also log the subtask
info
> ? It may help us to debug granularly.

I'm not sure what `log the subtask info` means. Let me confirm with you
first.
Do you mean like this: LOG.info("taskName {} : {}", taskName,
userLogContext)?

Best,
Rui

On Thu, Nov 9, 2023 at 11:47 AM Hangxiang Yu  wrote:

> Hi, Rui.
> Thanks for the proposal. It sounds reasonable.
> I have some questions, PTAL:
> 1. I have a similar concern as Feng. Will we redirect to another log file
> not taskManager.log ?
> taskManager.log contains lots of important information like init log. It
> will be rolled quickly if we redirect out and error here.
> 2. Since we have redirected to LOG mode, Could we also log the subtask info
> ? It may help us to debug granularly.
>
> On Thu, Nov 9, 2023 at 9:47 AM Feng Jin  wrote:
>
> > Hi, Rui.
> >
> > Thank you for initiating this proposal.
> >
> > I have a question regarding redirecting stdout and stderr to LOG:
> >
> > Will they be written to the taskManager.log file by default or the
> > taskManager.out file?
> > If we can make taskmanager.out splittable and rolling, would it be easier
> > for users to use this feature?
> >
> > Best,
> > Feng
> >
> > On Thu, Nov 9, 2023 at 3:15 AM Archit Goyal  >
> > wrote:
> >
> > > Hi Rui,
> > >
> > > Thanks for the proposal.
> > >
> > > The proposed solution of supporting System out and err to be redirected
> > to
> > > LOG or discarded and introducing an enum and two options to manage
> this,
> > > seems reasonable.
> > >
> > > +1
> > >
> > > Thanks,
> > > Archit Goyal
> > >
> > >
> > > From: Piotr Nowojski 
> > > Date: Wednesday, November 8, 2023 at 7:38 AM
> > > To: dev@flink.apache.org 
> > > Subject: Re: [DISCUSS] FLIP-390: Support System out and err to be
> > > redirected to LOG or discarded
> >

Re: [DISCUSS] Release flink-connector-pulsar 4.1.0

2023-11-08 Thread Leonard Xu
Hey, Tison.

+1 to release  flink-connector-pulsar 4.1.0.

I’m glad to offer help for the release.


Best,
Leonard



> 2023年11月9日 下午1:30,tison  写道:
> 
> Hi,
> 
> I'd propose to cut a new release for flink-connector-pulsar 4.1.0[1].
> 
> From the last release (4.0.0), we mainly achieved:
> 
> 1. Implement table connector (integrated with Flink SQL)
> 2. Drop the requirement for using adminURL
> 3. Support JDK 11
> 
> I can help in driving the release but perhaps we need some more PMC
> members' attention and help.
> 
> What do you think?
> 
> Best,
> tison.
> 
> [1] https://github.com/apache/flink-connector-pulsar



[jira] [Created] (FLINK-33490) Validate the name conflicts when creating view

2023-11-08 Thread Shengkai Fang (Jira)
Shengkai Fang created FLINK-33490:
-

 Summary: Validate the name conflicts when creating view
 Key: FLINK-33490
 URL: https://issues.apache.org/jira/browse/FLINK-33490
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.19.0
Reporter: Shengkai Fang


We should forbid:

```
CREATE VIEW id_view AS
SELECT id, uid AS id FROM id_table
```

As the SQL standard states,

If <regular view specification> is specified, then:
i) If any two columns in the table specified by the <query expression> have 
equivalent <column name>s, or if any column of that table has an 
implementation-dependent name, then a <view column list> shall be specified.
ii) Equivalent <column name>s shall not be specified more than once in the 
<view column list>.

Many databases also throw an exception when view column names conflict, 
e.g. MySQL and PostgreSQL.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)