[jira] [Created] (FLINK-28341) Fix comment in BytesKeyNormalizationUtil.java

2022-07-01 Thread shen (Jira)
shen created FLINK-28341:


 Summary: Fix comment in BytesKeyNormalizationUtil.java
 Key: FLINK-28341
 URL: https://issues.apache.org/jira/browse/FLINK-28341
 Project: Flink
  Issue Type: Improvement
Reporter: shen


The comment 
[here|https://github.com/apache/flink/blob/release-1.15.1-rc1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/BytesKeyNormalizationUtil.java#L74]
 is not correct, since 
[Byte.MIN_VALUE|https://docs.oracle.com/javase/7/docs/api/java/lang/Byte.html#MIN_VALUE]
 = -128 and 
[Byte.MAX_VALUE|https://docs.oracle.com/javase/7/docs/api/java/lang/Byte.html#MAX_VALUE]
 = 127.

And I think the [code 
below|https://github.com/apache/flink/blob/release-1.15.1-rc1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sort/BytesKeyNormalizationUtil.java#L77-L79]
 can be simplified to:

{code:java}
import org.junit.Assert;
import org.junit.Test;

public class TestIntegerConversion {
  @Test
  public void testConvertByteInteger() {
    for (byte i = Byte.MIN_VALUE; ; ++i) {
      Assert.assertEquals(convertByFlink(i), convertSimplified(i));
      if (i == Byte.MAX_VALUE) {
        break;
      }
    }
  }

  private byte convertByFlink(byte originValue) {
    int highByte = originValue & 0xff;
    highByte -= Byte.MIN_VALUE;
    return (byte) highByte;
  }

  private byte convertSimplified(byte originValue) {
    // No need to AND with 0xFF: the low 8 bits are identical either way.
    return (byte) (originValue - Byte.MIN_VALUE);
  }
}

{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28342) Flink batch support for Hive StorageHandlers

2022-07-01 Thread tartarus (Jira)
tartarus created FLINK-28342:


 Summary: Flink batch support for Hive StorageHandlers
 Key: FLINK-28342
 URL: https://issues.apache.org/jira/browse/FLINK-28342
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: tartarus


Hive introduced StorageHandlers when integrating HBase; see the 
documentation:

[https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration]

[https://cwiki.apache.org/confluence/display/Hive/StorageHandlers]

Usually a Hive table does not set an InputFormat if it uses a StorageHandler, but 
currently Flink's MRSplitsGetter does not consider this case. 

When accessing an external HBase table mapped by Hive using the Flink dialect, 
an NPE is thrown.
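
For illustration, the kind of guard that would detect such a table before split
generation might look like the sketch below (hypothetical; not the actual
MRSplitsGetter code, and the StorageDescriptor-based check is an assumption):
{code:java}
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;

/** Illustrative only: a StorageHandler-backed Hive table typically has no InputFormat set. */
public final class StorageHandlerCheck {

    private StorageHandlerCheck() {}

    /**
     * Returns true if the table's storage descriptor carries no InputFormat,
     * the case that should be reported with a clear error instead of failing
     * later with an NPE during split generation.
     */
    public static boolean hasNoInputFormat(StorageDescriptor sd) {
        return sd.getInputFormat() == null;
    }
}
{code}
The stack trace of the NPE: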
{code:java}
2022-07-01 15:03:09,240 ERROR 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Failed to 
create Source Enumerator for source Source: dp_workflow_dag_run_hbase[1]
org.apache.flink.util.FlinkRuntimeException: Could not enumerate file splits
at 
org.apache.flink.connector.file.src.AbstractFileSource.createEnumerator(AbstractFileSource.java:143)
 ~[flink-connector-files-1.15.0.jar:1.15.0]
at 
org.apache.flink.connectors.hive.HiveSource.createEnumerator(HiveSource.java:124)
 ~[flink-sql-connector-hive-2.3.6_2.12-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:197)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:318)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:71)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:196)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:165)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.registerAndStartNewCoordinators(DefaultOperatorCoordinatorHandler.java:159)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeOperatorCoordinatorsFor(AdaptiveBatchScheduler.java:295)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.updateTopology(AdaptiveBatchScheduler.java:287)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.initializeVerticesIfPossible(AdaptiveBatchScheduler.java:181)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.startSchedulingInternal(AdaptiveBatchScheduler.java:147)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.scheduler.SpeculativeScheduler.startSchedulingInternal(SpeculativeScheduler.java:162)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:626)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1092)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:965)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:406) 
~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:612)
 ~[flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 ~[flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:611)
 ~[flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:185)
 ~[flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) 
[flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) 
[flink-rpc-akka_745a2262-f519-4375-8b64-8bf2f805f55a.jar:1.15.0]
at scala.PartialFunction.applyOrElse

[jira] [Created] (FLINK-28343) Hive dialect fails using union map type

2022-07-01 Thread tartarus (Jira)
tartarus created FLINK-28343:


 Summary: Hive dialect fails using union map type
 Key: FLINK-28343
 URL: https://issues.apache.org/jira/browse/FLINK-28343
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: tartarus


We can reproduce it with the following example
{code:java}
@Test
public void testUnionMapType() {
    // automatically load hive module in hive-compatible mode
    HiveModule hiveModule = new HiveModule(hiveCatalog.getHiveVersion());
    CoreModule coreModule = CoreModule.INSTANCE;
    for (String loaded : tableEnv.listModules()) {
        tableEnv.unloadModule(loaded);
    }
    tableEnv.loadModule("hive", hiveModule);
    tableEnv.loadModule("core", coreModule);
    tableEnv.executeSql(
            "CREATE TABLE test_map_table (params string) PARTITIONED BY (`p_date` string)");
    tableEnv.executeSql(
            "select map(\"\",\"\") as params from test_map_table"
                    + " union select map(\"\",\"\") as params from test_map_table");
} {code}
Because UNION semantics require de-duplication, Flink will introduce an 
Aggregate.

An exception will be thrown:
{code:java}
Unsupported type(MAP) to generate hash code, the type(MAP) is not supported as 
a GROUP_BY/PARTITION_BY/JOIN_EQUAL/UNION field {code}
We can see the Aggregate operator in the execution plan
{code:java}
optimize subquery_rewrite cost 33 ms.
optimize result: 
LogicalSink(table=[*anonymous_collect$1*], fields=[params])
+- LogicalProject(inputs=[0])
   +- LogicalAggregate(group=[{0}])
      +- LogicalProject(inputs=[0])
         +- LogicalUnion(all=[true])
            :- LogicalProject(exprs=[[map(_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]])
            :  +- LogicalTableScan(table=[[test-catalog, default, test_map_table]])
            +- LogicalProject(exprs=[[map(_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]])
               +- LogicalTableScan(table=[[test-catalog, default, test_map_table]]) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-244: Support IterativeCondition with Accumulator in CEP Pattern

2022-07-01 Thread Martijn Visser
Hi Mingde Peng,

Thanks for creating the FLIP. I have no strong opinion on this topic, but I
am curious if there are others who would like to chip in to move this FLIP
forward.

Best regards,

Martijn

On Wed, 22 Jun 2022 at 09:24, md peng  wrote:

> Hi everyone,
>
> IterativeCondition defines a user-defined condition that decides if an
> element should be accepted in the pattern or not. The condition iterates
> over the previously accepted elements in the pattern and decides to accept
> a new element or not based on some statistic over those elements. In certain
> accumulation scenarios, for example filtering goods with more than 1,000
> orders within 10 minutes, an accumulation operation needs to be performed in
> the IterativeCondition. This accumulation behavior causes repeated
> calculation of the accumulation state, because an accumulation state may
> go through multiple transitions with conditions, and each condition invocation
> will accumulate once (a sketch of this follows below).
>
> I would like to start a discussion about FLIP-244[1], in which
> AccumulationStateCondition is proposed to define an IterativeCondition
> with accumulation and to filter on the accumulation state with an accumulator.
> The accumulation state is consistent within the lifecycle of a matching NFA;
> in other words, the user doesn't need to pay attention to when the accumulation
> state is initialized and cleaned up.
>
> Please take a look at the FLIP page [1] to get more details. Any feedback
> about the FLIP-244 would be appreciated!
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-244%3A+Support+IterativeCondition+with+Accumulator+in+CEP+Pattern
>
> Best regards,
>
> Mingde Peng
>
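
For context, the repeated accumulation described in the proposal can be sketched
with the current CEP API; the event type and pattern name below are illustrative
and not part of FLIP-244:
{code:java}
import org.apache.flink.cep.pattern.conditions.IterativeCondition;

/** Illustrative event type, not from the FLIP. */
class OrderEvent {
    public String goodsId;
}

/** Sketch only: the whole match history is re-iterated on every filter() call. */
public class MinOrderCountCondition extends IterativeCondition<OrderEvent> {

    @Override
    public boolean filter(OrderEvent value, Context<OrderEvent> ctx) throws Exception {
        // Re-computed on every invocation; this is the repeated accumulation
        // that an accumulator-backed condition would avoid.
        long orders = 1; // count the current element
        for (OrderEvent ignored : ctx.getEventsForPattern("orders")) {
            orders++;
        }
        return orders > 1000;
    }
}
{code}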


Re: Re: Re: [DISCUSS] FLIP-239: Port JDBC Connector Source to FLIP-27

2022-07-01 Thread Dong Weike
Hi,

Thank you for bringing this up, and I am +1 for this feature.

IMO, one important thing that I would like to mention is that an 
improperly-designed FLIP-27 connector could impose very severe memory pressure 
on the JobManager, especially when there is an enormous number of splits for the 
source tables, e.g. when there are billions of records to read. Frankly speaking, we 
have been haunted by this problem for a long time when using the Flink CDC 
Connectors to read large tables.

Therefore, in order to prevent the JobManager from experiencing frequent OOM 
faults, the JdbcSourceEnumerator should avoid keeping too many JdbcSourceSplits 
in the unassigned list, and it would be better if all the splits were computed 
on the fly.
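
The "computed on the fly" idea could look roughly like the sketch below; the
JdbcSourceSplit placeholder and the iterator-based generator are mine for
illustration only, not the FLIP-239 design:
{code:java}
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

/** Placeholder split type for this sketch. */
class JdbcSourceSplit implements SourceSplit {
    private final String id;

    JdbcSourceSplit(String id) {
        this.id = id;
    }

    @Override
    public String splitId() {
        return id;
    }
}

/** Sketch only: splits are materialized one at a time, on request. */
public class LazyJdbcSplitEnumerator implements SplitEnumerator<JdbcSourceSplit, Void> {

    private final SplitEnumeratorContext<JdbcSourceSplit> context;
    private final Iterator<JdbcSourceSplit> splitGenerator; // produces splits lazily

    public LazyJdbcSplitEnumerator(
            SplitEnumeratorContext<JdbcSourceSplit> context,
            Iterator<JdbcSourceSplit> splitGenerator) {
        this.context = context;
        this.splitGenerator = splitGenerator;
    }

    @Override
    public void start() {}

    @Override
    public void handleSplitRequest(int subtaskId, String requesterHostname) {
        // Materialize at most one split per request instead of keeping a huge
        // unassigned list in JobManager memory.
        if (splitGenerator.hasNext()) {
            context.assignSplit(splitGenerator.next(), subtaskId);
        } else {
            context.signalNoMoreSplits(subtaskId);
        }
    }

    @Override
    public void addSplitsBack(List<JdbcSourceSplit> splits, int subtaskId) {
        // A real implementation must re-queue returned splits; omitted here.
    }

    @Override
    public void addReader(int subtaskId) {}

    @Override
    public Void snapshotState(long checkpointId) throws Exception {
        return null; // a real implementation must checkpoint the generator's progress
    }

    @Override
    public void close() throws IOException {}
}
{code}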

Best,
Weike

-----Original Message-----
From: Lijie Wang  
Sent: Friday, July 1, 2022, 10:25 AM
To: dev@flink.apache.org
Subject: Re: Re: [DISCUSS] FLIP-239: Port JDBC Connector Source to FLIP-27

Hi Roc,

Thanks for driving the discussion.

Could you describe in detail what the JdbcSourceSplit represents? It looks like 
something is wrong with the comment on JdbcSourceSplit in the FLIP (it is described 
as "A {@link SourceSplit} that represents a file, or a region of a file").

Best,
Lijie


Roc Marshal  wrote on Thursday, June 30, 2022 at 21:41:

> Hi, Boto.
> Thanks for your reply.
>
>+1 from me on the watermark strategy definition in the 'streaming' & table 
> source. I'm not sure if FLIP-202[1] is suitable for a separate 
> discussion, but I think your proposal is very helpful to the new 
> source. It would be great if the new source could be compatible with this 
> abstraction.
>
>In addition, do we need to support such a special bounded 
> scenario abstraction?
>The number of JdbcSourceSplits is fixed, but the time at which all 
> JdbcSourceSplits have been generated is not known in a user-defined 
> implementation. Once the condition that ends the JdbcSourceSplit 
> generation process is met, no more JdbcSourceSplits will be generated.
> After all JdbcSourceSplits have been processed, the reader will be 
> notified by the JdbcSourceSplitEnumerator that there are no more 
> JdbcSourceSplits.
>
> - [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-202%3A+Introduc
> e+ClickHouse+Connector
>
> Best regards,
> Roc Marshal
>
> On 2022/06/30 09:02:23 João Boto wrote:
> > Hi,
> >
> > On the source side we could improve the JdbcParameterValuesProvider so it
> > can be defined as a query (or queries) or something more dynamic.
> > Most of the time, if your job is dynamic or has some condition to be met
> > (based on data in a table), you have to create a connection and get that
> > info from the database.
> >
> > If we are going to create/allow a "streaming" JDBC source, we should be
> > able to define a watermark and get new data from the table using that
> > watermark.
> >
> >
> > For the sink (though it could also apply to the source) it would be great
> > to be able to set your own implementation of the connection type. For
> > example, if you are connecting to ClickHouse, being able to set an
> > implementation based on "BalancedClickhouseDataSource" (this[1]
> > implementation has an example), or to set an extended version of an
> > implementation for debugging purposes.
> >
> > Regards
> >
> >
> > [1]
> https://github.com/apache/flink/pull/20097/files#diff-8b36e3403381dc14
> c748aeb5de0b4ceb7d7daec39594b1eacff1694b5266419d
> >
> > On 2022/06/27 13:09:51 Roc Marshal wrote:
> > > Hi, all,
> > >
> > >
> > >
> > >
> > > I would like to open a discussion on porting JDBC Source to new 
> > > Source
> API (FLIP-27[1]).
> > >
> > > Martijn Visser, Jing Ge and I had a preliminary discussion on the
> > > JIRA FLINK-25420[2] and planned to start the discussion about the
> > > source part first.
> > >
> > >
> > >
> > > Please let me know:
> > >
> > > - The issues you encountered with the old JDBC source;
> > > - The new feature or design you want;
> > > - More suggestions from other dimensions...
> > >
> > >
> > >
> > > You could find more details in FLIP-239[3].
> > >
> > > Looking forward to your feedback.
> > >
> > >
> > >
> > >
> > > [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+
> Source+Interface
> > >
> > > [2] https://issues.apache.org/jira/browse/FLINK-25420
> > >
> > > [3]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=21738
> 6271
> > >
> > >
> > >
> > >
> > > Best regards,
> > >
> > > Roc Marshal
> >


[jira] [Created] (FLINK-28344) How to connect flink and elasticsearch in pyflink 1.15.0?

2022-07-01 Thread ela demir (Jira)
ela demir created FLINK-28344:
-

 Summary: How to connect flink and elasticsearch in pyflink 1.15.0?
 Key: FLINK-28344
 URL: https://issues.apache.org/jira/browse/FLINK-28344
 Project: Flink
  Issue Type: Technical Debt
  Components: API / DataSet, API / DataStream, API / Python, Connectors 
/ ElasticSearch, Connectors / Kafka, Table SQL / API
Affects Versions: 1.15.0
 Environment: apache-flink==1.15.0

python==3.8.0

java11

scala 2.11

elasticsearch==7.4.0

kibana==7.4.0
Reporter: ela demir


Hello, I'm really interested in PyFlink and aim to create a project related to 
Kafka > Flink > Elasticsearch > Kibana.

I can consume messages from Kafka in Flink but cannot find any resource on 
connecting Flink to Elasticsearch. How can I send the Kafka messages that Flink 
consumed to Elasticsearch?

My Python 3.8 environment includes: apache-flink==1.15.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Support partition pruning for streaming reading

2022-07-01 Thread Martijn Visser
Hi zoucao,

I think this topic deserves a proper FLIP and a vote. This approach is
focussed only on Hive, but I would also like to understand the implications
for FileSource. Can you create one?

Best regards,

Martijn

On Wed, 22 Jun 2022 at 18:50, cao zou  wrote:

> Hi devs, I want to start a discussion to find a way to support partition
> pruning for streaming reading.
>
>
> Now, Flink already supports partition pruning; the implementation consists
> of *Source Ability*, *Logical Rule*, and the interface
> *SupportsPartitionPushDown*, but they all only take effect in batch
> reading. When reading a table in streaming mode, the existing mechanism
> will cause the problems posted in FLINK-27898
> [1], and records
> that should be filtered out will be sent downstream.
>
> To solve this drawback, this discussion is proposed, and the Hive and other
> BigData systems stored with partitions will benefit more from it.
>
>  Now, the existing partitions which need to be consumed are computed
> in *PushPartitionIntoTableSourceScanRule*. Then, the partitions are
> pushed into the TableSource. This works well in batch mode, but if we want to
> read records from Hive in streaming mode and also consider the partitions
> committed in the future, it’s not enough.
>
> To support pruning the partitions committed in the future, the pruning
> function should be pushed into the TableSource, and then delivered to
> *ContinuousPartitionFetcher*, such that the pruning of uncommitted
> partitions can be invoked there.
>
> Before proposing the changes, I think it is necessary to clarify the
> existing pruning logic. The main logic of the pruning in
> *PushPartitionIntoTableSourceScanRule* is as follows.
>
> Firstly, a pruning function called partitionPruner is generated; the function
> extends RichMapFunction.
>
>
> if tableSource.listPartitions() is not empty:
>   partitions = dynamicTableSource.listPartitions()
>   for p in partitions:
>     boolean predicate = partitionPruner.map(convertPartitionToRow(p))
>     add p to partitionsAfterPruning where the predicate is true.
>
> else (tableSource.listPartitions() is empty):
>   if the filter can be converted to a ResolvedExpression &&
>      the catalog can support the filter:
>     partitionsAfterPruning = catalog.listPartitionsByFilter()
>     all values in partitionsAfterPruning are needed.
>   else:
>     partitions = catalog.listPartitions()
>     for p in partitions:
>       boolean predicate = partitionPruner.map(convertPartitionToRow(p))
>       add p to partitionsAfterPruning where the predicate is true.
>
> I think the main logic can be split into two sides: one exists in the
> logical rule, and the other on the connector side. The catalog info
> should be used on the rule side and not on the connector side; the pruning
> function could be used on both of them or unified on the connector side.
>
>
> Proposed changes
>
>
>- add a new method in SupportsPartitionPushDown
>- let HiveSourceTable, HiveSourceBuilder, and
>HiveContinuousPartitionFetcher hold the pruning function.
>    - prune after fetchPartitions is invoked.
>
> Considering the version compatibility and the optimization for the method
> of listing partitions with filter in the catalog, I think we can add a new
> method in *SupportsPartitionPushDown*
>
> /**
> * Provides a list of remaining partitions. After those partitions are
> applied, a source must
> * not read the data of other partitions during runtime.
> *
> * See the documentation of {@link SupportsPartitionPushDown} for more
> information.
> */
> void applyPartitions(List<Map<String, String>> remainingPartitions);
>
> /**
> * Provides a pruning function for uncommitted partitions.
> */
> default void applyPartitionPruningFunction(MapFunction
> partitionPruningFunction) { }
>
> We can push the generated function into TableSource, such that the
> ContinuousPartitionFetcher can get it.
>
> For batch reading, 'remainingPartitions' will be treated as the partitions
> that need to be consumed; for streaming reading, we use the
> 'partitionPruningFunction' to ignore the unneeded partitions (a rough sketch
> of this pruning step follows below, after this proposal).
>
> Rejected Alternatives
>
> Do not remove the filter logic about the partition keys in the Filter node
> if the source executes streaming reading.
>
>
> Looking forward to your opinions.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-27898
>
> best
>
> zoucao
>
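
As a rough illustration of the pruning step after fetchPartitions() is invoked
(the generic partition type and the conversion function here are placeholders,
not the proposal's API):
{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.table.data.RowData;

/** Sketch only: keep the fetched partitions for which the pushed-down predicate is true. */
public final class PartitionPruningSketch {

    private PartitionPruningSketch() {}

    public static <P> List<P> prune(
            List<P> fetchedPartitions,
            MapFunction<RowData, Boolean> partitionPruningFunction,
            MapFunction<P, RowData> convertPartitionToRow) throws Exception {
        List<P> remaining = new ArrayList<>();
        for (P partition : fetchedPartitions) {
            if (partitionPruningFunction.map(convertPartitionToRow.map(partition))) {
                remaining.add(partition);
            }
        }
        return remaining;
    }
}
{code}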


[jira] [Created] (FLINK-28345) Flink Jdbc connector should check batch count before flush

2022-07-01 Thread Feng Jin (Jira)
Feng Jin created FLINK-28345:


 Summary: Flink Jdbc connector should check batch count before flush
 Key: FLINK-28345
 URL: https://issues.apache.org/jira/browse/FLINK-28345
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Affects Versions: 1.14.5, 1.15.0
Reporter: Feng Jin


org.apache.flink.connector.jdbc.internal.JdbcOutputFormat#flush
{code:java}
@Override
public synchronized void flush() throws IOException {
    checkFlushException();

    for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
        try {
            attemptFlush();
            batchCount = 0;
            break;
   {code}
When flushing the batch, we should check that batchCount is greater than 0. 
Otherwise it can cause problems with drivers that do not support empty 
batches, like the ClickHouse JDBC driver. 
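
A minimal self-contained sketch of the proposed behaviour (names are illustrative
and not the actual JdbcOutputFormat code): the flush is skipped entirely when
nothing is buffered, so drivers that reject empty batches are never asked to
execute one.
{code:java}
import java.util.ArrayList;
import java.util.List;

/** Sketch only: a buffered writer that refuses to submit empty batches. */
public class GuardedBatchWriter {

    private final List<String> buffer = new ArrayList<>();
    private int batchCount = 0;

    public synchronized void addToBatch(String record) {
        buffer.add(record);
        batchCount++;
    }

    public synchronized void flush() {
        if (batchCount == 0) {
            return; // nothing to flush; avoid submitting an empty batch to the driver
        }
        attemptFlush();
        batchCount = 0;
    }

    private void attemptFlush() {
        // Stand-in for executing the buffered statements against the database.
        System.out.println("Flushing " + buffer.size() + " records");
        buffer.clear();
    }
}
{code}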



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28346) Wrong plan when selects metadata in a different order against ddl

2022-07-01 Thread lincoln lee (Jira)
lincoln lee created FLINK-28346:
---

 Summary: Wrong plan when selects metadata in a different order 
against ddl
 Key: FLINK-28346
 URL: https://issues.apache.org/jira/browse/FLINK-28346
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.15.0
Reporter: lincoln lee
 Fix For: 1.16.0


The following query will get a wrong plan:

{code}
@Test
def testReadsMetaDataWithDifferentOrder(): Unit = {
  val ddl =
    s"""
       |CREATE TABLE src (
       |  id int,
       |  name varchar,
       |  tags varchar METADATA VIRTUAL,
       |  op varchar METADATA VIRTUAL,
       |  ts timestamp(3) METADATA VIRTUAL
       |) WITH (
       |  'connector' = 'values',
       |  'readable-metadata' = 'tags:varchar,op:varchar,ts:timestamp(3)',
       |  'enable-projection-push-down' = 'false'
       |)""".stripMargin
  util.tableEnv.executeSql(ddl)

  util.verifyExecPlan("SELECT id, name, ts, tags, op FROM src")
}
{code}

 

{code}
Calc(select=[id, name, ts, CAST(tags AS VARCHAR(2147483647)) AS tags, CAST(op AS VARCHAR(2147483647)) AS op])
+- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, name, op, tags, ts])
{code}

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28347) Update testcontainers dependency to v1.17.3

2022-07-01 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-28347:
--

 Summary: Update testcontainers dependency to v1.17.3
 Key: FLINK-28347
 URL: https://issues.apache.org/jira/browse/FLINK-28347
 Project: Flink
  Issue Type: Technical Debt
  Components: Test Infrastructure
Reporter: Martijn Visser
Assignee: Martijn Visser


Changelog: 
https://github.com/testcontainers/testcontainers-java/releases/tag/1.17.3

Main benefits for Flink: Elasticsearch and Pulsar improvements



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-238: Introduce FLIP-27-based Data Generator Source

2022-07-01 Thread Becket Qin
Hi Alex,

In FLIP-27 source, the SourceReader can get a SourceReaderContext. This is
passed in by the TM in Source#createReader(). And supposedly the Source
should pass this to the SourceReader if needed.

In the SourceReaderContext, currently only the index of the current subtask
is available, but we can probably add the current parallelism as well. This
would be a change that affects all the Sources, not only for the data
generator source. Perhaps we can have a simple separate FLIP.

Regarding the semantics of rate limiting for the rate-limited source,
personally I feel it is intuitive to keep the global rate untouched on scaling.
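
A small sketch of that semantics (illustrative only; how each reader learns the
current parallelism is exactly the open question above, so here it is simply
passed in):
{code:java}
/** Sketch only: each subtask emits at globalRate / parallelism, so the global rate stays constant. */
public class PerSubtaskRateLimiter {

    private final double perSubtaskRatePerSecond;
    private long nextEmitTimeNanos = System.nanoTime();

    public PerSubtaskRateLimiter(double globalRatePerSecond, int currentParallelism) {
        this.perSubtaskRatePerSecond = globalRatePerSecond / currentParallelism;
    }

    /** Blocks until this subtask may emit the next record. */
    public void acquire() throws InterruptedException {
        long intervalNanos = (long) (1_000_000_000L / perSubtaskRatePerSecond);
        long sleepNanos = nextEmitTimeNanos - System.nanoTime();
        if (sleepNanos > 0) {
            Thread.sleep(sleepNanos / 1_000_000, (int) (sleepNanos % 1_000_000));
        }
        nextEmitTimeNanos += intervalNanos;
    }
}
{code}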

Thanks,

Jiangjie (Becket) Qin

On Fri, Jul 1, 2022 at 4:00 AM Alexander Fedulov 
wrote:

> Hi all,
>
> getting back to the idea of reusing FlinkConnectorRateLimiter: it is
> designed for the SourceFunction API and has an open() method that takes a
> RuntimeContext. Therefore, we need to add a different interface for
> the new Source
> API.
>
> This is where I see a certain limitation for the rate-limiting use case: in
> the old API the individual readers were able to retrieve the current
> parallelism from the RuntimeContext. In the new API, this is not supported,
> the information about the parallelism is only available in the
> SplitEnumeratorContext to which the readers do not have access.
>
> I see two possibilities:
> 1. Add an optional RateLimiter parameter to the DataGeneratorSource
> constructor. The RateLimiter is then "fixed" and has to be fully configured
> by the user in the main method.
> 2. Piggy-back on Splits: add parallelism as a field of a Split. The
> initialization of this field would happen dynamically upon splits creation
> in the createEnumerator() method where currentParallelism is available.
>
> The second approach makes the implementation significantly more
> complex, since we cannot simply wrap NumberSequenceSource.SplitSerializer in
> that case. The advantage of this approach is that with any kind of
> autoscaling, the source rate will match the original configuration. But I'm
> not sure how useful this is. I can even imagine scenarios where scaling the
> input rate together with parallelism would be better for demo purposes.
>
> Would be glad to hear your thoughts on this.
>
> Best,
> Alexander Fedulov
>
> On Mon, Jun 20, 2022 at 4:31 PM David Anderson 
> wrote:
>
> > I'm very happy with this. +1
> >
> > A lot of SourceFunction implementations used in demos/POC implementations
> > include a call to sleep(), so adding rate limiting is a good idea, in my
> > opinion.
> >
> > Best,
> > David
> >
> > On Mon, Jun 20, 2022 at 10:10 AM Qingsheng Ren 
> wrote:
> >
> > > Hi Alexander,
> > >
> > > Thanks for creating this FLIP! I’d like to share some thoughts.
> > >
> > > 1. About the “generatorFunction” I’m expecting an initializer on it
> > > because it’s hard to require that all fields in the generator function are
> > > serializable in the user’s implementation. Providing a function like “open”
> > > in the interface could let the function make some initializations in the
> > > task initialization stage.
> > >
> > > 2. As for the throttling functionality you mentioned, there’s a
> > > FlinkConnectorRateLimiter under flink-core and maybe we could reuse this
> > > interface. Actually I prefer to make rate limiting a common feature
> > > provided in the Source API, but this requires another FLIP and a lot of
> > > discussion, so I’m OK with having it in the DataGen source first.
> > >
> > > Best regards,
> > > Qingsheng
> > >
> > >
> > > > On Jun 17, 2022, at 01:47, Alexander Fedulov <
> alexan...@ververica.com>
> > > wrote:
> > > >
> > > > Hi Jing,
> > > >
> > > > thanks for your thorough analysis. I agree with the points you make
> and
> > > > also with the idea to approach the larger task of providing a
> universal
> > > > (DataStream + SQL) data generator base iteratively.
> > > > Regarding the name, the SourceFunction-based *DataGeneratorSource*
> > > resides
> > > > in the *org.apache.flink.streaming.api.functions.source.datagen*. I
> > think
> > > > it is OK to simply place the new one (with the same name) next to the
> > > > *NumberSequenceSource* into
> > *org.apache.flink.api.connector.source.lib*.
> > > >
> > > > One more thing I wanted to discuss:  I noticed that
> *DataGenTableSource
> > > *has
> > > > built-in throttling functionality (*rowsPerSecond*). I believe it is
> > > > something that could be also useful for the DataStream users of the
> > > > stateless data generator and since we want to eventually converge on
> > the
> > > > same implementation for DataStream and Table/SQL it sounds like a
> good
> > > idea
> > > > to add it to the FLIP. What do you think?
> > > >
> > > > Best,
> > > > Alexander Fedulov
> > > >
> > > >
> > > > On Tue, Jun 14, 2022 at 7:17 PM Jing Ge  wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> After reading all discussions posted in this thread and the source
> > code
> > > of
> > > >> DataGeneratorSource which unfortunate

[jira] [Created] (FLINK-28348) Add configurable flag to disable last-state fallback for savepoint upgrade

2022-07-01 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-28348:
--

 Summary: Add configurable flag to disable last-state fallback for 
savepoint upgrade
 Key: FLINK-28348
 URL: https://issues.apache.org/jira/browse/FLINK-28348
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora
 Fix For: kubernetes-operator-1.1.0


Currently, if the job is not running and savepoint upgrade mode is configured, 
the ApplicationReconciler can fall back to last-state upgrade mode if HA was 
enabled.

While in most cases this is fine, in some situations the user might only want to 
allow savepoint upgrades. We should add a flag:



kubernetes.operator.job.upgrade.last-state-fallback.enabled : true

Default should be true to match the current behaviour.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28349) Consider using resource uid as fixed jobid for operator deployed jobs

2022-07-01 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-28349:
--

 Summary: Consider using resource uid as fixed jobid for operator 
deployed jobs
 Key: FLINK-28349
 URL: https://issues.apache.org/jira/browse/FLINK-28349
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora


There are certain connectors that rely on job id uniqueness to a certain degree 
(such as the Iceberg sink connector), so it might make sense to use the 
FlinkDeployment uid as the fixed job id instead of the default.

cc [~matyas] [~wangyang0918] 

If we decide to do so we have to be mindful of the migration path for already 
running jobs with HA/last-state upgrade mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28350) Test last-state upgrades across Flink minor versions

2022-07-01 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-28350:
--

 Summary: Test last-state upgrades across Flink minor versions
 Key: FLINK-28350
 URL: https://issues.apache.org/jira/browse/FLINK-28350
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora


Since the operator relies on the HA metadata for last-state upgrades, we should 
make sure that it is possible to upgrade between Flink versions using that.

In some cases, due to HA format changes, it might not be possible; if so, we 
should force SAVEPOINT upgrade in those cases.

cc [~wangyang0918] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-245: Source Supports Speculative Execution For Batch Job

2022-07-01 Thread Jing Zhang
Hi Guowei,
Thanks a lot for your feedback.
Your advice is really helpful. I've updated FLIP-245[1] to include
these parts.
> First of all, please complete the fault-tolerant processing flow in the
FLIP.

After an execution is created and a source operator becomes ready to
receive events, subtaskReady is called, and SpeculativeSourceCoordinator will
store the mapping of SubtaskGateway to execution attempt in
SpeculativeSourceCoordinatorContext.
Then, when the source operator registers the reader with the coordinator,
SpeculativeSourceCoordinator will store the mapping of source reader to
execution attempt in SpeculativeSourceCoordinatorContext.
If the execution goes through a failover, subtaskFailed is called, and
SpeculativeSourceCoordinator will clear the information about this execution,
including source readers and the SubtaskGateway.
If all the current executions of the execution vertex have failed,
subtaskReset will be called, and SpeculativeSourceCoordinator will clear all
information about these executions and add the splits back to the split
enumerator of the source.

> Secondly the FLIP only says that user-defined events are not supported,
but it does not explain how to deal with the existing
ReportedWatermarkEvent/ReaderRegistrationEvent.

For ReaderRegistrationEvent:
When the source operator registers the reader with the coordinator,
SpeculativeSourceCoordinator will also store the mapping of source reader
to execution attempt in SpeculativeSourceCoordinatorContext. Like
SourceCoordinator, it also needs to call SplitEnumerator#addReader to add a
new source reader.
Besides, in order to distinguish source readers between different executions,
'ReaderInfo' needs an additional 'attemptId' field.

For ReportedWatermarkEvent:
ReportedWatermarkEvent was introduced in 1.15 and is used to support
watermark alignment in streaming mode.
Speculative execution is only enabled in batch mode. Therefore,
SpeculativeSourceCoordinator will throw an exception if it receives a
ReportedWatermarkEvent.

Besides, after an offline discussion with Jiangjie (Becket) Qin, I've added
support for SourceEvent because it's useful for some user-defined sources
which have a custom event protocol between reader and enumerator.

Best,
Jing Zhang

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job

Guowei Ma  wrote on Wednesday, June 29, 2022 at 18:06:

> Hi, Jing
>
> Thanks a lot for writing this FLIP, which is very useful to Batch users.
> Currently  I have only two small questions:
>
> 1. First of all, please complete the fault-tolerant processing flow in the
> FLIP. (Maybe you've already considered it, but it's better to explicitly
> give the specific solution in the FLIP.)
> For example, how to handle Source `Reader` in case of error. As far as I
> know, once the reader is unavailable, it will result in the inability to
> allocate a new split, which may be unacceptable in the case of speculative
> execution.
>
> 2. Secondly the FLIP only says that user-defined events are not supported,
> but it does not explain how to deal with the existing
> ReportedWatermarkEvent/ReaderRegistrationEvent. After all, in the case of
> speculative execution, there may be two "same" tasks being executed at the
> same time. If these events are repeated, whether they really have no effect
> on the execution of the job still needs a clear evaluation.
>
> Best,
> Guowei
>
>
> On Fri, Jun 24, 2022 at 5:41 PM Jing Zhang  wrote:
>
> > Hi all,
> > One major problem of Flink batch jobs is slow tasks running on hot/bad
> > nodes, resulting in very long execution time.
> >
> > In order to solve this problem, FLIP-168: Speculative Execution for Batch
> > Job[1] is introduced and approved recently.
> >
> > Here, Zhu Zhu and I propose to support speculative execution of sources
> as
> > one of follow up of FLIP-168. You could find more details in FLIP-245[2].
> > Looking forward to your feedback.
> >
> > Best,
> > Jing Zhang
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job#FLIP168:SpeculativeExecutionforBatchJob-NointegrationwithFlink'swebUI
> >
> > [2]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
> >
>


Re: [DISCUSS] FLIP-245: Source Supports Speculative Execution For Batch Job

2022-07-01 Thread Jing Zhang
Hi all,
After an offline discussion with Jiangjie (Becket) Qin, Guowei, and Zhu Zhu,
I've updated FLIP-245[1] to include:
1. Complete the fault-tolerant processing flow.
2. Support for SourceEvent because it's useful for some user-defined
sources which have a custom event protocol between reader and enumerator.
3. How to handle ReportedWatermarkEvent/ReaderRegistrationEvent messages.

Please review the FLIP-245[1] again, looking forward to your feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job

Jing Zhang  wrote on Friday, July 1, 2022 at 18:02:

> Hi Guowei,
> Thanks a lot for your feedback.
> Your advice is really helpful. I've updated FLIP-245[1] to include
> these parts.
> > First of all, please complete the fault-tolerant processing flow in the
> FLIP.
>
> After an execution is created and a source operator becomes ready to
> receive events,  subtaskReady is called, SpeculativeSourceCoordinator would
> store the mapping of SubtaskGateway to execution attempt in
> SpeculativeSourceCoordinatorContext.
> Then source operator registers the reader to the coordinator,
> SpeculativeSourceCoordinator would store the mapping of source reader to
> execution attempt in SpeculativeSourceCoordinatorContext.
> If the execution goes through a failover, subtaskFailed is called,
> SpeculativeSourceCoordinator would clear information about this execution,
> including source readers and SubtaskGateway.
> If all the current executions of the execution vertex are failed,
> subtaskReset would be called, and SpeculativeSourceCoordinator would clear all
> information about these executions and add the splits back to the split
> enumerator of the source.
>
> > Secondly the FLIP only says that user-defined events are not supported,
> but it does not explain how to deal with the existing
> ReportedWatermarkEvent/ReaderRegistrationEvent.
>
> For ReaderRegistrationEvent:
> When source operator registers the reader to the coordinator,
> SpeculativeSourceCoordinator would also store the mapping of source reader
> to execution attempt in SpeculativeSourceCoordinatorContext. Like
> SourceCoordinator, it also needs to call SplitEnumerator#addReader to add a
> new source reader.
> Besides, in order to distinguish source readers between different
> executions, 'ReaderInfo' needs an additional 'attemptId' field.
>
> For ReportedWatermarkEvent:
> ReportedWatermarkEvent is introduced in 1.15 which is used to support
> watermark alignment in streaming mode.
> Speculative execution is only enabled in batch mode. Therefore,
> SpeculativeSourceCoordinator would throw an exception if it receives a
> ReportedWatermarkEvent.
>
> Besides, after an offline discussion with Jiangjie (Becket) Qin, I've added
> support for SourceEvent because it's useful for some user-defined sources
> which have a custom event protocol between reader and enumerator.
>
> Best,
> Jing Zhang
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Source+Supports+Speculative+Execution+For+Batch+Job
>
Guowei Ma  wrote on Wednesday, June 29, 2022 at 18:06:
>
>> Hi, Jing
>>
>> Thanks a lot for writing this FLIP, which is very useful to Batch users.
>> Currently  I have only two small questions:
>>
>> 1. First of all, please complete the fault-tolerant processing flow in the
>> FLIP. (Maybe you've already considered it, but it's better to explicitly
>> give the specific solution in the FLIP.)
>> For example, how to handle Source `Reader` in case of error. As far as I
>> know, once the reader is unavailable, it will result in the inability to
>> allocate a new split, which may be unacceptable in the case of speculative
>> execution.
>>
>> 2. Secondly the FLIP only says that user-defined events are not supported,
>> but it does not explain how to deal with the existing
>> ReportedWatermarkEvent/ReaderRegistrationEvent. After all, in the case of
>> speculative execution, there may be two "same" tasks being executed at the
>> same time. If these events are repeated, whether they really have no
>> effect on the execution of the job still needs a clear evaluation.
>>
>> Best,
>> Guowei
>>
>>
>> On Fri, Jun 24, 2022 at 5:41 PM Jing Zhang  wrote:
>>
>> > Hi all,
>> > One major problem of Flink batch jobs is slow tasks running on hot/bad
>> > nodes, resulting in very long execution time.
>> >
>> > In order to solve this problem, FLIP-168: Speculative Execution for
>> Batch
>> > Job[1] is introduced and approved recently.
>> >
>> > Here, Zhu Zhu and I propose to support speculative execution of sources
>> as
>> > one of follow up of FLIP-168. You could find more details in
>> FLIP-245[2].
>> > Looking forward to your feedback.
>> >
>> > Best,
>> > Jing Zhang
>> >
>> > [1]
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job#FLIP168:SpeculativeExecutionforBatchJob-NointegrationwithFlink'swebUI
>> >
>> > [2]
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-245%3A+Sou

[jira] [Created] (FLINK-28351) Pulsar Sink should support dynamic generated topic from record

2022-07-01 Thread Yufan Sheng (Jira)
Yufan Sheng created FLINK-28351:
---

 Summary: Pulsar Sink should support dynamic generated topic from 
record
 Key: FLINK-28351
 URL: https://issues.apache.org/jira/browse/FLINK-28351
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.15.0
Reporter: Yufan Sheng
 Fix For: 1.16.0


Some people would like to use dynamically-generated topics from messages and 
use the key hash range policy. This is not currently supported by the Pulsar 
sink. We would introduce a new interface named TopicExacter and add a new 
setTopics(TopicExacter) method to PulsarSinkBuilder.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28352) [Umbrella] Make Pulsar connector stable

2022-07-01 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-28352:
--

 Summary: [Umbrella] Make Pulsar connector stable
 Key: FLINK-28352
 URL: https://issues.apache.org/jira/browse/FLINK-28352
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.16.0, 1.15.2, 1.14.6
Reporter: Martijn Visser


This ticket is an umbrella ticket to keep track of all currently known Pulsar 
connector test instabilities. These need to be resolved as soon as possible and 
before other new Pulsar features can be added. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28353) Exclude unschedulable nodes using IP addresses of kubernetes nodes

2022-07-01 Thread Suxing Lee (Jira)
Suxing Lee created FLINK-28353:
--

 Summary: Exclude unschedulable nodes using IP addresses of 
kubernetes nodes
 Key: FLINK-28353
 URL: https://issues.apache.org/jira/browse/FLINK-28353
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes
Affects Versions: 1.14.4, 1.15.0
Reporter: Suxing Lee


When the job is submitted to the k8s cluster and the parameter 
-Dkubernetes.rest-service.exposed.type=NodePort is used, the web UI address of 
the job obtained at this time is the IP of any node in the k8s cluster.
However, the client will throw a connection-refused exception when the node's 
scheduling status is unschedulable. We should exclude those node IPs when 
choosing the web UI address.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28354) Support instr and locate built-in functions in Table API

2022-07-01 Thread LuNing Wang (Jira)
LuNing Wang created FLINK-28354:
---

 Summary: Support instr and locate built-in functions in Table API
 Key: FLINK-28354
 URL: https://issues.apache.org/jira/browse/FLINK-28354
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Table SQL / API
Affects Versions: 1.15.0
Reporter: LuNing Wang
 Fix For: 1.16.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28355) Python Bash e2e tests don't clean up after they've run, causing disk space issues

2022-07-01 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-28355:
--

 Summary: Python Bash e2e tests don't clean up after they've run, 
causing disk space issues
 Key: FLINK-28355
 URL: https://issues.apache.org/jira/browse/FLINK-28355
 Project: Flink
  Issue Type: Bug
  Components: API / Python, Test Infrastructure
Affects Versions: 1.16.0, 1.15.2, 1.14.6
Reporter: Martijn Visser


The Bash-based E2E tests that are used in Python aren't cleaned up after 
they've run. This causes disk space issues further downstream.

See the CI run from https://github.com/apache/flink/pull/20114 for results, for 
example:

-- When starting with the Bash e2e tests
{code:java}
08:47:10 ##[group]Top 15 biggest directories in terms of used disk space
Jul 01 08:47:12 3983560 .
Jul 01 08:47:12 1266692 ./flink-end-to-end-tests
Jul 01 08:47:12 624568  ./flink-dist
Jul 01 08:47:12 624180  ./flink-dist/target
Jul 01 08:47:12 500076  ./flink-dist/target/flink-1.16-SNAPSHOT-bin
Jul 01 08:47:12 500072  
./flink-dist/target/flink-1.16-SNAPSHOT-bin/flink-1.16-SNAPSHOT
Jul 01 08:47:12 460812  ./flink-connectors
Jul 01 08:47:12 392588  ./.git
Jul 01 08:47:12 366396  ./.git/objects
Jul 01 08:47:12 366388  ./.git/objects/pack
Jul 01 08:47:12 349272  ./flink-table
Jul 01 08:47:12 335592  
./.git/objects/pack/pack-38d46915823ebec2bc660fd160e5cfca5bc3e567.pack
Jul 01 08:47:12 293044  
./flink-dist/target/flink-1.16-SNAPSHOT-bin/flink-1.16-SNAPSHOT/opt
Jul 01 08:47:12 251272  ./flink-filesystems
Jul 01 08:47:12 246596  ./flink-end-to-end-tests/flink-streaming-kinesis-test
{code}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37425&view=logs&j=ef799394-2d67-5ff4-b2e5-410b80c9c0af&t=860bfb5d-81b0-5968-f128-2a8b5362110d&l=664

-- After completing all Bash bashed e2e tests:
{code:java}
2022-07-01T10:20:17.3594718Z Jul 01 10:20:17 ##[group]Top 15 biggest directories in terms of used disk space
2022-07-01T10:20:18.7520631Z Jul 01 10:20:18 5425892 .
2022-07-01T10:20:18.7521823Z Jul 01 10:20:18 1521472 ./flink-end-to-end-tests
2022-07-01T10:20:18.7522566Z Jul 01 10:20:18 1242528 ./flink-python
2022-07-01T10:20:18.7523244Z Jul 01 10:20:18 952336  ./flink-python/dev
2022-07-01T10:20:18.7524159Z Jul 01 10:20:18 878764  ./flink-python/dev/.conda
2022-07-01T10:20:18.7524870Z Jul 01 10:20:18 834200  ./flink-python/dev/.conda/lib
2022-07-01T10:20:18.7525619Z Jul 01 10:20:18 726528  ./flink-python/dev/.conda/lib/python3.7
2022-07-01T10:20:18.7526397Z Jul 01 10:20:18 683256  ./flink-python/dev/.conda/lib/python3.7/site-packages
2022-07-01T10:20:18.7527101Z Jul 01 10:20:18 624568  ./flink-dist
2022-07-01T10:20:18.7527768Z Jul 01 10:20:18 624180  ./flink-dist/target
2022-07-01T10:20:18.7528494Z Jul 01 10:20:18 500076  ./flink-dist/target/flink-1.16-SNAPSHOT-bin
2022-07-01T10:20:18.7529298Z Jul 01 10:20:18 500072  ./flink-dist/target/flink-1.16-SNAPSHOT-bin/flink-1.16-SNAPSHOT
2022-07-01T10:20:18.7530046Z Jul 01 10:20:18 460812  ./flink-connectors
2022-07-01T10:20:18.7530546Z Jul 01 10:20:18 392588  ./.git
2022-07-01T10:20:18.7531014Z Jul 01 10:20:18 366396  ./.git/objects
{code}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=37425&view=logs&j=ef799394-2d67-5ff4-b2e5-410b80c9c0af&t=860bfb5d-81b0-5968-f128-2a8b5362110d&l=9631



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28356) Move operator metric recording logic into statusrecorder

2022-07-01 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-28356:
--

 Summary: Move operator metric recording logic into statusrecorder 
 Key: FLINK-28356
 URL: https://issues.apache.org/jira/browse/FLINK-28356
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora
 Fix For: kubernetes-operator-1.1.0


Currently, status and metric reporting happen independently, which can cause some 
inconsistency in the metrics. We should move the metrics inside the status 
recorder.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28357) Watermark issue when recovering Finished sources

2022-07-01 Thread James (Jira)
James created FLINK-28357:
-

 Summary: Watermark issue when recovering Finished sources
 Key: FLINK-28357
 URL: https://issues.apache.org/jira/browse/FLINK-28357
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.15.0
 Environment: This can be reproduced in an IDE with the attached sample 
program.
Reporter: James
 Attachments: WatermarkDemoMain.java, 
image-2022-07-01-16-18-14-768.png, longExample.txt

Copied mostly from an email trail on the Flink user mailing list:

I've done a lot of experimentation and I'm convinced there is a problem with Flink's 
handling of FINISHED sources and recovery. 

The program consists of:
 * Two sources:
 ** One “Long Running Source” – stays alive and emits a watermark of 
DateTime.now() every 10 seconds.
 *** Prints a message to the console saying the watermark has been emitted.
 *** *Throws an exception every 5 or 10 iterations to force a recovery.*
 ** One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a 
message to the console and returns.
 * The “Short Lived Source” feeds into a map() and then joins with the “Long 
Running Source” via a KeyedCoProcessFunction. It is moved to the “FINISHED” state by 
Flink.

The problem here is that the “Join” receives no Long.MAX_VALUE watermark from 
the map() in some situations after a recovery. The dashboard goes from showing 
this:

(screenshot: job dashboard before the recovery)

To the below after a recovery (with the currentInput1/2Watermark metrics 
showing that input 2 has not received a watermark from the map, reporting 
-Long.MAX_VALUE):

!image-2022-07-01-16-18-14-768.png!

The program is currently set to checkpoint every 5 seconds. By experimenting 
with 70 seconds, it seems that if only one checkpoint has been taken with the 
“Short Lived Source” in a FINISHED state since the last recovery then 
everything works fine and the restarted “Short Lived Source” emits its 
watermark and I see the “ShortedLivedEmptySource emitting Long.MAX_VALUE 
watermark” message on the console meaning the run() definitely executed. 
However, I found that if 2 or more checkpoints are taken since the last 
recovery with the source in a FINISHED state then the console message does not 
appear and the watermark is not emitted.

To repeat – the Join does not get a Long.MAX_VALUE watermark from my source or 
Flink if I see two or more checkpoints logged in between recoveries. If zero or 
one checkpoints are made, everything is fine – the join gets the watermark and I 
see my console message. You can play with the checkpointing frequency as per 
the code comments:

    // Useful checkpoint interval options:

    //    5 - see the problem after the first recovery

    //   70 - useful to see bad behaviour kick in after a recovery or two

    //  120 - won't see the problem as we don't have 2 checkpoints within a 
single recovery session

If I merge the Triggering/Completed checkpoint messages in the log with my 
console output I see something like this clearly showing the “Short Lived 
Source” run() method is not executed after 2 checkpoints with the operators 
marked as FINISHED:

 

Re: [VOTE] Release 1.15.1, release candidate #1

2022-07-01 Thread Konstantin Knauf
Hi everyone,

I would like to raise the question of whether we should abort this release
candidate at the current stage because of newly found bugs.

* The minimal voting period has already passed and with one additional
binding vote (e.g. mine), we could release this immediately and ship all
the other fixes to users.

* The way I see the process at this stage of the release, we have already
decided that we want to release now (otherwise we wouldn't have prepared a
candidate) and this is only about sanity checking and validating the
release candidate. Regressions - in this case between 1.15.0 and 1.15.1 -
would usually justify aborting the release candidate, but as far as I can
see, neither FLINK-23528 nor FLINK-28322 is a regression.

@Danny: I don't understand how FLINK-23528 is a blocker for this release
candidate, sorry. It's been open for almost a year. Of course, it should
then go into the next patch release of Flink 1.15.

@Jingsong: Please re-consider your -1 vote.

Thanks,

Konstantin


On Thu, 30 June 2022 at 18:21, Danny Cranmer <
dannycran...@apache.org> wrote:

> Hello all,
>
> -1
>
> The Kinesis Data Streams consumer does not work with Stop With Savepoint
> [1]. We are planning to have a fix ready to merge tomorrow and would
> appreciate getting this in 1.15.1.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23528
>
> Thanks,
> Danny
>
> On Thu, Jun 30, 2022 at 9:31 AM Jingsong Li 
> wrote:
>
> > Hi David, Thanks for creating this RC.
> >
> > -1
> >
> > We found an incompatible modification in 1.15.0 [1]
> > I think we should fix it.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-28322
> >
> > Best,
> > Jingsong
> >
> > On Tue, Jun 28, 2022 at 8:45 PM Robert Metzger 
> > wrote:
> > >
> > > +1 (binding)
> > >
> > > - staging repo contents look fine
> > > - KEYS file ok
> > > - binaries start locally properly. WebUI accessible on Mac.
> > >
> > > On Mon, Jun 27, 2022 at 11:21 AM Qingsheng Ren 
> wrote:
> > >
> > > > +1 (non-binding)
> > > >
> > > > - checked/verified signatures and hashes
> > > > - checked that all POM files point to the same version
> > > > - built from source, without Hadoop and using Scala 2.12
> > > > - started standalone cluster locally, WebUI is accessiable and ran
> > > > WordCount example successfully
> > > > - executed a job with SQL client consuming from Kafka source to
> collect
> > > > sink
> > > >
> > > > Best,
> > > > Qingsheng
> > > >
> > > >
> > > > > On Jun 27, 2022, at 14:46, Xingbo Huang 
> wrote:
> > > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > - verify signatures and checksums
> > > > > - no binaries found in source archive
> > > > > - build from source
> > > > > - Reviewed the release note blog
> > > > > - verify python wheel package contents
> > > > > - pip install apache-flink-libraries and apache-flink wheel
> packages
> > > > > - run the examples from Python Table API tutorial
> > > > >
> > > > > Best,
> > > > > Xingbo
> > > > >
Chesnay Schepler  wrote on Friday, June 24, 2022 at 21:42:
> > > > >
> > > > >> +1 (binding)
> > > > >>
> > > > >> - signatures OK
> > > > >> - all required artifacts appear to be present
> > > > >> - tag exists with the correct version adjustments
> > > > >> - binary shows correct commit and version
> > > > >> - examples run fine
> > > > >> - website PR looks good
> > > > >>
> > > > >> On 22/06/2022 14:20, David Anderson wrote:
> > > > >>> Hi everyone,
> > > > >>>
> > > > >>> Please review and vote on release candidate #1 for version
> 1.15.1,
> > as
> > > > >>> follows:
> > > > >>> [ ] +1, Approve the release
> > > > >>> [ ] -1, Do not approve the release (please provide specific
> > comments)
> > > > >>>
> > > > >>> The complete staging area is available for your review, which
> > includes:
> > > > >>>
> > > > >>> * JIRA release notes [1],
> > > > >>> * the official Apache source release and binary convenience
> > releases to
> > > > >> be
> > > > >>> deployed to dist.apache.org [2], which are signed with the key
> > with
> > > > >>> fingerprint E982F098 [3],
> > > > >>> * all artifacts to be deployed to the Maven Central Repository
> [4],
> > > > >>> * source code tag "release-1.15.1-rc1" [5],
> > > > >>> * website pull request listing the new release and adding
> > announcement
> > > > >> blog
> > > > >>> post [6].
> > > > >>>
> > > > >>> The vote will be open for at least 72 hours. It is adopted by
> > majority
> > > > >>> approval, with at least 3 PMC affirmative votes.
> > > > >>>
> > > > >>> Thanks,
> > > > >>> David
> > > > >>>
> > > > >>> [1]
> > > > >>>
> > > > >>
> > > >
> >
> > > > >>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351546
> > > > >>> [2]
> https://dist.apache.org/repos/dist/dev/flink/flink-1.15.1-rc1/
> > > > >>> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > > >>> [4]
> > > > >>
> > https://repository.apache.org/content/repositories/orgapacheflink-1511/
> > > > >>> [5] https://github.com/apache/flink/tree/release-1.15.1-rc1
> > > > >

[jira] [Created] (FLINK-28358) when debug in local ,throw out "The system time period specification expects Timestamp type but is 'TIMESTAMP_WITH_LOCAL_TIME_ZONE' " exception

2022-07-01 Thread PengfeiChang (Jira)
PengfeiChang created FLINK-28358:


 Summary: when debug in local ,throw out "The system time period 
specification expects Timestamp type but is 'TIMESTAMP_WITH_LOCAL_TIME_ZONE' " 
exception
 Key: FLINK-28358
 URL: https://issues.apache.org/jira/browse/FLINK-28358
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: 1.14.4, 1.14.3, 1.14.2, 1.14.0
 Environment: maven:3.2.5

openjdk:1.8.0_333

idea:IntelliJ IDEA 2021.3 (Ultimate Edition)
Reporter: PengfeiChang
 Fix For: 1.14.4, 1.14.3, 1.14.2, 1.14.0


h1. Subject

While debugging locally to understand the JDBC connector lookup mechanism, I ran 
org.apache.flink.connector.jdbc.table.JdbcLookupTableITCase.testLookup and it failed with 
the validation exception below.
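
For reference, here is a minimal, hypothetical sketch (not the actual ITCase code; the 
table names, connector options, and JDBC URL are made up) of the kind of lookup join whose 
FOR SYSTEM_TIME AS OF clause uses a PROCTIME() attribute. Since Flink 1.13, PROCTIME() has 
type TIMESTAMP_LTZ(3), which is exactly the type the validation message in the stack trace 
complains about:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LookupJoinProctimeSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // Probe side: PROCTIME() is TIMESTAMP_LTZ(3) since Flink 1.13.
    tEnv.executeSql(
        "CREATE TABLE probe ("
            + "  id INT,"
            + "  proc_time AS PROCTIME()"
            + ") WITH ("
            + "  'connector' = 'datagen'"
            + ")");

    // Lookup side: any JDBC table; the URL and table name are placeholders.
    tEnv.executeSql(
        "CREATE TABLE dim ("
            + "  id INT,"
            + "  name STRING"
            + ") WITH ("
            + "  'connector' = 'jdbc',"
            + "  'url' = 'jdbc:derby:memory:lookup',"
            + "  'table-name' = 'dim'"
            + ")");

    // The FOR SYSTEM_TIME AS OF clause on the TIMESTAMP_LTZ attribute is what
    // the reported validation rejects in the local debug run.
    tEnv.executeSql(
        "SELECT p.id, d.name "
            + "FROM probe AS p "
            + "JOIN dim FOR SYSTEM_TIME AS OF p.proc_time AS d "
            + "ON p.id = d.id");
  }
}
{code}

The full stack trace from the test run: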

 
{code:java}
org.apache.flink.table.api.ValidationException: SQL validation failed. From 
line 1, column 106 to line 1, column 120: The system time period specification 
expects Timestamp type but is 'TIMESTAMP_WITH_LOCAL_TIME_ZONE'

at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:164)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:215)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
at 
org.apache.flink.connector.jdbc.table.JdbcLookupTableITCase.useDynamicTableFactory(JdbcLookupTableITCase.java:195)
at 
org.apache.flink.connector.jdbc.table.JdbcLookupTableITCase.testLookup(JdbcLookupTableITCase.java:81)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)

RE: Re: Re: Re: [DISCUSS] FLIP-239: Port JDBC Connector Source to FLIP-27

2022-07-01 Thread Roc Marshal
Hi, Weike.

Thank you for your reply.
As you said, storing too many splits in the SourceEnumerator will increase the load 
on the JM.
What do you think about introducing a capacity limit on the number of splits kept in 
the SourceEnumerator, together with a reject or callback mechanism for the case where 
the on-the-fly generation strategy produces more splits than the limit allows? A rough 
sketch of what I mean follows.
I am still looking forward to a better solution.
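
In this hypothetical sketch (class and method names are made up, not part of any FLIP 
proposal), the enumerator keeps at most `capacity` unassigned splits, refills them lazily 
from a generator, and rejects add-backs when the buffer is full so the caller can retry 
later or react via a callback:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.function.Supplier;

/** Sketch only: a bounded buffer of unassigned splits for the enumerator. */
final class BoundedSplitBuffer<SplitT> {

    private final int capacity;
    private final Deque<SplitT> unassigned = new ArrayDeque<>();
    private final Supplier<Optional<SplitT>> generator; // produces splits on the fly

    BoundedSplitBuffer(int capacity, Supplier<Optional<SplitT>> generator) {
        this.capacity = capacity;
        this.generator = generator;
    }

    /** Called from handleSplitRequest(): refill lazily, hand out one split. */
    synchronized Optional<SplitT> nextSplit() {
        if (unassigned.isEmpty()) {
            refill();
        }
        return Optional.ofNullable(unassigned.poll());
    }

    /** Called when splits come back (e.g. reader failure): reject if full. */
    synchronized boolean tryAddBack(SplitT split) {
        if (unassigned.size() >= capacity) {
            return false; // caller can retry later or trigger a callback
        }
        unassigned.addLast(split);
        return true;
    }

    private void refill() {
        while (unassigned.size() < capacity) {
            Optional<SplitT> next = generator.get();
            if (!next.isPresent()) {
                break; // nothing more to generate right now
            }
            unassigned.addLast(next.get());
        }
    }
}

This also lines up with your point that splits should be computed on the fly instead of 
being materialized in the unassigned list all at once.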

Best regards,
Roc Marshal

On 2022/07/01 07:58:22 Dong Weike wrote:
> Hi,
> 
> Thank you for bringing this up, and I am +1 for this feature.
> 
> IMO, one important thing that I would like to mention is that an 
> improperly-designed FLIP-27 connector could impose very severe memory 
> pressure on the JobManager, especially when there is an enormous number of 
> splits for the source tables, e.g. when there are billions of records to read. 
> Frankly speaking, we have been haunted by this problem for a long time when 
> using the Flink CDC Connectors to read large tables.
> 
> Therefore, in order to prevent JobManager from experiencing frequent OOM 
> faults, JdbcSourceEnumerator should avoid saving too many JdbcSourceSplits in 
> the unassigned list. And it would be better if all the splits were computed 
> on the fly.
> 
> Best,
> Weike
> 
> -----Original Message-----
> From: Lijie Wang  
> Sent: July 1, 2022, 10:25 AM
> To: dev@flink.apache.org
> Subject: Re: Re: [DISCUSS] FLIP-239: Port JDBC Connector Source to FLIP-27
> 
> Hi Roc,
> 
> Thanks for driving the discussion.
> 
> Could you describe in detail what the JdbcSourceSplit represents? It looks 
> like something is wrong with the comment on JdbcSourceSplit in the FLIP (it is 
> described as "A {@link SourceSplit} that represents a file, or a region of a file").
> 
> Best,
> Lijie
> 
> 
On Thu, Jun 30, 2022 at 21:41, Roc Marshal  wrote:
> 
> > Hi, Boto.
> > Thanks for your reply.
> >
> > +1 from me on defining a watermark strategy for the ‘streaming’ and table 
> > source. I'm not sure whether FLIP-202 [1] is suitable for a separate 
> > discussion, but I think your proposal is very helpful for the new source. 
> > It would be great if the new source could be compatible with this 
> > abstraction.
> >
> > In addition, should we support the following special bounded scenario?
> > The total number of JdbcSourceSplits is fixed, but the point at which all 
> > of them have been generated is not known in a user-defined implementation. 
> > Once the condition that ends split generation is met, no further 
> > JdbcSourceSplits are produced. After all JdbcSourceSplits have been 
> > processed, the JdbcSourceSplitEnumerator notifies the readers that there 
> > are no more splits.
> >
> > - [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-202%3A+Introduce+ClickHouse+Connector
> >
> > Best regards,
> > Roc Marshal
> >
> > On 2022/06/30 09:02:23 João Boto wrote:
> > > Hi,
> > >
> > > On the source side, we could improve the JdbcParameterValuesProvider so 
> > > that it can be defined as one or more queries, or something more dynamic. 
> > > Most of the time, if your job is dynamic or has some condition to be met 
> > > (based on data in a table), you have to open a connection yourself and 
> > > fetch that information from the database.
> > >
> > > If we are going to create/allow a "streaming" JDBC source, we should be 
> > > able to define a watermark and fetch new data from the table using that 
> > > watermark.
> > >
> > > For the sink (though this could also apply to the source), it would be 
> > > great to be able to set your own implementation of the connection type. 
> > > For example, when connecting to ClickHouse, you could set an 
> > > implementation based on "BalancedClickhouseDataSource" (this [1] 
> > > implementation shows an example), or set an extended version of an 
> > > implementation for debugging purposes.
> > >
> > > Regards
> > >
> > >
> > > [1]
> > https://github.com/apache/flink/pull/20097/files#diff-8b36e3403381dc14c748aeb5de0b4ceb7d7daec39594b1eacff1694b5266419d
> > >
> > > On 2022/06/27 13:09:51 Roc Marshal wrote:
> > > > Hi, all,
> > > >
> > > >
> > > >
> > > >
> > > > I would like to open a discussion on porting the JDBC Source to the new 
> > > > Source API (FLIP-27 [1]).
> > > >
> > > > Martijn Visser, Jing Ge and I had a preliminary discussion on the JIRA 
> > > > FLINK-25420 [2] and planned to start with the discussion about the 
> > > > source part first.
> > > >
> > > >
> > > >
> > > > Please let me know:
> > > >
> > > > - Issues you have encountered with the old JDBC source;
> > > > - New features or designs you would like;
> > > > - Any other suggestions, from any other angle...
> > > >
> > > >
> > > >
> > > > You could find more details in FLIP-239[3].
> > > >
> > > > Looking forward to your feedback.
> > > >
> > > >
> > > >
> > > >
> > > > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > > >
> > > > [2] https://issues.apache.org/jira/browse/FLINK-25420
> > > >
> > > > [3]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=21