[jira] [Created] (FLINK-22897) FlinkSQL1.12 Sink to Hive with diffrent parallelism will due to produce many small files

2021-06-07 Thread zhengjiewen (Jira)
zhengjiewen created FLINK-22897:
---

 Summary: FlinkSQL1.12 Sink to Hive with diffrent parallelism will 
due to produce many small files
 Key: FLINK-22897
 URL: https://issues.apache.org/jira/browse/FLINK-22897
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.12.1
Reporter: zhengjiewen


I try to use flink sql in batch mode, to sink data into hive partition table, 
here is the sql:

 
{code:java}
//代码占位符
{code}
INSERT OVERWRTITE 【targetTable】SELECT 【field】FROM 【sourceTable】;

 

 

And I found that when the parallelism of the sink operator is different from 
that of the operator before it, a large number of small files will be 
generated. But this is not the case when the parallelism is the same.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22898) HiveParallelismInference limit return wrong parallelism

2021-06-07 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-22898:


 Summary: HiveParallelismInference limit return wrong parallelism
 Key: FLINK-22898
 URL: https://issues.apache.org/jira/browse/FLINK-22898
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Ecosystem
Reporter: Jingsong Lee
 Fix For: 1.14.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22899) ValuesUpsertSinkFunction needs to use global upsert

2021-06-07 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-22899:


 Summary: ValuesUpsertSinkFunction needs to use global upsert
 Key: FLINK-22899
 URL: https://issues.apache.org/jira/browse/FLINK-22899
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: 1.14.0


At present, each task does its own upsert. We need to simulate the external 
connector and use the global map to do the upsert.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Support specifying custom transactional.id prefix in FlinkKafkaProducer

2021-06-07 Thread Till Rohrmann
Thanks for starting this discussion Wenhao. I've given you permission to
create a FLIP.

Cheers,
Till

On Sat, Jun 5, 2021 at 9:48 AM Wenhao Ji  wrote:

> Hi everyone,
>
> Currently, the "transactional.id"s of the Kafka producers in
> FlinkKafkaProducer are generated based on the task name. This mechanism has
> some limitations:
>
>  - It will exceed Kafka's limitation if the task name is too long.
> (resolved in FLINK-17691)
>  - They will very likely clash with each other if the job topologies are
> similar. (discussed in FLINK-11654)
>  - Only certain "transactional.id" may be authorized by Prefixed ACLs on
> the target Kafka cluster.
>
> Besides, I have also seen that a lot of other open-source Kafka connectors
> have already supported specifying a custom prefix during creation. For
> instance, the spring community has introduced the `setTransactionIdPrefix`
> method to their Kafka client.
>
> So I propose this improvement and hope it could be developed and released
> recently.
>
> This is actually a follow-up discussion of FLINK-11654
> . And I have also
> raised
> FLINK-22452  to track
> this issue.
>
> As discussed, here are the possible solutions,
> - either introduce an additional method called
> `setTransactionalIdPrefix(String)` in the FlinkKafkaProducer, (which i
> prefer)
> - or use the existing "transactional.id" properties as the prefix.
>
> And the behavior of the "transactional.id" generation will be
>  - keep the behavior as it was if absent,
>  - use the one if present as the prefix for the TransactionalIdsGenerator.
>
> As Jiangjie Qin suggested in FLINK-11654, we still need a FLIP for this.
> I would love to work on this and create the FLIP. Can somebody help me
> (Username: zetaplusae
> ) grant the
> permissions on Confluence and also assign the ticket to me?
>
> Thanks,
>
> Wenhao
>


[jira] [Created] (FLINK-22900) flink 1.11.2 fileSystem source table read fileSystem sink table path multi-partition error

2021-06-07 Thread bigdataf (Jira)
bigdataf created FLINK-22900:


 Summary:  flink 1.11.2 fileSystem source table read fileSystem 
sink table path multi-partition error
 Key: FLINK-22900
 URL: https://issues.apache.org/jira/browse/FLINK-22900
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.11.2
 Environment: 1.The error code is in FileSystemTableSource
{code:java}
public List> getPartitions() {
try {
return (List) 
PartitionPathUtils.searchPartSpecAndPaths(this.path.getFileSystem(), this.path, 
this.partitionKeys.size()).stream().map((tuple2) -> {
return (LinkedHashMap) tuple2.f0;
}).map((spec) -> {
LinkedHashMap ret = new LinkedHashMap();
spec.forEach((k, v) ->

{
String var1 = (String) ret.put(k, 
this.defaultPartName.equals(v) ? null : v);
});
return ret;
}).collect(Collectors.toList());
} catch (Exception var2) {
throw new TableException("Fetch partitions fail.", var2);
}
}
 
{code}
 
 2.searchPartSpecAndPaths
 
{code:java}
public static List, Path>> 
searchPartSpecAndPaths(FileSystem fs, Path path, int partitionNumber) {
FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, 
fs);
//eg: generatedParts 
// hdfs://xxx-hdfs/merge/all/.staging_1622167234684/cp-0
// hdfs://xxx-hdfs/merge/all/dayno=20210531/hour=11

List, Path>> ret = new ArrayList();
FileStatus[] var5 = generatedParts;
int var6 = generatedParts.length;
for (int var7 = 0; var7 < var6; ++var7) {
FileStatus part = var5[var7];
if (!isHiddenFile(part)) {
ret.add(new Tuple2(extractPartitionSpecFromPath(part.getPath()), 
part.getPath()));
}
}
return ret;
}
{code}
 

3.isHiddenFile reads staging_1622167234684/cp-0 and then an error is 
reported,so I suggest to judge the number of partitions at the same time to 
ensure the availability of the directory
public static List, Path>> 
searchPartSpecAndPaths(FileSystem fs, Path path, int partitionNumber) 
\{//根据分去字段个数递归获得分区目录
FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, 
fs);
List, Path>> ret = new ArrayList();
for (FileStatus part : generatedParts) {
if (isHiddenFile(part)) {
continue;
}
LinkedHashMap,Path > fullPartSpec = 
extractPartitionSpecFromPath(part.getPath());
if (fullPartSpec.size == partitionNumber) \{
ret.add(new Tuple2(extractPartitionSpecFromPath(part.getPath()), 
part.getPath()));
}
}
return ret;
}
Reporter: bigdataf


eg:
 Create create table source_test(id string,name string dayno sring,`hour` 
string) partitioned (dayno ,`hour`) 
with('connector'='filesystm',path='x/data/') based on flink filesystem 
connect
 1.Stack error
{code:java}
// 
ava.lang.reflect.InvocationTargetException at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497) at 
com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:66) 
Caused by: java.util.NoSuchElementException: key not found: hour at 
scala.collection.MapLike$class.default(MapLike.scala:228) at 
scala.collection.AbstractMap.default(Map.scala:59) at 
scala.collection.MapLike$class.apply(MapLike.scala:141) at 
scala.collection.AbstractMap.apply(Map.scala:59) at 
org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:155)
 at 
org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:153)
 at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at 
org.apache.flink.table.planner.plan.utils.PartitionPruner$.org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow(PartitionPruner.scala:153)
 at 
org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:130)
 at 
org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:129)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
org.apache.flink.table.planner.plan.utils.PartitionPruner$.prunePartitions(Partit

Re: [DISCUSS] Watermark propagation with Sink API

2021-06-07 Thread Arvid Heise
Hi Eron,

you either have very specific use cases in mind or have a misconception
about idleness in Flink with the new sources. The basic idea is that you
have watermark generators only at the sources and the user supplies them.
As a source author, you have no option to limit that. Here a bit of
background:

We observed that many users that read from Kafka were confused about no
visible progress in their Flink applications because of some idle partition
and we introduced idleness subsequently. Idleness was always considered as
a means to achieve progress at the risk of losing a bit of correctness.
So especially in the case that you describe with a Pulsar partition that is
empty but indefinitely active, the user needs to be able to use idleness
such that downstream window operators progress.

I hope to have clarified that "I wouldn't recommend using withIdleness()
with source-based watermarks." would pretty much make the intended use case
not work anymore.

---

Nevertheless, from the discussion with you and some offline discussion with
Piotr and Dawid, we actually found quite a bit of drawbacks from the
current definition of idleness:
- We currently only use idleness to exclude respective upstream tasks from
participating in watermark generation (as you have eloquently put further
up in the thread).
- However, the definition is bound to records. So while a partition is
idle, no records should be produced.
- That brings us into quite a few edge cases, where operators emit records,
while they are actually idling: Think of timers, asyncIO operators, window
operators based on timeouts, etc.
- The solution would be to turn the operator active while emitting and
returning to being idle afterwards (but when?). However, this has some
unintended side-effects depending on when you switch back.

We are currently thinking that we should rephrase the definition to what
you described:
- A channel that is active is providing watermarks.
- An idle channel is not providing any watermarks but can deliver records.
- Then we are not talking about idle partitions anymore but explicit and
implicit watermark generation and should probably rename the concepts.
- This would probably mean that we also need an explicit markActive in
source/sink to express that the respective entity now needs to wait for
explicit watermarks.

I'll open a proper discussion thread tomorrow.

Note that we probably shouldn't rush this FLIP until we have clarified the
semantics of idleness. We could also cut the scope of the FLIP to exclude
idleness and go ahead without it (there should be enough binding votes
already).

On Sat, Jun 5, 2021 at 12:09 AM Eron Wright 
wrote:

> I understand your scenario but I disagree with its assumptions:
>
> "However, the partition of A is empty and thus A is temporarily idle." -
> you're assuming that the behavior of the source is to mark itself idle if
> data isn't available, but that's clearly source-specific and not behavior
> we expect to have in Pulsar source.  A partition may be empty indefinitely
> while still being active.  Imagine that the producer is defending a lease -
> "I'm here, there's no data, please don't advance the clock".
>
> "we bind idleness to wall clock time" - you're characterizing a specific
> strategy (WatermarkStrategy.withIdleness()), not the inherent behavior of
> the pipeline.  I wouldn't recommend using withIdleness() with source-based
> watermarks.
>
> I do agree that dynamism in partition assignment can wreak havoc on
> watermark correctness.  We have some ideas on the Pulsar side about that
> too.  I would ask that we focus on the Flink framework and pipeline
> behavior.  By offering a more powerful framework, we encourage stream
> storage systems to "rise to the occasion" - treat event time in a
> first-class way, optimize for correctness, etc.  In this case, FLIP-167 is
> setting the stage for evolution in Pulsar.
>
> Thanks again Arvid for the great discussion.
>
>
>
>
>
> On Fri, Jun 4, 2021 at 2:06 PM Arvid Heise  wrote:
>
> > At least one big motivation is having (temporary) empty partitions. Let
> me
> > give you an example, why imho idleness is only approximate in this case:
> > Assume you have source subtask A, B, C that correspond to 3 source
> > partitions and a downstream keyed window operator W.
> >
> > W would usually trigger on min_watermark(A, B, C). However, the partition
> > of A is empty and thus A is temporarily idle. So W triggers on
> > min_watermark(B, C). When A is now active again, the watermark implicitly
> > is min_watermark(B, C) for A!
> >
> > Let's further assume that the source is filled by another pipeline
> before.
> > This pipeline experiences technical difficulties for X minutes and could
> > not produce into the partition of A, hence the idleness. When the
> upstream
> > pipeline resumes it fills A with some records that are before
> > min_watermark(B, C). Any watermark generated from these records is
> > discarded as the watermark is monotonous. Ther

Re: [DISCUSS] Feedback Collection Jira Bot

2021-06-07 Thread Arvid Heise
One more idea for the bot. Could we have a label to exclude certain tickets
from the life-cycle?

I'm thinking about long-term tickets such as improving DataStream to
eventually replace DataSet. We would collect ideas over the next couple of
weeks without any visible progress on the implementation.

On Fri, May 21, 2021 at 2:06 PM Konstantin Knauf  wrote:

> Hi Timo,
>
> Thanks for joining the discussion. All rules except the unassigned rule do
> not apply to Sub-Tasks actually (like deprioritization, closing).
> Additionally, activity on a Sub-Taks counts as activity for the parent. So,
> the parent ticket would not be touched by the bot as long as there is a
> single Sub-Task that has a discussion or an update. If you experience
> something different, this is a bug.
>
> Is there a reason why it is important to assign all Sub-Tasks to the same
> person immediately? I am not sure if this kind "reserving tickets" is a
> good idea in general to be honest.
>
> Cheers,
>
> Konstantin
>
>
>
>
>
> On Fri, May 21, 2021 at 12:00 PM Timo Walther  wrote:
>
> > Hi Konstantin,
> >
> > thanks for starting this discussion. I was also about to provide some
> > feedback because I have the feeling that the bot is too aggressive at
> > the moment.
> >
> > Even a 14 days interval is a short period of time for bigger efforts
> > that might include several subtasks. Currently, if we split an issue
> > into subtasks usually most subtasks are assigned to the same person. But
> > the bot requires us to update all subtasks again after 7 days. Could we
> > disable the bot for subtasks or extend the period to 30 days?
> >
> > The core problem in the past was that we had issues laying around
> > untouched for years. Luckily, this is solved with the bot now. But going
> > from years to 7 days spams the mail box quite a bit.
> >
> > Regards,
> > Timo
> >
> >
> > On 21.05.21 09:22, Konstantin Knauf wrote:
> > > Hi Robert,
> > >
> > > Could you elaborate on your comment on test instabilities? Would test
> > > instabilities always get a fixVersion then?
> > >
> > > Background: Test instabilities are supposed to be Critical. Critical
> > > tickets are deprioritized if they are unassigned and have not received
> an
> > > update for 14 days.
> > >
> > > Cheers,
> > >
> > > Konstantin
> > >
> > >
> > >
> > > On Thu, May 20, 2021 at 9:34 AM Robert Metzger 
> > wrote:
> > >
> > >> +1
> > >> This would also cover test instabilities, which I personally believe
> > should
> > >> not be auto-deprioritized until they've been analyzed.
> > >>
> > >> On Wed, May 19, 2021 at 1:46 PM Till Rohrmann 
> > >> wrote:
> > >>
> > >>> I like this idea. +1 for your proposal Konstantin.
> > >>>
> > >>> Cheers,
> > >>> Till
> > >>>
> > >>> On Wed, May 19, 2021 at 1:30 PM Konstantin Knauf <
> > >> konstan...@ververica.com
> > 
> > >>> wrote:
> > >>>
> >  Hi everyone,
> > 
> >  Till and I recently discussed whether we should disable the
> >  "stale-blocker", "stale-critical", "stale-major" and "stale-minor"
> > >> rules
> >  for tickets that have a fixVersion set. This would allow people to
> > plan
> > >>> the
> >  upcoming release without tickets being deprioritized by the bot
> during
> > >>> the
> >  release cycle.
> > 
> >   From my point of view, this is a good idea as long as we can agree
> to
> > >> use
> >  the "fixVersion" a bit more conservatively. What do I mean by that?
> If
> > >>> you
> >  would categorize tickets planned for an upcoming release into:
> > 
> >  * Must Have
> >  * Should Have
> >  * Nice-To-Have
> > 
> >  only "Must Have" and "Should Have" tickets should get a fixVersion.
> > >> From
> > >>> my
> >  observation, we currently often set the fixVersion if we just
> wished a
> >  feature was included in an upcoming release. Similarly, I often see
> > >> bulk
> >  changes of fixVersion that "roll over" many tickets to the next
> > release
> > >>> if
> >  they have not made into the previous release although there is no
> > >>> concrete
> >  plan to fix them or they have even become obsolete by then.
> Excluding
> > >>> those
> >  from the bot would be counterproductive.
> > 
> >  What do you think?
> > 
> >  Cheers,
> > 
> >  Konstantin
> > 
> > 
> >  On Fri, Apr 23, 2021 at 2:25 PM Konstantin Knauf  >
> >  wrote:
> > 
> > > Hi everyone,
> > >
> > > After some offline conversations, I think, it makes sense to
> already
> > >>> open
> > > this thread now in order to collect feedback and suggestions around
> > >> the
> > > Jira Bot.
> > >
> > > The following two changes I will do right away:
> > >
> > > * increase "stale-assigned.stale-days" to 14 days (Marta, Stephan,
> > >> Nico
> > > have provided feedback that this is too aggressive).
> > >
> > > * exclude Sub-Tasks from all rules except the "stale-assigned" rule
> > >> (I
> > > think, this was just 

Re: Add control mode for flink

2021-06-07 Thread Jark Wu
Thanks Xintong for the summary,

I'm big +1 for this feature.

Xintong's summary for Table/SQL's needs is correct.
The "custom (broadcast) event" feature is important to us
and even blocks further awesome features and optimizations in Table/SQL.
I also discussed offline with @Yun Gao  several times
for this topic,
and we all agreed this is a reasonable feature but may need some careful
design.

Best,
Jark


On Mon, 7 Jun 2021 at 14:52, Xintong Song  wrote:

> Thanks Jiangang for bringing this up, and Steven & Peter for the feedback.
>
> I was part of the preliminary offline discussions before this proposal
> went public. So maybe I can help clarify things a bit.
>
> In short, despite the phrase "control mode" might be a bit misleading,
> what we truly want to do from my side is to make the concept of "control
> flow" explicit and expose it to users.
>
> ## Background
> Jiangang & his colleagues at Kuaishou maintain an internal version of
> Flink. One of their custom features is allowing dynamically changing
> operator behaviors via the REST APIs. He's willing to contribute this
> feature to the community, and came to Yun Gao and me for suggestions. After
> discussion, we feel that the underlying question to be answered is how do
> we model the control flow in Flink. Dynamically controlling jobs via REST
> API can be one of the features built on top of the control flow, and there
> could be others.
>
> ## Control flow
> Control flow refers to the communication channels for sending
> events/signals to/between tasks/operators, that changes Flink's behavior in
> a way that may or may not affect the computation logic. Typical control
> events/signals Flink currently has are watermarks and checkpoint barriers.
>
> In general, for modeling control flow, the following questions should be
> considered.
> 1. Who (which component) is responsible for generating the control
> messages?
> 2. Who (which component) is responsible for reacting to the messages.
> 3. How do the messages propagate?
> 4. When it comes to affecting the computation logics, how should the
> control flow work together with the exact-once consistency.
>
> 1) & 2) may vary depending on the use cases, while 3) & 4) probably share
> many things in common. A unified control flow model would help deduplicate
> the common logics, allowing us to focus on the use case specific parts.
>
> E.g.,
> - Watermarks: generated by source operators, handled by window operators.
> - Checkpoint barrier: generated by the checkpoint coordinator, handled by
> all tasks
> - Dynamic controlling: generated by JobMaster (in reaction to the REST
> command), handled by specific operators/UDFs
> - Operator defined events: The following features are still in planning,
> but may potentially benefit from the control flow model. (Please correct me
> if I'm wrong, @Yun, @Jark)
>   * Iteration: When a certain condition is met, we might want to signal
> downstream operators with an event
>   * Mini-batch assembling: Flink currently uses special watermarks for
> indicating the end of each mini-batch, which makes it tricky to deal with
> event time related computations.
>   * Hive dimension table join: For periodically reloaded hive tables, it
> would be helpful to have specific events signaling that a reloading is
> finished.
>   * Bootstrap dimension table join: This is similar to the previous one.
> In cases where we want to fully load the dimension table before starting
> joining the mainstream, it would be helpful to have an event signaling the
> finishing of the bootstrap.
>
> ## Dynamic REST controlling
> Back to the specific feature that Jiangang proposed, I personally think
> it's quite convenient. Currently, to dynamically change the behavior of an
> operator, we need to set up a separate source for the control events and
> leverage broadcast state. Being able to send the events via REST APIs
> definitely improves the usability.
>
> Leveraging dynamic configuration frameworks is for sure one possible
> approach. The reason we are in favor of introducing the control flow is
> that:
> - It benefits not only this specific dynamic controlling feature, but
> potentially other future features as well.
> - AFAICS, it's non-trivial to make a 3rd-party dynamic configuration
> framework work together with Flink's consistency mechanism.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Jun 7, 2021 at 11:05 AM 刘建刚  wrote:
>
>> Thank you for the reply. I have checked the post you mentioned. The
>> dynamic config may be useful sometimes. But it is hard to keep data
>> consistent in flink, for example, what if the dynamic config will take
>> effect when failover. Since dynamic config is a desire for users, maybe
>> flink can support it in some way.
>>
>> For the control mode, dynamic config is just one of the control modes. In
>> the google doc, I have list some other cases. For example, control events
>> are generated in operators or external services. Besides user's dynamic
>> config, flink 

[jira] [Created] (FLINK-22901) Introduce getChangeLogUpsertKeys in FlinkRelMetadataQuery

2021-06-07 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-22901:


 Summary: Introduce getChangeLogUpsertKeys in FlinkRelMetadataQuery
 Key: FLINK-22901
 URL: https://issues.apache.org/jira/browse/FLINK-22901
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: 1.14.0


{code:java}
/**
 * Determines the set of change log upsert minimal keys for this expression. A 
key is
 * represented as an {@link org.apache.calcite.util.ImmutableBitSet}, where 
each bit position
 * represents a 0-based output column ordinal.
 *
 * Different from the unique keys: In distributed streaming computing, one 
record may be
 * divided into RowKind.UPDATE_BEFORE and RowKind.UPDATE_AFTER. If a key 
changing join is
 * connected downstream, the two records will be divided into different tasks, 
resulting in
 * disorder. In this case, the downstream cannot rely on the order of the 
original key. So in
 * this case, it has unique keys in the traditional sense, but it doesn't have 
change log upsert
 * keys.
 *
 * @return set of keys, or null if this information cannot be determined 
(whereas empty set
 * indicates definitely no keys at all)
 */
public Set getChangeLogUpsertKeys(RelNode rel);
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22902) Port KafkaSink to FLIP-143

2021-06-07 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-22902:
---

 Summary: Port KafkaSink to FLIP-143
 Key: FLINK-22902
 URL: https://issues.apache.org/jira/browse/FLINK-22902
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: Arvid Heise
 Fix For: 1.14.0


As one of the most commonly connectors, Kafka sink should be ported to the new 
interfaces as quickly as possible such that Flink would support batch queries 
on Kafka.

The implementation should probably follow the current implementation as close 
as possible.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: recover from svaepoint

2021-06-07 Thread Piotr Nowojski
Hi,

Thanks Tianxin and 周瑞' for reporting and tracking down the problem. Indeed
that could be the reason behind it. Have either of you already created a
JIRA ticket for this bug?

> Concerning the required changing of the UID of an operator Piotr, is this
a known issue and documented somewhere? I find this rather surprising from
a user's point of view.

You don't need to change UID, you just need to make sure that the
transactional.ids are unique, either via UID or the task name.

It's not documented, and spawning a new job from an old one, while keeping
both of them running is not officially supported. In order to officially
support this scenario, Flink would need to have a better support of
stop-with-savepoint (commit pending transactions, without starting new
transactions for new records - that would require us to extend
PublicAPI). `setLogFailuresOnly` has a purpose of recovering from some
critical/fatal otherwise failures, for example if transactions have timed
out. It's a lucky conicindence that it can be leveraged in this scenario..

Best,
Piotrek

czw., 3 cze 2021 o 11:43 Till Rohrmann  napisał(a):

> Thanks for this insight. So the problem might be Flink using an internal
> Kafka API (the connector uses reflection to get hold of the
> TransactionManager) which changed between version 2.4.1 and 2.5. I think
> this is a serious problem because it breaks our end-to-end exactly once
> story when using new Kafka versions.
>
> Cheers,
> Till
>
> On Thu, Jun 3, 2021 at 10:17 AM Tianxin Zhao  wrote:
>
>> I encountered the exact same issue before when experimenting in a testing
>> environment. I was not able to spot the bug as mentioned in this thread,
>> the solution I did was to downgrade my own kafka-client version from 2.5 to
>> 2.4.1, matching the version of flink-connector-kafka.
>> In 2.4.1 Kafka, TransactionManager is initializing producerIdAndEpoch
>> using
>>
>> this.producerIdAndEpoch = new ProducerIdAndEpoch(NO_PRODUCER_ID,
>>> NO_PRODUCER_EPOCH);
>>
>>
>> instead of
>>
>>> this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
>>
>>
>> On Thu, Jun 3, 2021 at 12:11 AM Till Rohrmann 
>> wrote:
>>
>>> Thanks for the update. Skimming over the code it looks indeed that we
>>> are overwriting the values of the static value ProducerIdAndEpoch.NONE. I
>>> am not 100% how this will cause the observed problem, though. I am also not
>>> a Flink Kafka connector and Kafka expert so I would appreciate it if
>>> someone more familiar could double check this part of the code.
>>>
>>> Concerning the required changing of the UID of an operator Piotr, is
>>> this a known issue and documented somewhere? I find this rather surprising
>>> from a user's point of view.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Jun 3, 2021 at 8:53 AM Till Rohrmann 
>>> wrote:
>>>
 Forwarding 周瑞's message to a duplicate thread:

 After our analysis, we found a bug in the
 org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction
 method
 The analysis process is as follows:


 org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
 public void initializeState(FunctionInitializationContext context)
 throws Exception {
 state =
 context.getOperatorStateStore().getListState(stateDescriptor);
 boolean recoveredUserContext = false;
 if (context.isRestored()) {
 LOG.info("{} - restoring state", name());
 for (State operatorState : state.get()) {
 userContext = operatorState.getContext();
 List> recoveredTransactions =
 operatorState.getPendingCommitTransactions();
 List handledTransactions = new
 ArrayList<>(recoveredTransactions.size() + 1);
 for (TransactionHolder recoveredTransaction :
 recoveredTransactions) {
 // If this fails to succeed eventually, there is
 actually data loss
 recoverAndCommitInternal(recoveredTransaction);
 handledTransactions.add(recoveredTransaction.handle);
 LOG.info("{} committed recovered transaction {}",
 name(), recoveredTransaction);
 }

 {
 TXN transaction =
 operatorState.getPendingTransaction().handle;
 recoverAndAbort(transaction);
 handledTransactions.add(transaction);
 LOG.info(
 "{} aborted recovered transaction {}",
 name(),
 operatorState.getPendingTransaction());
 }

 if (userContext.isPresent()) {
 finishRecoveringContext(handledTransactions);
 recoveredUserContext = true;
 }
 }
 }

 (1)recoverAndCommitInternal(recoveredTransaction);
 The previous tr

[jira] [Created] (FLINK-22903) Code of method xxx of class "StreamExecCalc$1248" grows beyond 64 KB

2021-06-07 Thread smith jayden (Jira)
smith jayden created FLINK-22903:


 Summary: Code of method xxx of class "StreamExecCalc$1248" grows 
beyond 64 KB
 Key: FLINK-22903
 URL: https://issues.apache.org/jira/browse/FLINK-22903
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.13.0
 Environment: my sql query is under below:

insert into `jz_lbs_data_cnt` 
 select case when substr(geohash,1,8) in 
('wecntxr1','wecntxr3','wecntxr9','wecntxrc','wecntz21','wecntz23','wecntxr4','wecntxr6','wecntxrd','wecntxrf','wecntz24','wecntz26','wecntz2d','wecntxr7','wecntxre','wecntxrg','wecntz25','wecntz27','wecntz2e','wecntxrk','wecntxrs','wecntxru','wecntz2h','wecntz2k','wecntz2s','wecntz2u','wecntxrm','wecntxrt','wecntxrv','wecntz2j','wecntz2m','wecntz2t','wecntz2v','wecntz3j','wecntxrq','wecntxrw','wecntxry','wecntz2n','wecntz2q','wecntz2w','wecntz2y','wecntz3n','wecntxrx','wecntxrz','wecntz2p','wecntz2r','wecntz2x','wecntz2z','wecntz3p','wecntz3r','wecntxx8','wecntxxb','wecntz80','wecntz82','wecntz88','wecntz8b','wecntz90','wecntz92','wecntxxc','wecntz81','wecntz83','wecntz89','wecntz8c','wecntz91','wecntz93','wecntz84','wecntz86','wecntz8d','wecntz8f','wecntz94','wecntz96','wecntz9d','wecntz87','wecntz8e','wecntz8g','wecntz95','wecntz97','wecntz9e','wecntz8s','wecntz8u','wecntz9h','wecntz9k','wecntz9s','wecntz9u','wecntz8v','wecntz9j','wecntz9m','wecntz9t','wecntz9v','wecntz8y','wecntz9n','wecntz9q','wecntz9w','wecntz9p','wecntz9r','wecntzc2')
 then '202008240001' 
 when substr(geohash,1,8) in 
('wecnwrvh','wecnwrvk','wecnwrvs','wecnwrvu','wecnwryh','wecnwryk','wecnwrum','wecnwrut','wecnwruv','wecnwrvj','wecnwrvm','wecnwrvt','wecnwrvv','wecnwryj','wecnwrym','wecnwryt','wecnwrun','wecnwruq','wecnwruw','wecnwruy','wecnwrvn','wecnwrvq','wecnwrvw','wecnwrvy','wecnwryn','wecnwryq','wecnwryw','wecnwrgz','wecnwrup','wecnwrur','wecnwrux','wecnwruz','wecnwrvp','wecnwrvr','wecnwrvx','wecnwrvz','wecnwryp','wecnwryr','wecnwryx','wecny258','wecny25b','wecny2h0','wecny2h2','wecny2h8','wecny2hb','wecny2j0','wecny2j2','wecny2j8','wecny2jb','wecny2n0','wecny2n2','wecny2n8','wecny259','wecny25c','wecny2h1','wecny2h3','wecny2h9','wecny2hc','wecny2j1','wecny2j3','wecny2j9','wecny2jc','wecny2n1','wecny25d','wecny25f','wecny2h4','wecny2h6','wecny2hd','wecny2hf','wecny2j4','wecny2j6','wecny2jd','wecny2jf','wecny257','wecny25e','wecny25g','wecny2h5','wecny2h7','wecny2he','wecny2hg','wecny2j5','wecny2j7','wecny2je','wecny25k','wecny25s','wecny25u','wecny2hh','wecny2hk','wecny2hs','wecny2hu','wecny2jh','wecny25m','wecny25t','wecny25v','wecny2hj','wecny2hm','wecny2ht','wecny2hv','wecny25q','wecny25w','wecny25y','wecny2hn','wecny2hq','wecny2hw','wecny25x','wecny25z','wecny2hp')
 then '202105180001' else '其他' end as tag
 ,TUMBLE_START(eventTime, INTERVAL '10' MINUTE) as window_start
 ,TUMBLE_END(eventTime, INTERVAL '10' MINUTE) as window_end
 ,count(distinct gid) as cnt
 from chengjw.gt_lbs_type11
 where
 -- area in (0,8100) and
 substr(geohash,1,8) in 
('wecntxr1','wecntxr3','wecntxr9','wecntxrc','wecntz21','wecntz23','wecntxr4','wecntxr6','wecntxrd','wecntxrf','wecntz24','wecntz26','wecntz2d','wecntxr7','wecntxre','wecntxrg','wecntz25','wecntz27','wecntz2e','wecntxrk','wecntxrs','wecntxru','wecntz2h','wecntz2k','wecntz2s','wecntz2u','wecntxrm','wecntxrt','wecntxrv','wecntz2j','wecntz2m','wecntz2t','wecntz2v','wecntz3j','wecntxrq','wecntxrw','wecntxry','wecntz2n','wecntz2q','wecntz2w','wecntz2y','wecntz3n','wecntxrx','wecntxrz','wecntz2p','wecntz2r','wecntz2x','wecntz2z','wecntz3p','wecntz3r','wecntxx8','wecntxxb','wecntz80','wecntz82','wecntz88','wecntz8b','wecntz90','wecntz92','wecntxxc','wecntz81','wecntz83','wecntz89','wecntz8c','wecntz91','wecntz93','wecntz84','wecntz86','wecntz8d','wecntz8f','wecntz94','wecntz96','wecntz9d','wecntz87','wecntz8e','wecntz8g','wecntz95','wecntz97','wecntz9e','wecntz8s','wecntz8u','wecntz9h','wecntz9k','wecntz9s','wecntz9u','wecntz8v','wecntz9j','wecntz9m','wecntz9t','wecntz9v','wecntz8y','wecntz9n','wecntz9q','wecntz9w','wecntz9p','wecntz9r','wecntzc2','wecnwrvh','wecnwrvk','wecnwrvs','wecnwrvu','wecnwryh','wecnwryk','wecnwrum','wecnwrut','wecnwruv','wecnwrvj','wecnwrvm','wecnwrvt','wecnwrvv','wecnwryj','wecnwrym','wecnwryt','wecnwrun','wecnwruq','wecnwruw','wecnwruy','wecnwrvn','wecnwrvq','wecnwrvw','wecnwrvy','wecnwryn','wecnwryq','wecnwryw','wecnwrgz','wecnwrup','wecnwrur','wecnwrux','wecnwruz','wecnwrvp','wecnwrvr','wecnwrvx','wecnwrvz','wecnwryp','wecnwryr','wecnwryx','wecny258','wecny25b','wecny2h0','wecny2h2','wecny2h8','wecny2hb','wecny2j0','wecny2j2','wecny2j8','wecny2jb','wecny2n0','wecny2n2','wecny2n8','wecny259','wecny25c','wecny2h1','wecny2h3','wecny2h9','wecny2hc','wecny2j1','wecny2j3','wecny2j9','wecny2jc','wecny2n1','wecny25d','wecny25f','wecny2h4','wecny2h6','wecny2hd','wecny2hf','wecny2j4','wecny2j6','wecny2jd','wecny2jf','wecny257','wecny25e','wecny25g','wecny2h5','w

[jira] [Created] (FLINK-22904) Performance regression on 25.05.2020 in mapRebalanceMapSink

2021-06-07 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-22904:
--

 Summary: Performance regression on 25.05.2020 in 
mapRebalanceMapSink
 Key: FLINK-22904
 URL: https://issues.apache.org/jira/browse/FLINK-22904
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks
Affects Versions: 1.14.0
Reporter: Piotr Nowojski


http://codespeed.dak8s.net:8000/timeline/?ben=mapRebalanceMapSink&env=2

Suspected range in which this regression happened 21c44688e98..80ad5b3b51



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22905) Versioned Table's SQL Script was missing a "," at Line 7 which yields Could not execute SQL statement ERROR

2021-06-07 Thread liuyan (Jira)
liuyan created FLINK-22905:
--

 Summary: Versioned Table's SQL Script was missing a "," at Line 7 
which yields Could not execute SQL statement ERROR
 Key: FLINK-22905
 URL: https://issues.apache.org/jira/browse/FLINK-22905
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.13.1, 1.13.0
Reporter: liuyan


Very minimal updates on the documentation 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/versioned_tables/

The Script to create a Versioned Table was missing a "," at Line 7 

If any user copying this script and create their versioned table it will result 
below error 

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "WATERMARK" at 
line 7, column 3.
Was expecting one of:
")" ...
"," ...

The proposed change is to add "," to Line 7 to correctly pass the semantic 
check 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22906) Add build time to Flink documentation

2021-06-07 Thread Seth Wiesman (Jira)
Seth Wiesman created FLINK-22906:


 Summary: Add build time to Flink documentation
 Key: FLINK-22906
 URL: https://issues.apache.org/jira/browse/FLINK-22906
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Seth Wiesman
Assignee: Seth Wiesman






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

2021-06-07 Thread Arvid Heise
Sorry for joining the party so late, but it's such an interesting FLIP with
a huge impact that I wanted to add my 2 cents. [1]
I'm mirroring some basic question from the PR review to this thread because
it's about the name:

We could rename the thing to ConcatenatedSource(s), SourceSequence, or
similar.
Hybrid has the connotation of 2 for me (maybe because I'm a non-native) and
does not carry the concatentation concept as well (hybrid sounds to me more
like the source would constantly switch back and forth).

Could we take a few minutes to think if this is the most intuitive name for
new users? I'm especially hoping that natives might give some ideas (or
declare that Hybrid is perfect).

[1] https://github.com/apache/flink/pull/15924#pullrequestreview-677376664

On Sun, Jun 6, 2021 at 7:47 PM Steven Wu  wrote:

> > Converter function relies on the specific enumerator capabilities to set
> the new start position (e.g.
> fileSourceEnumerator.getEndTimestamp() and
> kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
>
> I guess the premise is that a converter is for a specific tuple of
> (upstream source, downstream source) . We don't have to define generic
> EndtStateT and SwitchableEnumerator interfaces. That should work.
>
> The benefit of defining EndtStateT and SwitchableEnumerator interfaces is
> probably promoting uniformity across sources that support hybrid/switchable
> source.
>
> On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise  wrote:
>
> > Hi Steven,
> >
> > Thank you for the thorough review of the PR and for bringing this back
> > to the mailing list.
> >
> > All,
> >
> > I updated the FLIP-150 page to highlight aspects in which the PR
> > deviates from the original proposal [1]. The goal would be to update
> > the FLIP soon and bring it to a vote, as previously suggested offline
> > by Nicholas.
> >
> > A few minor issues in the PR are outstanding and I'm working on test
> > coverage for the recovery behavior, which should be completed soon.
> >
> > The dynamic position transfer needs to be concluded before we can move
> > forward however.
> >
> > There have been various ideas, including the special
> > "SwitchableEnumerator" interface, using enumerator checkpoint state or
> > an enumerator interface extension to extract the end state.
> >
> > One goal in the FLIP is to "Reuse the existing Source connectors built
> > with FLIP-27 without any change." and I think it is important to honor
> > that goal given that fixed start positions do not require interface
> > changes.
> >
> > Based on the feedback the following might be a good solution for
> > runtime position transfer:
> >
> > * User supplies the optional converter function (not applicable for
> > fixed positions).
> > * Instead of relying on the enumerator checkpoint state [2], the
> > converter function will be supplied with the current and next
> > enumerator (source.createEnumerator).
> > * Converter function relies on the specific enumerator capabilities to
> > set the new start position (e.g.
> > fileSourceEnumerator.getEndTimestamp() and
> > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
> > * HybridSourceSplitEnumerator starts new underlying enumerator
> >
> > With this approach, there is no need to augment FLIP-27 interfaces and
> > custom source capabilities are easier to integrate. Removing the
> > mandate to rely on enumerator checkpoint state also avoids potential
> > upgrade/compatibility issues.
> >
> > Thoughts?
> >
> > Thanks,
> > Thomas
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> > [2]
> >
> https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
> >
> >
> > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu  wrote:
> > >
> > > discussed the PR with Thosmas offline. Thomas, please correct me if I
> > > missed anything.
> > >
> > > Right now, the PR differs from the FLIP-150 doc regarding the
> converter.
> > > * Current PR uses the enumerator checkpoint state type as the input for
> > the
> > > converter
> > > * FLIP-150 defines a new EndStateT interface.
> > > It seems that the FLIP-150 approach of EndStateT is more flexible, as
> > > transition EndStateT doesn't have to be included in the upstream source
> > > checkpoint state.
> > >
> > > Let's look at two use cases:
> > > 1) static cutover time at 5 pm. File source reads all data btw 9 am - 5
> > pm,
> > > then Kafka source starts with initial position of 5 pm. In this case,
> > there
> > > is no need for converter or EndStateT since the starting time for Kafka
> > > source is known and fixed.
> > > 2) dynamic cutover time at 1 hour before now. This is useful when the
> > > bootstrap of historic data takes a long time (like days or weeks) and
> we
> > > don't know the exact time of cutover when a job is launched. Instead,
> we
> > > are instructing the file source to stop when it gets clo

Re: [DISCUSS] Watermark propagation with Sink API

2021-06-07 Thread Eron Wright
Piotr, David, and Arvid, we've had an expansive discussion but ultimately
the proposal is narrow.  It is:
1. When a watermark arrives at the sink operator, tell the sink function.
2. When the sink operator is idled, tell the sink function.

With these enhancements, we will significantly improve correctness in
multi-stage flows, and facilitate an exciting project in the Pulsar
community.  Would you please lend your support to FLIP-167 so that we can
land this enhancement for 1.14?  My deepest thanks!

-Eron




On Mon, Jun 7, 2021 at 4:45 AM Arvid Heise  wrote:

> Hi Eron,
>
> you either have very specific use cases in mind or have a misconception
> about idleness in Flink with the new sources. The basic idea is that you
> have watermark generators only at the sources and the user supplies them.
> As a source author, you have no option to limit that. Here a bit of
> background:
>
> We observed that many users that read from Kafka were confused about no
> visible progress in their Flink applications because of some idle partition
> and we introduced idleness subsequently. Idleness was always considered as
> a means to achieve progress at the risk of losing a bit of correctness.
> So especially in the case that you describe with a Pulsar partition that is
> empty but indefinitely active, the user needs to be able to use idleness
> such that downstream window operators progress.
>
> I hope to have clarified that "I wouldn't recommend using withIdleness()
> with source-based watermarks." would pretty much make the intended use case
> not work anymore.
>
> ---
>
> Nevertheless, from the discussion with you and some offline discussion with
> Piotr and Dawid, we actually found quite a bit of drawbacks from the
> current definition of idleness:
> - We currently only use idleness to exclude respective upstream tasks from
> participating in watermark generation (as you have eloquently put further
> up in the thread).
> - However, the definition is bound to records. So while a partition is
> idle, no records should be produced.
> - That brings us into quite a few edge cases, where operators emit records,
> while they are actually idling: Think of timers, asyncIO operators, window
> operators based on timeouts, etc.
> - The solution would be to turn the operator active while emitting and
> returning to being idle afterwards (but when?). However, this has some
> unintended side-effects depending on when you switch back.
>
> We are currently thinking that we should rephrase the definition to what
> you described:
> - A channel that is active is providing watermarks.
> - An idle channel is not providing any watermarks but can deliver records.
> - Then we are not talking about idle partitions anymore but explicit and
> implicit watermark generation and should probably rename the concepts.
> - This would probably mean that we also need an explicit markActive in
> source/sink to express that the respective entity now needs to wait for
> explicit watermarks.
>
> I'll open a proper discussion thread tomorrow.
>
> Note that we probably shouldn't rush this FLIP until we have clarified the
> semantics of idleness. We could also cut the scope of the FLIP to exclude
> idleness and go ahead without it (there should be enough binding votes
> already).
>
> On Sat, Jun 5, 2021 at 12:09 AM Eron Wright  .invalid>
> wrote:
>
> > I understand your scenario but I disagree with its assumptions:
> >
> > "However, the partition of A is empty and thus A is temporarily idle." -
> > you're assuming that the behavior of the source is to mark itself idle if
> > data isn't available, but that's clearly source-specific and not behavior
> > we expect to have in Pulsar source.  A partition may be empty
> indefinitely
> > while still being active.  Imagine that the producer is defending a
> lease -
> > "I'm here, there's no data, please don't advance the clock".
> >
> > "we bind idleness to wall clock time" - you're characterizing a specific
> > strategy (WatermarkStrategy.withIdleness()), not the inherent behavior of
> > the pipeline.  I wouldn't recommend using withIdleness() with
> source-based
> > watermarks.
> >
> > I do agree that dynamism in partition assignment can wreak havoc on
> > watermark correctness.  We have some ideas on the Pulsar side about that
> > too.  I would ask that we focus on the Flink framework and pipeline
> > behavior.  By offering a more powerful framework, we encourage stream
> > storage systems to "rise to the occasion" - treat event time in a
> > first-class way, optimize for correctness, etc.  In this case, FLIP-167
> is
> > setting the stage for evolution in Pulsar.
> >
> > Thanks again Arvid for the great discussion.
> >
> >
> >
> >
> >
> > On Fri, Jun 4, 2021 at 2:06 PM Arvid Heise  wrote:
> >
> > > At least one big motivation is having (temporary) empty partitions. Let
> > me
> > > give you an example, why imho idleness is only approximate in this
> case:
> > > Assume you have source subtask A, B, C that correspond 

[jira] [Created] (FLINK-22907) SQL Client queries fails on select statement

2021-06-07 Thread Ryan Darling (Jira)
Ryan Darling created FLINK-22907:


 Summary: SQL Client queries fails on select statement
 Key: FLINK-22907
 URL: https://issues.apache.org/jira/browse/FLINK-22907
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.13.0
 Environment: python 3.7.6

JupyterLab

apache-flink==1.13.0
Reporter: Ryan Darling
 Attachments: flink_sql_issue1.JPG

I have configured a Jupyter notebook to test flink jobs with the sql client. 
All of my source / sink table creation statements are successful but we are 
unable to query the created tables

In this scenario we are attempting to pull data from a kafka topic into a 
source table and if successful insert into a sink table and on to another kafka 
topic. 

We start the sql_client.sh passing the needed jar file locations 
(flink-sql-connector-kafka_2.11-1.13.0.jar, 
flink-table-planner_2.12-1.13.0.jar, flink-table-common-1.13.0.jar, 
flink-sql-avro-confluent-registry-1.13.0.jar, 
flink-table-planner-blink_2.12-1.13.0.jar)

Next we create the source table and point to a kafka topic that we know has 
avro data in it and registered schemas in the schema registry. 

CREATE TABLE avro_sources ( 
 prop_id INT,
 check_in_dt STRING,
 check_out_dt STRING,
 los INT,
 guests INT,
 rate_amt INT
 ) WITH (
 'connector' = 'kafka',
 'topic' = 'avro_rate',
 'properties.bootstrap.servers' = '',
 'key.format' = 'avro-confluent',
 'key.avro-confluent.schema-registry.url' = '',
 'key.fields' = 'prop_id',
 'value.format' = 'avro-confluent',
 'value.avro-confluent.schema-registry.url' = '',
 'value.fields-include' = 'ALL',
 'key.avro-confluent.schema-registry.subject' = 'avro_rate',
 'value.avro-confluent.schema-registry.subject' = 'avro_rate'
 )

 

At this point I want to see the data that has been pulled into the source table 
and I get the following error and are struggling to find a solution. I feel 
this could be a bug 

Flink SQL> select * from avro_sources;

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be 
cast to org.codehaus.commons.compiler.ICompilerFactory

 

Any guidance on how I can resolve the bug or the problem would be appreciated. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[DISCUSS] Limit size of already processed files in File Source SplitEnumerator

2021-06-07 Thread Tianxin Zhao
Hi!

Currently Flink File Source relies on a Set pathsAlreadyProcessed in
SplitEnumerator to decide which file has been processed and avoids
reprocessing files if a file is already in this set. However this set could
be ever growing and ultimately exceed memory size if there are new files
continuously added to the input path.

I submitted https://issues.apache.org/jira/browse/FLINK-22792 and would
like to be assigned to the ticket.

Current proposed change as belows, would like to get an agreement on the
approach taken.

   1.

   Maintain fileWatermark updated by new files modification time in
   ContinuousFileSplitEnumerator
   2.

   Change Set pathsAlreadyProcessed to a HashMap
   pathsAlreadyProcessed where the key is same as before which is the file
   path of already processed files, and the value is file modification time,
   expose getModificationTime() method to FileSourceSplit.


   1.

   Adding a fileExpireTime user configurable config, any files older
than fileWatermark
   - fileExpireTime would get ignored.
   2.

   When snapshotting splitEnumerator, remove files that are older than
fileWatermark
   - fileExpireTime from the pathsAlreadyProcessed map.
   3.

   Adding alreadyProcessedPaths map and fileWatermark to
   PendingSplitsCheckpoint, modify the current
   PendingSplitsCheckpointSerializer to add a version2 serializer that
   serialize the alreadyProcessedPaths map which included file modification
   time.
   4.

   Subclass of PendingSplitsCheckpoint like
   ContinuousHivePendingSplitsCheckpoint would not be impacted by
   initializing an empty alreadyProcessedMap and 0 as initial watermark.

Thanks!


Re: Add control mode for flink

2021-06-07 Thread 刘建刚
Thanks Xintong Song for the detailed supplement. Since flink is
long-running, it is similar to many services. So interacting with it or
controlling it is a common desire. This was our initial thought when
implementing the feature. In our inner flink, many configs used in yaml can
be adjusted by dynamic to avoid restarting the job, for examples as follow:

   1. Limit the input qps.
   2. Degrade the job by sampling and so on.
   3. Reset kafka offset in certain cases.
   4. Stop checkpoint in certain cases.
   5. Control the history consuming.
   6. Change log level for debug.


After deep discussion, we realize that a common control flow
will benefit both users and developers. Dynamic config is just one of the
use cases. For the concrete design and implementation, it relates with many
components, like jobmaster, network channel, operators and so on, which
needs deeper consideration and design.

Xintong Song [via Apache Flink User Mailing List archive.] <
ml+s2336050n44245...@n4.nabble.com> 于2021年6月7日周一 下午2:52写道:

> Thanks Jiangang for bringing this up, and Steven & Peter for the feedback.
>
> I was part of the preliminary offline discussions before this proposal
> went public. So maybe I can help clarify things a bit.
>
> In short, despite the phrase "control mode" might be a bit misleading,
> what we truly want to do from my side is to make the concept of "control
> flow" explicit and expose it to users.
>
> ## Background
> Jiangang & his colleagues at Kuaishou maintain an internal version of
> Flink. One of their custom features is allowing dynamically changing
> operator behaviors via the REST APIs. He's willing to contribute this
> feature to the community, and came to Yun Gao and me for suggestions. After
> discussion, we feel that the underlying question to be answered is how do
> we model the control flow in Flink. Dynamically controlling jobs via REST
> API can be one of the features built on top of the control flow, and there
> could be others.
>
> ## Control flow
> Control flow refers to the communication channels for sending
> events/signals to/between tasks/operators, that changes Flink's behavior in
> a way that may or may not affect the computation logic. Typical control
> events/signals Flink currently has are watermarks and checkpoint barriers.
>
> In general, for modeling control flow, the following questions should be
> considered.
> 1. Who (which component) is responsible for generating the control
> messages?
> 2. Who (which component) is responsible for reacting to the messages.
> 3. How do the messages propagate?
> 4. When it comes to affecting the computation logics, how should the
> control flow work together with the exact-once consistency.
>
> 1) & 2) may vary depending on the use cases, while 3) & 4) probably share
> many things in common. A unified control flow model would help deduplicate
> the common logics, allowing us to focus on the use case specific parts.
>
> E.g.,
> - Watermarks: generated by source operators, handled by window operators.
> - Checkpoint barrier: generated by the checkpoint coordinator, handled by
> all tasks
> - Dynamic controlling: generated by JobMaster (in reaction to the REST
> command), handled by specific operators/UDFs
> - Operator defined events: The following features are still in planning,
> but may potentially benefit from the control flow model. (Please correct me
> if I'm wrong, @Yun, @Jark)
>   * Iteration: When a certain condition is met, we might want to signal
> downstream operators with an event
>   * Mini-batch assembling: Flink currently uses special watermarks for
> indicating the end of each mini-batch, which makes it tricky to deal with
> event time related computations.
>   * Hive dimension table join: For periodically reloaded hive tables, it
> would be helpful to have specific events signaling that a reloading is
> finished.
>   * Bootstrap dimension table join: This is similar to the previous one.
> In cases where we want to fully load the dimension table before starting
> joining the mainstream, it would be helpful to have an event signaling the
> finishing of the bootstrap.
>
> ## Dynamic REST controlling
> Back to the specific feature that Jiangang proposed, I personally think
> it's quite convenient. Currently, to dynamically change the behavior of an
> operator, we need to set up a separate source for the control events and
> leverage broadcast state. Being able to send the events via REST APIs
> definitely improves the usability.
>
> Leveraging dynamic configuration frameworks is for sure one possible
> approach. The reason we are in favor of introducing the control flow is
> that:
> - It benefits not only this specific dynamic controlling feature, but
> potentially other future features as well.
> - AFAICS, it's non-trivial to make a 3rd-party dynamic configuration
> framework work together with Flink's consistency mechanism.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Jun 7, 2021 at 11:05 AM 刘建刚 <[hidden email]
> <

[jira] [Created] (FLINK-22908) FileExecutionGraphInfoStoreTest.testPutSuspendedJobOnClusterShutdown fails on azure

2021-06-07 Thread Xintong Song (Jira)
Xintong Song created FLINK-22908:


 Summary: 
FileExecutionGraphInfoStoreTest.testPutSuspendedJobOnClusterShutdown fails on 
azure
 Key: FLINK-22908
 URL: https://issues.apache.org/jira/browse/FLINK-22908
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.14.0
Reporter: Xintong Song
 Fix For: 1.14.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=18754&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=7c61167f-30b3-5893-cc38-a9e3d057e392&l=7744


{code}

Jun 08 00:03:01 at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
Jun 08 00:03:01 at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
Jun 08 00:03:01 at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
Jun 08 00:03:01 at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
Jun 08 00:03:01 at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
Jun 08 00:03:01 at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
Jun 08 00:03:01 Caused by: 
org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Discard message, 
because the rpc endpoint akka://flink/user/rpc/resourcemanager_2 has not been 
started yet.
Jun 08 00:03:01 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:170)
Jun 08 00:03:01 at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
Jun 08 00:03:01 at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
Jun 08 00:03:01 at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
Jun 08 00:03:01 at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
Jun 08 00:03:01 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
Jun 08 00:03:01 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
Jun 08 00:03:01 at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
Jun 08 00:03:01 at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
Jun 08 00:03:01 at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
Jun 08 00:03:01 at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
Jun 08 00:03:01 at akka.actor.ActorCell.invoke(ActorCell.scala:561)
Jun 08 00:03:01 at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
Jun 08 00:03:01 at akka.dispatch.Mailbox.run(Mailbox.scala:225)
Jun 08 00:03:01 at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
Jun 08 00:03:01 at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
Jun 08 00:03:01 at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
Jun 08 00:03:01 at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
Jun 08 00:03:01 at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

2021-06-07 Thread wenlong.lwl
Thanks Yangze for the flip, it is great for users to be able to declare the
fine-grained resource requirements for the job.

I have one minor suggestion: can we support setting resource requirements
by configuration? Currently most of the config options in execution config
can be configured by configuration, and it is very likely that users need
to adjust the resource according to the performance of their job during
debugging,  Providing a configuration way will make it more convenient.

Bests,
Wenlong Lyu

On Thu, 3 Jun 2021 at 15:59, Xintong Song  wrote:

> Thanks Yangze for preparing the FLIP.
>
> The proposed changes look good to me.
>
> As you've mentioned in the implementation plan, I believe one of the most
> important tasks of this FLIP is to have the feature well documented. It
> would be really nice if we can keep that in mind and start drafting the
> documentation early.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo  wrote:
>
> > Hi, there,
> >
> > We would like to start a discussion thread on "FLIP-169: DataStream
> > API for Fine-Grained Resource Requirements"[1], where we propose the
> > DataStream API for specifying fine-grained resource requirements in
> > StreamExecutionEnvironment.
> >
> > Please find more details in the FLIP wiki document [1]. Looking
> > forward to your feedback.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements
> >
> >
> > Best,
> > Yangze Guo
> >
>


[jira] [Created] (FLINK-22909) Supports change log inputs for event time operators

2021-06-07 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-22909:


 Summary: Supports change log inputs for event time operators
 Key: FLINK-22909
 URL: https://issues.apache.org/jira/browse/FLINK-22909
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Runtime
Reporter: Jingsong Lee
 Fix For: 1.14.0


Including: GroupWindow, Deduplicate, Over, Window TVF Agg, Window TVF Join, 
Window TVF Rank.

They are not support change log inputs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Add control mode for flink

2021-06-07 Thread JING ZHANG
Thanks Jiangang for bringing this up.
As mentioned in Jiangang's email, `dynamic configuration framework`
provides many useful functions in Kuaishou, because it could update job
behavior without relaunching the job. The functions are very popular in
Kuaishou, we also see similar demands in maillist [1].

I'm big +1 for this feature.

Thanks Xintong and Yun for deep thoughts about the issue. I like the idea
about introducing control mode in Flink.
It takes the original issue a big step closer to essence which also
provides the possibility for more fantastic features as mentioned in
Xintong and Jark's response.
Based on the idea, there are at least two milestones to achieve the goals
which were proposed by Jiangang:
(1) Build a common control flow framework in Flink.
 It focuses on control flow propagation. And, how to integrate the
common control flow framework with existing mechanisms.
(2) Builds a dynamic configuration framework which is exposed to users
directly.
 We could see dynamic configuration framework is a top application on
the underlying control flow framework.
 It focuses on the Public API which receives configuration updating
requests from users. Besides, it is necessary to introduce an API
protection mechanism to avoid job performance degradation caused by too
many control events.

I suggest splitting the whole design into two after we reach a consensus on
whether to introduce this feature because these two sub-topic all need
careful design.


[
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html
]

Best regards,
JING ZHANG

刘建刚  于2021年6月8日周二 上午10:01写道:

> Thanks Xintong Song for the detailed supplement. Since flink is
> long-running, it is similar to many services. So interacting with it or
> controlling it is a common desire. This was our initial thought when
> implementing the feature. In our inner flink, many configs used in yaml can
> be adjusted by dynamic to avoid restarting the job, for examples as follow:
>
>1. Limit the input qps.
>2. Degrade the job by sampling and so on.
>3. Reset kafka offset in certain cases.
>4. Stop checkpoint in certain cases.
>5. Control the history consuming.
>6. Change log level for debug.
>
>
> After deep discussion, we realize that a common control flow
> will benefit both users and developers. Dynamic config is just one of the
> use cases. For the concrete design and implementation, it relates with many
> components, like jobmaster, network channel, operators and so on, which
> needs deeper consideration and design.
>
> Xintong Song [via Apache Flink User Mailing List archive.] <
> ml+s2336050n44245...@n4.nabble.com> 于2021年6月7日周一 下午2:52写道:
>
>> Thanks Jiangang for bringing this up, and Steven & Peter for the feedback.
>>
>> I was part of the preliminary offline discussions before this proposal
>> went public. So maybe I can help clarify things a bit.
>>
>> In short, despite the phrase "control mode" might be a bit misleading,
>> what we truly want to do from my side is to make the concept of "control
>> flow" explicit and expose it to users.
>>
>> ## Background
>> Jiangang & his colleagues at Kuaishou maintain an internal version of
>> Flink. One of their custom features is allowing dynamically changing
>> operator behaviors via the REST APIs. He's willing to contribute this
>> feature to the community, and came to Yun Gao and me for suggestions. After
>> discussion, we feel that the underlying question to be answered is how do
>> we model the control flow in Flink. Dynamically controlling jobs via REST
>> API can be one of the features built on top of the control flow, and there
>> could be others.
>>
>> ## Control flow
>> Control flow refers to the communication channels for sending
>> events/signals to/between tasks/operators, that changes Flink's behavior in
>> a way that may or may not affect the computation logic. Typical control
>> events/signals Flink currently has are watermarks and checkpoint barriers.
>>
>> In general, for modeling control flow, the following questions should be
>> considered.
>> 1. Who (which component) is responsible for generating the control
>> messages?
>> 2. Who (which component) is responsible for reacting to the messages.
>> 3. How do the messages propagate?
>> 4. When it comes to affecting the computation logics, how should the
>> control flow work together with the exact-once consistency.
>>
>> 1) & 2) may vary depending on the use cases, while 3) & 4) probably share
>> many things in common. A unified control flow model would help deduplicate
>> the common logics, allowing us to focus on the use case specific parts.
>>
>> E.g.,
>> - Watermarks: generated by source operators, handled by window operators.
>> - Checkpoint barrier: generated by the checkpoint coordinator, handled by
>> all tasks
>> - Dynamic controlling: generated by JobMaster (in reaction to the REST
>> command), handled by specific oper

Apply for permission to edit wiki

2021-06-07 Thread Senhong Liu
Hello,

My username is Senhong Liu (senhong...@gmail.com) and I want to apply
for permission to propose a FLIP.

Anyone who can help me? THX!

Best,
Senhong


Re: Add control mode for flink

2021-06-07 Thread kai wang
I'm big +1 for this feature.

   1. Limit the input qps.
   2. Change log level for debug.

in my team, the two examples above are needed

JING ZHANG  于2021年6月8日周二 上午11:18写道:

> Thanks Jiangang for bringing this up.
> As mentioned in Jiangang's email, `dynamic configuration framework`
> provides many useful functions in Kuaishou, because it could update job
> behavior without relaunching the job. The functions are very popular in
> Kuaishou, we also see similar demands in maillist [1].
>
> I'm big +1 for this feature.
>
> Thanks Xintong and Yun for deep thoughts about the issue. I like the idea
> about introducing control mode in Flink.
> It takes the original issue a big step closer to essence which also
> provides the possibility for more fantastic features as mentioned in
> Xintong and Jark's response.
> Based on the idea, there are at least two milestones to achieve the goals
> which were proposed by Jiangang:
> (1) Build a common control flow framework in Flink.
>  It focuses on control flow propagation. And, how to integrate the
> common control flow framework with existing mechanisms.
> (2) Builds a dynamic configuration framework which is exposed to users
> directly.
>  We could see dynamic configuration framework is a top application on
> the underlying control flow framework.
>  It focuses on the Public API which receives configuration updating
> requests from users. Besides, it is necessary to introduce an API
> protection mechanism to avoid job performance degradation caused by too
> many control events.
>
> I suggest splitting the whole design into two after we reach a consensus
> on whether to introduce this feature because these two sub-topic all need
> careful design.
>
>
> [
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html
> ]
>
> Best regards,
> JING ZHANG
>
> 刘建刚  于2021年6月8日周二 上午10:01写道:
>
>> Thanks Xintong Song for the detailed supplement. Since flink is
>> long-running, it is similar to many services. So interacting with it or
>> controlling it is a common desire. This was our initial thought when
>> implementing the feature. In our inner flink, many configs used in yaml can
>> be adjusted by dynamic to avoid restarting the job, for examples as follow:
>>
>>1. Limit the input qps.
>>2. Degrade the job by sampling and so on.
>>3. Reset kafka offset in certain cases.
>>4. Stop checkpoint in certain cases.
>>5. Control the history consuming.
>>6. Change log level for debug.
>>
>>
>> After deep discussion, we realize that a common control flow
>> will benefit both users and developers. Dynamic config is just one of the
>> use cases. For the concrete design and implementation, it relates with many
>> components, like jobmaster, network channel, operators and so on, which
>> needs deeper consideration and design.
>>
>> Xintong Song [via Apache Flink User Mailing List archive.] <
>> ml+s2336050n44245...@n4.nabble.com> 于2021年6月7日周一 下午2:52写道:
>>
>>> Thanks Jiangang for bringing this up, and Steven & Peter for the
>>> feedback.
>>>
>>> I was part of the preliminary offline discussions before this proposal
>>> went public. So maybe I can help clarify things a bit.
>>>
>>> In short, despite the phrase "control mode" might be a bit misleading,
>>> what we truly want to do from my side is to make the concept of "control
>>> flow" explicit and expose it to users.
>>>
>>> ## Background
>>> Jiangang & his colleagues at Kuaishou maintain an internal version of
>>> Flink. One of their custom features is allowing dynamically changing
>>> operator behaviors via the REST APIs. He's willing to contribute this
>>> feature to the community, and came to Yun Gao and me for suggestions. After
>>> discussion, we feel that the underlying question to be answered is how do
>>> we model the control flow in Flink. Dynamically controlling jobs via REST
>>> API can be one of the features built on top of the control flow, and there
>>> could be others.
>>>
>>> ## Control flow
>>> Control flow refers to the communication channels for sending
>>> events/signals to/between tasks/operators, that changes Flink's behavior in
>>> a way that may or may not affect the computation logic. Typical control
>>> events/signals Flink currently has are watermarks and checkpoint barriers.
>>>
>>> In general, for modeling control flow, the following questions should be
>>> considered.
>>> 1. Who (which component) is responsible for generating the control
>>> messages?
>>> 2. Who (which component) is responsible for reacting to the messages.
>>> 3. How do the messages propagate?
>>> 4. When it comes to affecting the computation logics, how should the
>>> control flow work together with the exact-once consistency.
>>>
>>> 1) & 2) may vary depending on the use cases, while 3) & 4) probably
>>> share many things in common. A unified control flow model would help
>>> deduplicate the common logics, allowing us to focus on the u

[jira] [Created] (FLINK-22910) ShuffleMaster enhancement for pluggable shuffle service framework

2021-06-07 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-22910:
---

 Summary: ShuffleMaster enhancement for pluggable shuffle service 
framework
 Key: FLINK-22910
 URL: https://issues.apache.org/jira/browse/FLINK-22910
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Yingjie Cao
 Fix For: 1.14.0


The current _ShuffleMaster_ has an unclear lifecycle which is inconsistent with 
the _ShuffleEnvironment_ at the _TM_ side. Besides, it is hard to Implement 
some important capabilities for remote shuffle service. For example, 1) release 
external resources when a job finished; 2) Stop or start tracking some 
partitions depending on the status of the external service or system.

We drafted a document[1] which proposed some simple changes to solve these 
issues. The document is still not wholly completed yet. We will start a 
discussion once it is finished.

 

[1] 
https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

2021-06-07 Thread Yangze Guo
Thanks for the feedbacks, Xintong and Wenlong!

@Wenlong
I think that is a good idea, adjust the resource without re-compiling
the job will facilitate the tuning process.
We can define a pattern "slot-sharing-group.resource.{ssg name}"
(welcome any proposal for the prefix naming) for the resource spec
config of a slot sharing group. Then, user can set the ResourceSpec of
SSG "ssg1" by adding "slot-sharing-group.resource.ssg1: {cpu: 1.0,
heap: 100m, off-heap: 100m}". WDYT?


Best,
Yangze Guo

On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl  wrote:
>
> Thanks Yangze for the flip, it is great for users to be able to declare the
> fine-grained resource requirements for the job.
>
> I have one minor suggestion: can we support setting resource requirements
> by configuration? Currently most of the config options in execution config
> can be configured by configuration, and it is very likely that users need
> to adjust the resource according to the performance of their job during
> debugging,  Providing a configuration way will make it more convenient.
>
> Bests,
> Wenlong Lyu
>
> On Thu, 3 Jun 2021 at 15:59, Xintong Song  wrote:
>
> > Thanks Yangze for preparing the FLIP.
> >
> > The proposed changes look good to me.
> >
> > As you've mentioned in the implementation plan, I believe one of the most
> > important tasks of this FLIP is to have the feature well documented. It
> > would be really nice if we can keep that in mind and start drafting the
> > documentation early.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo  wrote:
> >
> > > Hi, there,
> > >
> > > We would like to start a discussion thread on "FLIP-169: DataStream
> > > API for Fine-Grained Resource Requirements"[1], where we propose the
> > > DataStream API for specifying fine-grained resource requirements in
> > > StreamExecutionEnvironment.
> > >
> > > Please find more details in the FLIP wiki document [1]. Looking
> > > forward to your feedback.
> > >
> > > [1]
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements
> > >
> > >
> > > Best,
> > > Yangze Guo
> > >
> >


Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

2021-06-07 Thread Yangze Guo
@Wenlong
After another consideration, the config option approach I mentioned
above might not be appropriate. The resource requirements for SSG
should be a job level configuration and should no be set in the
flink-conf.

I think we can define a JSON format, which would be the ResourceSpecs
mapped by the name of SSGs, for the resource requirements of a
specific job. Then, we allow user to configure the file path of that
JSON. The JSON will be only parsed in runtime, which allows user to
tune it without re-compiling the job.

We can add another #setSlotSharingGroupResources for configuring the
file path of that JSON:
```
/**
 * Specify fine-grained resource requirements for slot sharing groups
with the given resource JSON file. The existing resource
 * requirement of the same slot sharing group will be replaced.
 */
public StreamExecutionEnvironment setSlotSharingGroupResources(
String pathToResourceJson);
```

WDYT?

Best,
Yangze Guo

On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo  wrote:
>
> Thanks for the feedbacks, Xintong and Wenlong!
>
> @Wenlong
> I think that is a good idea, adjust the resource without re-compiling
> the job will facilitate the tuning process.
> We can define a pattern "slot-sharing-group.resource.{ssg name}"
> (welcome any proposal for the prefix naming) for the resource spec
> config of a slot sharing group. Then, user can set the ResourceSpec of
> SSG "ssg1" by adding "slot-sharing-group.resource.ssg1: {cpu: 1.0,
> heap: 100m, off-heap: 100m}". WDYT?
>
>
> Best,
> Yangze Guo
>
> On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl  wrote:
> >
> > Thanks Yangze for the flip, it is great for users to be able to declare the
> > fine-grained resource requirements for the job.
> >
> > I have one minor suggestion: can we support setting resource requirements
> > by configuration? Currently most of the config options in execution config
> > can be configured by configuration, and it is very likely that users need
> > to adjust the resource according to the performance of their job during
> > debugging,  Providing a configuration way will make it more convenient.
> >
> > Bests,
> > Wenlong Lyu
> >
> > On Thu, 3 Jun 2021 at 15:59, Xintong Song  wrote:
> >
> > > Thanks Yangze for preparing the FLIP.
> > >
> > > The proposed changes look good to me.
> > >
> > > As you've mentioned in the implementation plan, I believe one of the most
> > > important tasks of this FLIP is to have the feature well documented. It
> > > would be really nice if we can keep that in mind and start drafting the
> > > documentation early.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo  wrote:
> > >
> > > > Hi, there,
> > > >
> > > > We would like to start a discussion thread on "FLIP-169: DataStream
> > > > API for Fine-Grained Resource Requirements"[1], where we propose the
> > > > DataStream API for specifying fine-grained resource requirements in
> > > > StreamExecutionEnvironment.
> > > >
> > > > Please find more details in the FLIP wiki document [1]. Looking
> > > > forward to your feedback.
> > > >
> > > > [1]
> > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements
> > > >
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > >


Re: Re: Add control mode for flink

2021-06-07 Thread Yun Gao
Very thanks Jiangang for bringing this up and very thanks for the discussion! 

I also agree with the summarization by Xintong and Jing that control flow seems 
to be
a common buidling block for many functionalities and dynamic configuration 
framework
is a representative application that frequently required by users. Regarding 
the control flow, 
currently we are also considering the design of iteration for the flink-ml, and 
as Xintong has pointed
out, it also required the control flow in cases like detection global 
termination inside the iteration
 (in this case we need to broadcast an event through the iteration body to 
detect if there are still 
records reside in the iteration body). And regarding  whether to implement the 
dynamic configuration 
framework, I also agree with Xintong that the consistency guarantee would be a 
point to consider, we 
might consider if we need to ensure every operator could receive the dynamic 
configuration. 

Best,
Yun



--
Sender:kai wang
Date:2021/06/08 11:52:12
Recipient:JING ZHANG
Cc:刘建刚; Xintong Song [via Apache Flink User Mailing 
List archive.]; 
user; dev
Theme:Re: Add control mode for flink



I'm big +1 for this feature. 

Limit the input qps.
Change log level for debug.
in my team, the two examples above are needed
JING ZHANG  于2021年6月8日周二 上午11:18写道:

Thanks Jiangang for bringing this up. 
As mentioned in Jiangang's email, `dynamic configuration framework` provides 
many useful functions in Kuaishou, because it could update job behavior without 
relaunching the job. The functions are very popular in Kuaishou, we also see 
similar demands in maillist [1].

I'm big +1 for this feature.

Thanks Xintong and Yun for deep thoughts about the issue. I like the idea about 
introducing control mode in Flink. 
It takes the original issue a big step closer to essence which also provides 
the possibility for more fantastic features as mentioned in Xintong and Jark's 
response.
Based on the idea, there are at least two milestones to achieve the goals which 
were proposed by Jiangang:
(1) Build a common control flow framework in Flink. 
 It focuses on control flow propagation. And, how to integrate the common 
control flow framework with existing mechanisms.
(2) Builds a dynamic configuration framework which is exposed to users 
directly. 
 We could see dynamic configuration framework is a top application on the 
underlying control flow framework. 
 It focuses on the Public API which receives configuration updating 
requests from users. Besides, it is necessary to introduce an API protection 
mechanism to avoid job performance degradation caused by too many control 
events.

I suggest splitting the whole design into two after we reach a consensus on 
whether to introduce this feature because these two sub-topic all need careful 
design.


[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html]

Best regards,
JING ZHANG
刘建刚  于2021年6月8日周二 上午10:01写道:

Thanks Xintong Song for the detailed supplement. Since flink is long-running, 
it is similar to many services. So interacting with it or controlling it is a 
common desire. This was our initial thought when implementing the feature. In 
our inner flink, many configs used in yaml can be adjusted by dynamic to avoid 
restarting the job, for examples as follow:

Limit the input qps.
Degrade the job by sampling and so on.
Reset kafka offset in certain cases.
Stop checkpoint in certain cases.
Control the history consuming.
Change log level for debug.

After deep discussion, we realize that a common control flow will benefit both 
users and developers. Dynamic config is just one of the use cases. For the 
concrete design and implementation, it relates with many components, like 
jobmaster, network channel, operators and so on, which needs deeper 
consideration and design. 
Xintong Song [via Apache Flink User Mailing List archive.] 
 于2021年6月7日周一 下午2:52写道:

Thanks Jiangang for bringing this up, and Steven & Peter for the feedback.

I was part of the preliminary offline discussions before this proposal went 
public. So maybe I can help clarify things a bit.

In short, despite the phrase "control mode" might be a bit misleading, what we 
truly want to do from my side is to make the concept of "control flow" explicit 
and expose it to users.

## Background
Jiangang & his colleagues at Kuaishou maintain an internal version of Flink. 
One of their custom features is allowing dynamically changing operator 
behaviors via the REST APIs. He's willing to contribute this feature to the 
community, and came to Yun Gao and me for suggestions. After discussion, we 
feel that the underlying question to be answered is how do we model the control 
flow in Flink. Dynamically controlling jobs via REST API can be one of the 
features built on top of the control flow, and there could be others.

## Cont

Re: [DISCUSS] FLIP-169: DataStream API for Fine-Grained Resource Requirements

2021-06-07 Thread Xintong Song
I think being able to specify fine grained resource requirements without
having to change the codes and recompile the job is indeed a good idea. It
definitely improves the usability.

However, this requires more careful designs, which probably deserves a
separate thread. I'd be good to have that discussion, but maybe not block
this feature on that.

One idea concerning the configuration approach: As Yangze said, flink
configuration options are supposed to take effect at cluster level. For
updating job level specifics that are not suitable to be introduced as a
config option, currently the only way is to pass them as program arguments.
Would it make sense to introduce a general approach for overwriting such
job specifics without re-compiling the job?

Thank you~

Xintong Song



On Tue, Jun 8, 2021 at 1:23 PM Yangze Guo  wrote:

> @Wenlong
> After another consideration, the config option approach I mentioned
> above might not be appropriate. The resource requirements for SSG
> should be a job level configuration and should no be set in the
> flink-conf.
>
> I think we can define a JSON format, which would be the ResourceSpecs
> mapped by the name of SSGs, for the resource requirements of a
> specific job. Then, we allow user to configure the file path of that
> JSON. The JSON will be only parsed in runtime, which allows user to
> tune it without re-compiling the job.
>
> We can add another #setSlotSharingGroupResources for configuring the
> file path of that JSON:
> ```
> /**
>  * Specify fine-grained resource requirements for slot sharing groups
> with the given resource JSON file. The existing resource
>  * requirement of the same slot sharing group will be replaced.
>  */
> public StreamExecutionEnvironment setSlotSharingGroupResources(
> String pathToResourceJson);
> ```
>
> WDYT?
>
> Best,
> Yangze Guo
>
> On Tue, Jun 8, 2021 at 12:12 PM Yangze Guo  wrote:
> >
> > Thanks for the feedbacks, Xintong and Wenlong!
> >
> > @Wenlong
> > I think that is a good idea, adjust the resource without re-compiling
> > the job will facilitate the tuning process.
> > We can define a pattern "slot-sharing-group.resource.{ssg name}"
> > (welcome any proposal for the prefix naming) for the resource spec
> > config of a slot sharing group. Then, user can set the ResourceSpec of
> > SSG "ssg1" by adding "slot-sharing-group.resource.ssg1: {cpu: 1.0,
> > heap: 100m, off-heap: 100m}". WDYT?
> >
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Jun 8, 2021 at 10:37 AM wenlong.lwl 
> wrote:
> > >
> > > Thanks Yangze for the flip, it is great for users to be able to
> declare the
> > > fine-grained resource requirements for the job.
> > >
> > > I have one minor suggestion: can we support setting resource
> requirements
> > > by configuration? Currently most of the config options in execution
> config
> > > can be configured by configuration, and it is very likely that users
> need
> > > to adjust the resource according to the performance of their job during
> > > debugging,  Providing a configuration way will make it more convenient.
> > >
> > > Bests,
> > > Wenlong Lyu
> > >
> > > On Thu, 3 Jun 2021 at 15:59, Xintong Song 
> wrote:
> > >
> > > > Thanks Yangze for preparing the FLIP.
> > > >
> > > > The proposed changes look good to me.
> > > >
> > > > As you've mentioned in the implementation plan, I believe one of the
> most
> > > > important tasks of this FLIP is to have the feature well documented.
> It
> > > > would be really nice if we can keep that in mind and start drafting
> the
> > > > documentation early.
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > >
> > > >
> > > > On Thu, Jun 3, 2021 at 3:13 PM Yangze Guo 
> wrote:
> > > >
> > > > > Hi, there,
> > > > >
> > > > > We would like to start a discussion thread on "FLIP-169: DataStream
> > > > > API for Fine-Grained Resource Requirements"[1], where we propose
> the
> > > > > DataStream API for specifying fine-grained resource requirements in
> > > > > StreamExecutionEnvironment.
> > > > >
> > > > > Please find more details in the FLIP wiki document [1]. Looking
> > > > > forward to your feedback.
> > > > >
> > > > > [1]
> > > > >
> > > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-169+DataStream+API+for+Fine-Grained+Resource+Requirements
> > > > >
> > > > >
> > > > > Best,
> > > > > Yangze Guo
> > > > >
> > > >
>


[jira] [Created] (FLINK-22911) Align FLIP-136 (Improve interoperability between DataStream and Table API) in PyFlink Table API

2021-06-07 Thread Dian Fu (Jira)
Dian Fu created FLINK-22911:
---

 Summary: Align FLIP-136 (Improve interoperability between 
DataStream and Table API) in PyFlink Table API
 Key: FLINK-22911
 URL: https://issues.apache.org/jira/browse/FLINK-22911
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22912) Support state ttl in Python DataStream API

2021-06-07 Thread Dian Fu (Jira)
Dian Fu created FLINK-22912:
---

 Summary: Support state ttl in Python DataStream API
 Key: FLINK-22912
 URL: https://issues.apache.org/jira/browse/FLINK-22912
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.14.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22913) Support Python UDF chaining in Python DataStream API

2021-06-07 Thread Dian Fu (Jira)
Dian Fu created FLINK-22913:
---

 Summary: Support Python UDF chaining in Python DataStream API
 Key: FLINK-22913
 URL: https://issues.apache.org/jira/browse/FLINK-22913
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.14.0


Currently, for the following job:
{code}
ds = ..
ds.map(map_func1)
    .map(map_func2)
{code}

The Python function `map_func1` and `map_func2` will runs in separate Python 
workers and the result of `map_func1` will be transferred to JVM and then 
transferred to `map_func2` which may resides in another Python worker. This 
introduces redundant communication and serialization/deserialization overhead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Re: Add control mode for flink

2021-06-07 Thread Xintong Song
+1 on separating the effort into two steps:

   1. Introduce a common control flow framework, with flexible interfaces
   for generating / reacting to control messages for various purposes.
   2. Features that leverating the control flow can be worked on
   concurrently

Meantime, keeping collecting potential features that may leverage the
control flow should be helpful. It provides good inputs for the control
flow framework design, to make the framework common enough to cover the
potential use cases.

My suggestions on the next steps:

   1. Allow more time for opinions to be heard and potential use cases to
   be collected
   2. Draft a FLIP with the scope of common control flow framework
   3. We probably need a poc implementation to make sure the framework
   covers at least the following scenarios
  1. Produce control events from arbitrary operators
  2. Produce control events from JobMaster
  3. Consume control events from arbitrary operators downstream where
  the events are produced


Thank you~

Xintong Song



On Tue, Jun 8, 2021 at 1:37 PM Yun Gao  wrote:

> Very thanks Jiangang for bringing this up and very thanks for the
> discussion!
>
> I also agree with the summarization by Xintong and Jing that control flow
> seems to be
> a common buidling block for many functionalities and dynamic configuration
> framework
> is a representative application that frequently required by users.
> Regarding the control flow,
> currently we are also considering the design of iteration for the
> flink-ml, and as Xintong has pointed
> out, it also required the control flow in cases like detection global
> termination inside the iteration
>  (in this case we need to broadcast an event through the iteration body
> to detect if there are still
> records reside in the iteration body). And regarding  whether to implement
> the dynamic configuration
> framework, I also agree with Xintong that the consistency guarantee would
> be a point to consider, we
> might consider if we need to ensure every operator could receive the
> dynamic configuration.
>
> Best,
> Yun
>
>
>
> --
> Sender:kai wang
> Date:2021/06/08 11:52:12
> Recipient:JING ZHANG
> Cc:刘建刚; Xintong Song [via Apache Flink User
> Mailing List archive.]; user<
> u...@flink.apache.org>; dev
> Theme:Re: Add control mode for flink
>
>
>
> I'm big +1 for this feature.
>
>1. Limit the input qps.
>2. Change log level for debug.
>
> in my team, the two examples above are needed
>
> JING ZHANG  于2021年6月8日周二 上午11:18写道:
>
>> Thanks Jiangang for bringing this up.
>> As mentioned in Jiangang's email, `dynamic configuration framework`
>> provides many useful functions in Kuaishou, because it could update job
>> behavior without relaunching the job. The functions are very popular in
>> Kuaishou, we also see similar demands in maillist [1].
>>
>> I'm big +1 for this feature.
>>
>> Thanks Xintong and Yun for deep thoughts about the issue. I like the idea
>> about introducing control mode in Flink.
>> It takes the original issue a big step closer to essence which also
>> provides the possibility for more fantastic features as mentioned in
>> Xintong and Jark's response.
>> Based on the idea, there are at least two milestones to achieve the goals
>> which were proposed by Jiangang:
>> (1) Build a common control flow framework in Flink.
>>  It focuses on control flow propagation. And, how to integrate the
>> common control flow framework with existing mechanisms.
>> (2) Builds a dynamic configuration framework which is exposed to users
>> directly.
>>  We could see dynamic configuration framework is a top application on
>> the underlying control flow framework.
>>  It focuses on the Public API which receives configuration updating
>> requests from users. Besides, it is necessary to introduce an API
>> protection mechanism to avoid job performance degradation caused by too
>> many control events.
>>
>> I suggest splitting the whole design into two after we reach a consensus
>> on whether to introduce this feature because these two sub-topic all need
>> careful design.
>>
>>
>> [
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamic-configuration-of-Flink-checkpoint-interval-td44059.html
>> ]
>>
>> Best regards,
>> JING ZHANG
>>
>> 刘建刚  于2021年6月8日周二 上午10:01写道:
>>
>>> Thanks Xintong Song for the detailed supplement. Since flink is
>>> long-running, it is similar to many services. So interacting with it or
>>> controlling it is a common desire. This was our initial thought when
>>> implementing the feature. In our inner flink, many configs used in yaml can
>>> be adjusted by dynamic to avoid restarting the job, for examples as follow:
>>>
>>>1. Limit the input qps.
>>>2. Degrade the job by sampling and so on.
>>>3. Reset kafka offset in certain cases.
>>>4. Stop checkpoint in certain cases.
>>>5. Control the history consuming.
>>>6. Chang

Re: Re: Add control mode for flink

2021-06-07 Thread Steven Wu
I can see the benefits of control flow. E.g., it might help the old (and
inactive) FLIP-17 side input. I would suggest that we add more details of
some of the potential use cases.

Here is one mismatch with using control flow for dynamic config. Dynamic
config is typically targeted/loaded by one specific operator. Control flow
will propagate the dynamic config to all operators. not a problem per se

Regarding using the REST api (to jobmanager) for accepting control
signals from external system, where are we going to persist/checkpoint the
signal? jobmanager can die before the control signal is propagated and
checkpointed. Did we lose the control signal in this case?


On Mon, Jun 7, 2021 at 11:05 PM Xintong Song  wrote:

> +1 on separating the effort into two steps:
>
>1. Introduce a common control flow framework, with flexible interfaces
>for generating / reacting to control messages for various purposes.
>2. Features that leverating the control flow can be worked on
>concurrently
>
> Meantime, keeping collecting potential features that may leverage the
> control flow should be helpful. It provides good inputs for the control
> flow framework design, to make the framework common enough to cover the
> potential use cases.
>
> My suggestions on the next steps:
>
>1. Allow more time for opinions to be heard and potential use cases to
>be collected
>2. Draft a FLIP with the scope of common control flow framework
>3. We probably need a poc implementation to make sure the framework
>covers at least the following scenarios
>   1. Produce control events from arbitrary operators
>   2. Produce control events from JobMaster
>   3. Consume control events from arbitrary operators downstream where
>   the events are produced
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Jun 8, 2021 at 1:37 PM Yun Gao  wrote:
>
>> Very thanks Jiangang for bringing this up and very thanks for the
>> discussion!
>>
>> I also agree with the summarization by Xintong and Jing that control flow
>> seems to be
>> a common buidling block for many functionalities and dynamic
>> configuration framework
>> is a representative application that frequently required by users.
>> Regarding the control flow,
>> currently we are also considering the design of iteration for the
>> flink-ml, and as Xintong has pointed
>> out, it also required the control flow in cases like detection global
>> termination inside the iteration
>>  (in this case we need to broadcast an event through the iteration body
>> to detect if there are still
>> records reside in the iteration body). And regarding  whether to
>> implement the dynamic configuration
>> framework, I also agree with Xintong that the consistency guarantee would
>> be a point to consider, we
>> might consider if we need to ensure every operator could receive the
>> dynamic configuration.
>>
>> Best,
>> Yun
>>
>>
>>
>> --
>> Sender:kai wang
>> Date:2021/06/08 11:52:12
>> Recipient:JING ZHANG
>> Cc:刘建刚; Xintong Song [via Apache Flink User
>> Mailing List archive.]; user<
>> u...@flink.apache.org>; dev
>> Theme:Re: Add control mode for flink
>>
>>
>>
>> I'm big +1 for this feature.
>>
>>1. Limit the input qps.
>>2. Change log level for debug.
>>
>> in my team, the two examples above are needed
>>
>> JING ZHANG  于2021年6月8日周二 上午11:18写道:
>>
>>> Thanks Jiangang for bringing this up.
>>> As mentioned in Jiangang's email, `dynamic configuration framework`
>>> provides many useful functions in Kuaishou, because it could update job
>>> behavior without relaunching the job. The functions are very popular in
>>> Kuaishou, we also see similar demands in maillist [1].
>>>
>>> I'm big +1 for this feature.
>>>
>>> Thanks Xintong and Yun for deep thoughts about the issue. I like the
>>> idea about introducing control mode in Flink.
>>> It takes the original issue a big step closer to essence which also
>>> provides the possibility for more fantastic features as mentioned in
>>> Xintong and Jark's response.
>>> Based on the idea, there are at least two milestones to achieve the
>>> goals which were proposed by Jiangang:
>>> (1) Build a common control flow framework in Flink.
>>>  It focuses on control flow propagation. And, how to integrate the
>>> common control flow framework with existing mechanisms.
>>> (2) Builds a dynamic configuration framework which is exposed to users
>>> directly.
>>>  We could see dynamic configuration framework is a top application
>>> on the underlying control flow framework.
>>>  It focuses on the Public API which receives configuration updating
>>> requests from users. Besides, it is necessary to introduce an API
>>> protection mechanism to avoid job performance degradation caused by too
>>> many control events.
>>>
>>> I suggest splitting the whole design into two after we reach a consensus
>>> on whether to introduce this feature because these two sub-top

Re: [DISCUSS]FLIP-150: Introduce Hybrid Source

2021-06-07 Thread Steven Wu
> hybrid sounds to me more like the source would constantly switch back and
forth

Initially, the focus of hybrid source is more like a sequenced chain.

But in the future it would be cool that hybrid sources can intelligently
switch back and forth between historical data source (like Iceberg) and
live data source (like Kafka). E.g.,
- if the Flink job is lagging behind Kafka retention, automatically switch
to Iceberg source
- once job caught up, switch back to Kafka source

That can simplify operational aspects of manually switching.


On Mon, Jun 7, 2021 at 8:07 AM Arvid Heise  wrote:

> Sorry for joining the party so late, but it's such an interesting FLIP with
> a huge impact that I wanted to add my 2 cents. [1]
> I'm mirroring some basic question from the PR review to this thread because
> it's about the name:
>
> We could rename the thing to ConcatenatedSource(s), SourceSequence, or
> similar.
> Hybrid has the connotation of 2 for me (maybe because I'm a non-native) and
> does not carry the concatentation concept as well (hybrid sounds to me more
> like the source would constantly switch back and forth).
>
> Could we take a few minutes to think if this is the most intuitive name for
> new users? I'm especially hoping that natives might give some ideas (or
> declare that Hybrid is perfect).
>
> [1] https://github.com/apache/flink/pull/15924#pullrequestreview-677376664
>
> On Sun, Jun 6, 2021 at 7:47 PM Steven Wu  wrote:
>
> > > Converter function relies on the specific enumerator capabilities to
> set
> > the new start position (e.g.
> > fileSourceEnumerator.getEndTimestamp() and
> > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
> >
> > I guess the premise is that a converter is for a specific tuple of
> > (upstream source, downstream source) . We don't have to define generic
> > EndtStateT and SwitchableEnumerator interfaces. That should work.
> >
> > The benefit of defining EndtStateT and SwitchableEnumerator interfaces is
> > probably promoting uniformity across sources that support
> hybrid/switchable
> > source.
> >
> > On Sun, Jun 6, 2021 at 10:22 AM Thomas Weise  wrote:
> >
> > > Hi Steven,
> > >
> > > Thank you for the thorough review of the PR and for bringing this back
> > > to the mailing list.
> > >
> > > All,
> > >
> > > I updated the FLIP-150 page to highlight aspects in which the PR
> > > deviates from the original proposal [1]. The goal would be to update
> > > the FLIP soon and bring it to a vote, as previously suggested offline
> > > by Nicholas.
> > >
> > > A few minor issues in the PR are outstanding and I'm working on test
> > > coverage for the recovery behavior, which should be completed soon.
> > >
> > > The dynamic position transfer needs to be concluded before we can move
> > > forward however.
> > >
> > > There have been various ideas, including the special
> > > "SwitchableEnumerator" interface, using enumerator checkpoint state or
> > > an enumerator interface extension to extract the end state.
> > >
> > > One goal in the FLIP is to "Reuse the existing Source connectors built
> > > with FLIP-27 without any change." and I think it is important to honor
> > > that goal given that fixed start positions do not require interface
> > > changes.
> > >
> > > Based on the feedback the following might be a good solution for
> > > runtime position transfer:
> > >
> > > * User supplies the optional converter function (not applicable for
> > > fixed positions).
> > > * Instead of relying on the enumerator checkpoint state [2], the
> > > converter function will be supplied with the current and next
> > > enumerator (source.createEnumerator).
> > > * Converter function relies on the specific enumerator capabilities to
> > > set the new start position (e.g.
> > > fileSourceEnumerator.getEndTimestamp() and
> > > kafkaSourceEnumerator.setTimestampOffsetsInitializer(..)
> > > * HybridSourceSplitEnumerator starts new underlying enumerator
> > >
> > > With this approach, there is no need to augment FLIP-27 interfaces and
> > > custom source capabilities are easier to integrate. Removing the
> > > mandate to rely on enumerator checkpoint state also avoids potential
> > > upgrade/compatibility issues.
> > >
> > > Thoughts?
> > >
> > > Thanks,
> > > Thomas
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source#FLIP150:IntroduceHybridSource-Prototypeimplementation
> > > [2]
> > >
> >
> https://github.com/apache/flink/pull/15924/files#diff-e07478b3cad9810925ec784b61ec0026396839cc5b27bd6d337a1dea05e999eaR281
> > >
> > >
> > > On Tue, Jun 1, 2021 at 3:10 PM Steven Wu  wrote:
> > > >
> > > > discussed the PR with Thosmas offline. Thomas, please correct me if I
> > > > missed anything.
> > > >
> > > > Right now, the PR differs from the FLIP-150 doc regarding the
> > converter.
> > > > * Current PR uses the enumerator checkpoint state type as the input
> for
> > > the
> > > > converter
> > > > * FLIP-150 defines a new EndStateT inter

[jira] [Created] (FLINK-22914) Use Kafka New Source in Table/SQL connector

2021-06-07 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-22914:
-

 Summary: Use Kafka New Source in Table/SQL connector
 Key: FLINK-22914
 URL: https://issues.apache.org/jira/browse/FLINK-22914
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kafka
Reporter: Qingsheng Ren
 Fix For: 1.14.0


Currently the Kafka Table/SQL connector is still using the legacy Kafka 
SourceFunction. In order to align DataStream and Table/SQL API, the new Kafka 
source should also be used in Table/SQL connector.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22915) Extend Flink ML API to support Estimator/Transformer DAG

2021-06-07 Thread Dong Lin (Jira)
Dong Lin created FLINK-22915:


 Summary: Extend Flink ML API to support Estimator/Transformer DAG
 Key: FLINK-22915
 URL: https://issues.apache.org/jira/browse/FLINK-22915
 Project: Flink
  Issue Type: Improvement
Reporter: Dong Lin


Currently Flink ML API allows users to compose an Estimator/Transformer from a 
pipeline (i.e. linear sequence) of Estimator/Transformer. We propose to extend 
the Flink ML API so that users can compose an Estimator/Transformer from a 
directed-acyclic-graph (i.e. DAG) of Estimator/Transformer. 

This feature is useful for the following use-cases:

1) The preprocessing workflow (shared between training and inference workflows) 
may involve the join of multiple tables, where the join of two tables can be 
expressed as a Transformer of 2 inputs and 1 output. And the preprocessing 
workflow could also involve the spilt operation, where the split operation has 
1 input (e.g. the original table) and 2 outputs (e.g. the split of the original 
table).

The expression of preprocessing workflow involving the join/split operation 
needs to be expressed as a DAG of Transformer.

2) The graph-embedding algorithm can be expressed as an Estimator, where the 
Estimator takes as input two tables (e.g. a node table and an edge table). The 
corresponding Transformer has 1 input (i.e. the node) and 1 output (i.e. the 
node after embedding)

The expression of training workflow involving the graph-embedding Estimator 
needs to be expressed as a DAG of Transformer/Estimator.







--
This message was sent by Atlassian Jira
(v8.3.4#803005)