[jira] [Created] (FLINK-15566) Flink implicitly order the fields in PojoTypeInfo

2020-01-13 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-15566:
--

 Summary: Flink implicitly order the fields in PojoTypeInfo
 Key: FLINK-15566
 URL: https://issues.apache.org/jira/browse/FLINK-15566
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Affects Versions: 1.10.0
Reporter: Jeff Zhang
 Attachments: image-2020-01-13-16-02-57-949.png

I don't know why Flink would do that, but it causes my user-defined function to 
behave incorrectly when I use a POJO in my UDF and override getResultType.

[https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85]

 

Here's the UDF I define.

 
{code:scala}
%flink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala.typeutils._
import org.apache.flink.api.scala._
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

class Person(val age: Int, val job: String, val marital: String, val education: String,
             val default: String, val balance: String, val housing: String, val loan: String,
             val contact: String, val day: String, val month: String, val duration: Int,
             val campaign: Int, val pdays: Int, val previous: Int, val poutcome: String,
             val y: String)

class ParseFunction extends TableFunction[Person] {
  def eval(line: String) {
    val tokens = line.split(";")
    // skip the header line, then parse the record
    if (!line.startsWith("\"age\"")) {
      collect(Row.of(new Integer(tokens(0).toInt), normalize(tokens(1)), normalize(tokens(2)),
        normalize(tokens(3)), normalize(tokens(4)), normalize(tokens(5)), normalize(tokens(6)),
        normalize(tokens(7)), normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)),
        new Integer(tokens(11).toInt), new Integer(tokens(12).toInt), new Integer(tokens(13).toInt),
        new Integer(tokens(14).toInt), normalize(tokens(15)), normalize(tokens(16))))
    }
  }

  override def getResultType() = {
    val cls = classOf[Person]
    new PojoTypeInfo[Person](classOf[Person], java.util.Arrays.asList(
      new PojoField(cls.getDeclaredField("age"), Types.INT),
      new PojoField(cls.getDeclaredField("job"), Types.STRING),
      new PojoField(cls.getDeclaredField("marital"), Types.STRING),
      new PojoField(cls.getDeclaredField("education"), Types.STRING),
      new PojoField(cls.getDeclaredField("default"), Types.STRING),
      new PojoField(cls.getDeclaredField("balance"), Types.STRING),
      new PojoField(cls.getDeclaredField("housing"), Types.STRING),
      new PojoField(cls.getDeclaredField("loan"), Types.STRING),
      new PojoField(cls.getDeclaredField("contact"), Types.STRING),
      new PojoField(cls.getDeclaredField("day"), Types.STRING),
      new PojoField(cls.getDeclaredField("month"), Types.STRING),
      new PojoField(cls.getDeclaredField("duration"), Types.INT),
      new PojoField(cls.getDeclaredField("campaign"), Types.INT),
      new PojoField(cls.getDeclaredField("pdays"), Types.INT),
      new PojoField(cls.getDeclaredField("previous"), Types.INT),
      new PojoField(cls.getDeclaredField("poutcome"), Types.STRING),
      new PojoField(cls.getDeclaredField("y"), Types.STRING)
    ))
  }

  // remove the surrounding quotes
  private def normalize(token: String) = {
    if (token.startsWith("\"")) {
      token.substring(1, token.length - 1)
    } else {
      token
    }
  }
}
{code}

I then use this UDF in SQL but get the wrong result because Flink reorders the 
fields implicitly.
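
A minimal, self-contained sketch of the implicit ordering (the class and field 
names below are made up for illustration; they are not the UDF above):

{code:java}
import java.util.Arrays;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.PojoField;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;

public class PojoOrderDemo {

    /** A tiny POJO whose fields are deliberately not declared in alphabetical order. */
    public static class Demo {
        public String b;
        public int a;
    }

    public static void main(String[] args) throws Exception {
        Class<Demo> cls = Demo.class;
        PojoTypeInfo<Demo> info = new PojoTypeInfo<>(cls, Arrays.asList(
                new PojoField(cls.getDeclaredField("b"), Types.STRING),
                new PojoField(cls.getDeclaredField("a"), Types.INT)));

        // Prints "a, b": the constructor sorts the fields by name instead of keeping
        // the order in which they were passed in.
        System.out.println(String.join(", ", info.getFieldNames()));
    }
}
{code}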

 !image-2020-01-13-16-02-57-949.png! 





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15567) Add documentation for INSERT statements for Flink SQL

2020-01-13 Thread Jark Wu (Jira)
Jark Wu created FLINK-15567:
---

 Summary: Add documentation for INSERT statements for Flink SQL
 Key: FLINK-15567
 URL: https://issues.apache.org/jira/browse/FLINK-15567
 Project: Flink
  Issue Type: Task
  Components: Documentation
Reporter: Jark Wu
Assignee: Jark Wu
 Fix For: 1.10.0


We forgot to add documentation for INSERT statements; it should be added under 
the "SQL" page.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15568) RestartPipelinedRegionStrategy: not ensure the EXACTLY_ONCE semantics

2020-01-13 Thread Andrew.D.lin (Jira)
Andrew.D.lin created FLINK-15568:


 Summary: RestartPipelinedRegionStrategy: not ensure the 
EXACTLY_ONCE semantics
 Key: FLINK-15568
 URL: https://issues.apache.org/jira/browse/FLINK-15568
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.8.3, 1.8.1, 1.8.0
Reporter: Andrew.D.lin
 Attachments: image-2020-01-13-16-40-47-888.png

!image-2020-01-13-16-40-47-888.png!

 

In the 1.8.x versions, the FailoverRegion restart method does not restore from 
the latest checkpoint and is marked TODO.

Should we support this feature (region restart) in Flink 1.8?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15569) Incorrect sample code in udf document

2020-01-13 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-15569:
--

 Summary: Incorrect sample code in udf document
 Key: FLINK-15569
 URL: https://issues.apache.org/jira/browse/FLINK-15569
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.10.0
Reporter: Jeff Zhang
 Attachments: image-2020-01-13-16-59-00-022.png

The sample should use JTuple2 instead of JTuple1.

 !image-2020-01-13-16-59-00-022.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15570) Support inserting into non-empty csv tables

2020-01-13 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-15570:
---

 Summary: Support inserting into non-empty csv tables
 Key: FLINK-15570
 URL: https://issues.apache.org/jira/browse/FLINK-15570
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem, Formats (JSON, Avro, Parquet, 
ORC, SequenceFile)
Reporter: Caizhi Weng


Currently we only support inserting into empty CSV tables. It would make it 
easier for new users coming from traditional databases to try out Flink if we 
supported inserting into non-empty CSV tables.

In other words, it would be nice if the following SQL produces a valid result:
{code:java}
CREATE TABLE myTable(
a INT,
b DOUBLE
) WITH (
'connector.type' = 'filesystem',
'connector.path' = an existing csv file,
'format.type' = 'csv',
'format.derive-schema' = 'true'
);

SELECT * FROM myTable; // produces 3 rows

INSERT INTO myTable VALUES (4, 4.4);

SELECT * FROM myTable; // produces 4 rows{code}
We currently only have two write modes, namely NO_OVERWRITE and OVERWRITE. In 
NO_OVERWRITE mode we can only insert into empty CSV tables, while in OVERWRITE 
mode inserting into a CSV table wipes all existing data in it (in the above 
example, the last SELECT would produce only 1 row instead of 4), which is really 
strange for a mere INSERT operation. We need to either add a new APPEND write 
mode, or change the behavior of OVERWRITE mode to something like appending to files.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15571) Create a Redis Streams Connector for Flink

2020-01-13 Thread Tugdual Grall (Jira)
Tugdual Grall created FLINK-15571:
-

 Summary: Create a Redis Streams Connector for Flink
 Key: FLINK-15571
 URL: https://issues.apache.org/jira/browse/FLINK-15571
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Tugdual Grall


Redis has a "log data structure" called Redis Streams. It would be nice to 
integrate Redis Streams with Apache Flink as:
 * Source
 * Sink

See Redis Streams introduction: [https://redis.io/topics/streams-intro]

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-92: Add N-Ary Stream Operator in Flink

2020-01-13 Thread Piotr Nowojski
Hi,

I was checking the official list of FLIPs [1] and just took the "Next FLIP 
Number: 93" (which was 92 at that time); that's why I didn't notice your 
mailing list thread from the same day.

Please, next time, first claim the number there and update the wiki: edit [1] 
by reserving the number and bumping the next free one, and only after that 
start the mailing list thread. Otherwise there can be race conditions like 
this :/

Piotrek

[1] 
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals 


> On 10 Jan 2020, at 01:40, Benchao Li  wrote:
> 
> Hi Piotr,
> 
> It seems that we have the 'FLIP-92' already.
> see:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+JDBC+catalog+and+Postgres+catalog
> 
> 
> Piotr Nowojski  于2020年1月9日周四 下午11:25写道:
> 
>> Hi,
>> 
>> I would like to start a vote for adding the N-Ary Stream Operator in Flink
>> as discussed in the discussion thread [1].
>> 
>> This vote will be opened at least until Wednesday, January 15th 8:00 UTC.
>> 
>> Piotrek
>> 
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-N-Ary-Stream-Operator-td11341.html
>> <
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-N-Ary-Stream-Operator-td11341.html
>>> 
> 
> 
> 
> -- 
> 
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn



[jira] [Created] (FLINK-15572) Function DDL is not compliant with FLIP-64

2020-01-13 Thread Timo Walther (Jira)
Timo Walther created FLINK-15572:


 Summary: Function DDL is not compliant with FLIP-64
 Key: FLINK-15572
 URL: https://issues.apache.org/jira/browse/FLINK-15572
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Timo Walther


The Function DDL does not follow the path resolution defined in FLIP-64. It 
always assumes fully qualified paths for catalog and database instead of 
resolving partially defined paths with the help of the current 
catalog/database. See {{createTemporaryView()}} as an example.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[ANNOUNCE] Flink Forward San Francisco 2020 Call for Presentation extended!

2020-01-13 Thread Fabian Hueske
Hi everyone,

We know some of you only came back from holidays last week.
To give you more time to submit a talk, we decided to extend the Call for
Presentations for Flink Forward San Francisco 2020 until Sunday January
19th.

The conference takes place on March 23-25 with two days of talks and one
day of training.
If you are working on an interesting Flink project or use case that you'd
like to share with many enthusiastic Flink users and committers, you should
submit a talk proposal.

We are looking for talks on the following topics:
* Use Cases
* Operations
* Technology Deep Dive
* Ecosystem
* Community

You can find more detailed track descriptions and the form to submit a
proposal on the Flink Forward website at

--> https://www.flink-forward.org/sf-2020/call-for-presentations

As usual, accepted speakers get a free conference pass (incl. training
sessions).

Best regards,
Fabian
(PC Chair for Flink Forward SF 2020)


Re:Re: [VOTE] FLIP-92: Add N-Ary Stream Operator in Flink

2020-01-13 Thread Haibo Sun
+1 (non-binding)


Best,
Haibo

At 2020-01-13 11:36:12, "Yun Gao"  wrote:
>+1 (non-binding).
>
>Very thanks for introducing this topic back, and it should be able to bring 
>improvements in the discussed scenarios. 
>
>Best,
>Yun
>
>
>--
>From:Arvid Heise 
>Send Time:2020 Jan. 10 (Fri.) 16:48
>To:dev ; Zhijiang 
>Subject:Re: [VOTE] FLIP-92: Add N-Ary Stream Operator in Flink
>
>non-binding +1
>
>On Fri, Jan 10, 2020 at 9:11 AM Zhijiang 
>wrote:
>
>> +1, it is really nice to have the N-Ary stream operator which is
>> meaningful in some scenarios.
>>
>> best,
>> Zhijiang
>>
>>
>> --
>> From:Jingsong Li 
>> Send Time:2020 Jan. 10 (Fri.) 11:00
>> To:dev 
>> Subject:Re: [VOTE] FLIP-92: Add N-Ary Stream Operator in Flink
>>
>> +1 non-binding to the N-Ary Stream Operator. Thanks Piotr for driving.
>> Looks like the previous FLIP-92 did not change the "Next FLIP Number" in
>> FLIP page.
>>
>> Best,
>> Jingsong Lee
>>
>> On Fri, Jan 10, 2020 at 8:40 AM Benchao Li  wrote:
>>
>> > Hi Piotr,
>> >
>> > It seems that we have the 'FLIP-92' already.
>> > see:
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+JDBC+catalog+and+Postgres+catalog
>> >
>> >
>> > Piotr Nowojski  于2020年1月9日周四 下午11:25写道:
>> >
>> > > Hi,
>> > >
>> > > I would like to start a vote for adding the N-Ary Stream Operator in
>> > Flink
>> > > as discussed in the discussion thread [1].
>> > >
>> > > This vote will be opened at least until Wednesday, January 15th 8:00
>> UTC.
>> > >
>> > > Piotrek
>> > >
>> > > [1]
>> > >
>> >
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-N-Ary-Stream-Operator-td11341.html
>> > > <
>> > >
>> >
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-N-Ary-Stream-Operator-td11341.html
>> > > >
>> >
>> >
>> >
>> > --
>> >
>> > Benchao Li
>> > School of Electronics Engineering and Computer Science, Peking University
>> > Tel:+86-15650713730
>> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
>> >
>>
>>
>> --
>> Best, Jingsong Lee
>>
>>


Re: [DISCUSS] Integrate Flink Docker image publication into Flink release process

2020-01-13 Thread Ufuk Celebi
Hey all,

first of all a big thank you for driving many of the Docker image releases
in the last two years.

*(1) Moving docker-flink/docker-flink to apache/docker-flink*

+1 to do this as you outlined. I would propose to aim for a first
integration with the 1.10 release without major changes to the existing
Dockerfiles. The work items would be to move the Dockerfiles and update the
release process documentation so everyone is on the same page.

*(2) Consolidate Dockerfiles in apache/flink*

+1 to start the process for this. I think this requires a bit of thinking
about what the requirements are and which problems we want to solve. From
skimming the existing Dockerfiles, it seems to me that the Docker image
builds fulfil quite a few different tasks. We have a script that can bundle
Hadoop, can copy an existing Flink distribution, can include user jars,
etc. The scope of this is quite broad and would warrant a design document/a
FLIP.

I would move the questions about nightly builds, using a different base
image or having image variants with debug tooling to after (1) and (2) or
make it part of (2).

*(3) Next steps*

If there are no objections, I would propose to tackle (1) and (2) separately
and to continue as follows:

(i) Create tickets for (1) and aim to align with 1.10 release timeline
(ideally before the first RC). Since this does not touch any code in the
release branches, I think this would not be affected by the feature freeze.
The major work item would be to update the docs and potential refactorings
of the existing process and Dockerfiles. I can help with the process to
create a new repo.

(ii) Create first draft for consolidation of existing Dockerfiles. After
this proposal is done, I would propose to bring it up for a separate
discussion on the ML.


What do you think? @Patrick: would you be interested in working on both (1)
+ (2) or did you mainly have (1) in mind?

Best,

Ufuk

On Sun, Jan 12, 2020 at 8:30 PM Konstantin Knauf 
wrote:

> Big +1 for
>
> * official images in a separate repository
> * unified images (session cluster vs application cluster)
> * images for development in Apache flink repository
>
> On Fri, Jan 10, 2020 at 7:14 PM Till Rohrmann 
> wrote:
>
> > Thanks a lot for starting this discussion Patrick! I think it is a very
> > good idea to move Flink's docker image more under the jurisdiction of the
> > Flink PMC and to make it releasing new docker images part of Flink's
> > release process (not saying that we cannot release new docker images
> > independent of Flink's release cycle).
> >
> > One thing I have no strong opinion about is where to place the
> Dockerfiles
> > (apache/flink.git vs. apache/flink-docker.git). I see the point that one
> > wants to separate concerns (Flink code vs. Dockerfiles) and, hence, that
> > having separate repositories might help with this objective. But on the
> > other hand, I don't have a lot of experience with Docker Hub and how to
> > best host Dockerfiles. Consequently, it would be helpful if others who
> have
> > made some experience could share it with us.
> >
> > Cheers,
> > Till
> >
> > On Sat, Dec 21, 2019 at 2:28 PM Hequn Cheng 
> wrote:
> >
> > > Hi Patrick,
> > >
> > > Thanks a lot for your continued work on the Docker images. That’s
> really
> > > really a great job! And I have also benefited from it.
> > >
> > > Big +1 for integrating docker image publication into the Flink release
> > > process since we can leverage the Flink release process to make sure a
> > more
> > > legitimacy docker publication. We can also check and vote on it during
> > the
> > > release.
> > >
> > > I think the most import thing we need to discuss first is whether to
> > have a
> > > dedicated git repo for the Dockerfiles.
> > >
> > > Although it is convention shared by nearly every other “official” image
> > on
> > > Docker Hub to have a dedicated repo, I'm still not sure about it.
> Maybe I
> > > have missed something important. From my point of view, I think it’s
> > better
> > > to have the Dockerfiles in the (main)Flink repo.
> > >   - First, I think the Dockerfiles can be treated as part of the
> release.
> > > And it is also natural to put the corresponding version of the
> Dockerfile
> > > in the corresponding Flink release.
> > >   - Second, we can put the Dockerfiles in the path like
> > > flink/docker-flink/version/ and the version varies in different
> releases.
> > > For example, for release 1.8.3, we have a flink/docker-flink/1.8.3
> > > folder(or maybe flink/docker-flink/1.8). Even though all Dockerfiles
> for
> > > supported versions are not in one path but they are still in one Git
> tree
> > > with different refs.
> > >   - Third, it seems the Docker Hub also supports specifying different
> > refs.
> > > For the file[1], we can change the GitRepo link from
> > > https://github.com/docker-flink/docker-flink.git to
> > > https://github.com/apache/flink.git and add a GitFetch for each tag,
> > e.g.,
> > > GitFetch: refs/tags/release-1.8.

[jira] [Created] (FLINK-15573) Let Flink SQL PlannerExpressionParserImpl#FieldRefrence use Unicode as its default charset

2020-01-13 Thread Lsw_aka_laplace (Jira)
Lsw_aka_laplace created FLINK-15573:
---

 Summary: Let Flink SQL PlannerExpressionParserImpl#FieldRefrence 
use Unicode  as its default charset  
 Key: FLINK-15573
 URL: https://issues.apache.org/jira/browse/FLINK-15573
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: Lsw_aka_laplace


Now I am talking about the `PlannerExpressionParserImpl`.

    For now, the fieldReference charset is JavaIdentifier; why not change it 
to UnicodeIdentifier?

    Currently, my team actually has this problem. For instance, data from ES 
always contains an `@timestamp` field, which is not a valid Java identifier. 
So what we did is simply let the fieldReference charset use Unicode:

 
{code:scala}
lazy val extensionIdent: Parser[String] = ( "" ~> // handle whitespace
  rep1(
    acceptIf(Character.isUnicodeIdentifierStart)("identifier expected but '" + _ + "' found"),
    elem("identifier part", Character.isUnicodeIdentifierPart(_: Char))
  ) ^^ (_.mkString)
)

lazy val fieldReference: PackratParser[UnresolvedReferenceExpression] =
  (STAR | ident | extensionIdent) ^^ { sym => unresolvedRef(sym) }
{code}
 

It is simple but really makes sense~

Looking forward to any opinions.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15574) DataType to LogicalType conversion issue

2020-01-13 Thread Benoit Hanotte (Jira)
Benoit Hanotte created FLINK-15574:
--

 Summary: DataType to LogicalType conversion issue
 Key: FLINK-15574
 URL: https://issues.apache.org/jira/browse/FLINK-15574
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Reporter: Benoit Hanotte


We seem to be encountering an issue with the conversion from DataType to 
LogicalType with the Blink planner:
{code}
org.apache.flink.table.api.ValidationException: Type 
LEGACY(BasicArrayTypeInfo) of table field 'my_array' does not match 
with type BasicArrayTypeInfo of the field 'my_array' of the TableSource 
return type.
{code}

There seem to be two paths for doing the conversion from DataType to LogicalType:

1. TypeConversions.fromLegacyInfoToDataType(): used, for instance, when calling 
TableSchema.fromTypeInformation().

2. LogicalTypeDataTypeConverter.fromDataTypeToLogicalType(): deprecated but 
still used in TableSourceUtil and many other places.

These two code paths can return a different LogicalType for the same input, 
leading to issues when the LogicalTypes are compared to ensure they are 
compatible. For instance, PlannerTypeUtils.isAssignable() returns false for a 
DataType created from BasicArrayTypeInfo (leading to the ValidationException 
above).
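
For illustration, a minimal sketch (assuming the Blink planner runtime is on 
the classpath) that exercises the two paths side by side; whether the two 
results actually differ for a given input is the observation reported here, 
not something this snippet asserts:

{code:java}
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.utils.TypeConversions;

public class ConversionPathsDemo {
    public static void main(String[] args) {
        // Path 1: legacy TypeInformation -> DataType, as used by TableSchema.fromTypeInformation()
        DataType dataType =
                TypeConversions.fromLegacyInfoToDataType(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO);
        LogicalType viaDataType = dataType.getLogicalType();

        // Path 2: the deprecated converter still used in TableSourceUtil and other places
        LogicalType viaConverter =
                LogicalTypeDataTypeConverter.fromDataTypeToLogicalType(dataType);

        // If these two disagree, checks such as PlannerTypeUtils.isAssignable(...) can fail
        // and produce the ValidationException quoted above.
        System.out.println(viaDataType);
        System.out.println(viaConverter);
    }
}
{code}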





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15575) Azure Filesystem Shades Wrong Package "httpcomponents"

2020-01-13 Thread Konstantin Knauf (Jira)
Konstantin Knauf created FLINK-15575:


 Summary: Azure Filesystem Shades Wrong Package "httpcomponents"
 Key: FLINK-15575
 URL: https://issues.apache.org/jira/browse/FLINK-15575
 Project: Flink
  Issue Type: Bug
  Components: FileSystems
Affects Versions: 1.9.1, 1.8.3, 1.10.0
Reporter: Konstantin Knauf


Instead of shading "org.apache.httpcomponents" (this package does not exist), 
the Azure filesystem should shade "org.apache.http".

This causes problems, for example, when the Azure filesystem and the 
elasticsearch6 connector are both on the classpath.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Some feedback after trying out the new FLIP-49 memory configurations

2020-01-13 Thread Andrey Zagrebin
Hi all,

While working on changing process memory to Flink memory in default
configuration, Xintong encountered a problem.
When -tm option is used to rewrite container memory, basically process
memory, it can collide with the default Flink memory.
For legacy users it should not be a problem as we adjusted the legacy heap
size option to be interpreted differently for standalone and container
modes.

One solution could be to say in -tm docs that we rewrite both options under
the hood: process and Flink memory, basically unset Flink memory from yaml
config.
The downside is that this adds more magic.

Alternatively, we can keep process memory in default config and, as
mentioned before, increase it to maintain the user experience by matching
the previous default setting for heap (now Flink in standalone) size.
The Flink memory can be mentioned in process memory comment as a simpler
alternative which does not require accounting for JVM overhead.
The downside is again more confusion while trying out Flink and tuning
memory at the same time.
On the other hand, if memory already needs to be tuned it should
quite quickly lead to the necessity of understanding the memory model in
Flink.

Best,
Andrey

On Thu, Jan 9, 2020 at 12:27 PM Stephan Ewen  wrote:

> Great! Thanks, guys, for the continued effort on this topic!
>
> On Thu, Jan 9, 2020 at 5:27 AM Xintong Song  wrote:
>
> > Thanks all for the discussion. I believe we have get consensus on all the
> > open questions discussed in this thread.
> >
> > Since Andrey already create a jira ticket for renaming shuffle memory
> > config keys with "taskmanager.memory.network.*", I'll create ticket for
> the
> > other topic that puts flink.size in flink-conf.yaml.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Wed, Jan 8, 2020 at 9:39 PM Andrey Zagrebin 
> > wrote:
> >
> > > It also looks to me that we should only swap network and memory in the
> > > option names: 'taskmanager.memory.network.*'.
> > > There is no strong consensus towards using new 'shuffle' naming so we
> can
> > > just rename it to  'taskmanager.memory.network.*' as 'shuffle' naming
> has
> > > never been released.
> > > When we have other shuffle services and start advertising more this
> > concept
> > > in Flink, we could revisit again the whole naming for this concept.
> > > https://jira.apache.org/jira/browse/FLINK-15517
> > >
> >
>


Re: [DISCUSS] Some feedback after trying out the new FLIP-49 memory configurations

2020-01-13 Thread Stephan Ewen
I think we need an interpretation of "-tm" regardless of what is in the
default configuration, because we can always have a modified configuration
and then a user passes the "-tm" flag.

I kind of like the first proposal: Interpret "-tm" as "override memory size
config and set the Yarn TM container size." It would mean exactly ignoring
"taskmanager.memory.flink.size" and using the "-tm" value as "
"taskmanager.memory.process.size".
That does not sound too bad to me.
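
As a rough illustration (hypothetical code, not an actual patch; the class and
method names below are made up), that interpretation boils down to:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.TaskManagerOptions;

    public class TmOptionSketch {
        /** "-tm" wins: drop any configured Flink memory and pin the full process size. */
        static Configuration applyTmOption(Configuration userConfig, int tmMegabytes) {
            Configuration effective = new Configuration(userConfig);
            // ignore a configured "taskmanager.memory.flink.size" so it cannot collide with "-tm"
            effective.removeConfig(TaskManagerOptions.TOTAL_FLINK_MEMORY);
            // interpret "-tm" as the total process size of the Yarn container
            effective.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), tmMegabytes + "m");
            return effective;
        }
    }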

Best,
Stephan


On Mon, Jan 13, 2020 at 5:35 PM Andrey Zagrebin 
wrote:

> Hi all,
>
> While working on changing process memory to Flink memory in default
> configuration, Xintong encountered a problem.
> When -tm option is used to rewrite container memory, basically process
> memory, it can collide with the default Flink memory.
> For legacy users it should not be a problem as we adjusted the legacy heap
> size option to be interpreted differently for standalone and container
> modes.
>
> One solution could be to say in -tm docs that we rewrite both options under
> the hood: process and Flink memory, basically unset Flink memory from yaml
> config.
> The downside is that this adds more magic.
>
> Alternatively, we can keep process memory in default config and, as
> mentioned before, increase it to maintain the user experience by matching
> the previous default setting for heap (now Flink in standalone) size.
> The Flink memory can be mentioned in process memory comment as a simpler
> alternative which does not require accounting for JVM overhead.
> The downside is again more confusion while trying out Flink and tuning
> memory at the same time.
> On the other hand, if memory already needs to be tuned it should
> quite quickly lead to the necessity of understanding the memory model in
> Flink.
>
> Best,
> Andrey
>
> On Thu, Jan 9, 2020 at 12:27 PM Stephan Ewen  wrote:
>
> > Great! Thanks, guys, for the continued effort on this topic!
> >
> > On Thu, Jan 9, 2020 at 5:27 AM Xintong Song 
> wrote:
> >
> > > Thanks all for the discussion. I believe we have get consensus on all
> the
> > > open questions discussed in this thread.
> > >
> > > Since Andrey already create a jira ticket for renaming shuffle memory
> > > config keys with "taskmanager.memory.network.*", I'll create ticket for
> > the
> > > other topic that puts flink.size in flink-conf.yaml.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Wed, Jan 8, 2020 at 9:39 PM Andrey Zagrebin 
> > > wrote:
> > >
> > > > It also looks to me that we should only swap network and memory in
> the
> > > > option names: 'taskmanager.memory.network.*'.
> > > > There is no strong consensus towards using new 'shuffle' naming so we
> > can
> > > > just rename it to  'taskmanager.memory.network.*' as 'shuffle' naming
> > has
> > > > never been released.
> > > > When we have other shuffle services and start advertising more this
> > > concept
> > > > in Flink, we could revisit again the whole naming for this concept.
> > > > https://jira.apache.org/jira/browse/FLINK-15517
> > > >
> > >
> >
>


Re: [DISCUSS] Some feedback after trying out the new FLIP-49 memory configurations

2020-01-13 Thread Stephan Ewen
Would be good to hear the thoughts of some more Yarn users, though.

On Mon, Jan 13, 2020 at 7:23 PM Stephan Ewen  wrote:

> I think we need an interpretation of "-tm" regardless of what is in the
> default configuration, because we can always have a modified configuration
> and then a user passes the "-tm" flag.
>
> I kind of like the first proposal: Interpret "-tm" as "override memory
> size config and set the Yarn TM container size." It would mean exactly
> ignoring "taskmanager.memory.flink.size" and using the "-tm" value as "
> "taskmanager.memory.process.size".
> That does not sound too bad to me.
>
> Best,
> Stephan
>
>
> On Mon, Jan 13, 2020 at 5:35 PM Andrey Zagrebin 
> wrote:
>
>> Hi all,
>>
>> While working on changing process memory to Flink memory in default
>> configuration, Xintong encountered a problem.
>> When -tm option is used to rewrite container memory, basically process
>> memory, it can collide with the default Flink memory.
>> For legacy users it should not be a problem as we adjusted the legacy heap
>> size option to be interpreted differently for standalone and container
>> modes.
>>
>> One solution could be to say in -tm docs that we rewrite both options
>> under
>> the hood: process and Flink memory, basically unset Flink memory from yaml
>> config.
>> The downside is that this adds more magic.
>>
>> Alternatively, we can keep process memory in default config and, as
>> mentioned before, increase it to maintain the user experience by matching
>> the previous default setting for heap (now Flink in standalone) size.
>> The Flink memory can be mentioned in process memory comment as a simpler
>> alternative which does not require accounting for JVM overhead.
>> The downside is again more confusion while trying out Flink and tuning
>> memory at the same time.
>> On the other hand, if memory already needs to be tuned it should
>> quite quickly lead to the necessity of understanding the memory model in
>> Flink.
>>
>> Best,
>> Andrey
>>
>> On Thu, Jan 9, 2020 at 12:27 PM Stephan Ewen  wrote:
>>
>> > Great! Thanks, guys, for the continued effort on this topic!
>> >
>> > On Thu, Jan 9, 2020 at 5:27 AM Xintong Song 
>> wrote:
>> >
>> > > Thanks all for the discussion. I believe we have get consensus on all
>> the
>> > > open questions discussed in this thread.
>> > >
>> > > Since Andrey already create a jira ticket for renaming shuffle memory
>> > > config keys with "taskmanager.memory.network.*", I'll create ticket
>> for
>> > the
>> > > other topic that puts flink.size in flink-conf.yaml.
>> > >
>> > > Thank you~
>> > >
>> > > Xintong Song
>> > >
>> > >
>> > >
>> > > On Wed, Jan 8, 2020 at 9:39 PM Andrey Zagrebin 
>> > > wrote:
>> > >
>> > > > It also looks to me that we should only swap network and memory in
>> the
>> > > > option names: 'taskmanager.memory.network.*'.
>> > > > There is no strong consensus towards using new 'shuffle' naming so
>> we
>> > can
>> > > > just rename it to  'taskmanager.memory.network.*' as 'shuffle'
>> naming
>> > has
>> > > > never been released.
>> > > > When we have other shuffle services and start advertising more this
>> > > concept
>> > > > in Flink, we could revisit again the whole naming for this concept.
>> > > > https://jira.apache.org/jira/browse/FLINK-15517
>> > > >
>> > >
>> >
>>
>


Understanding watermark

2020-01-13 Thread Cam Mach
Hello Flink expert,

We have a pipeline that reads both bounded and unbounded sources, and our
understanding is that when the bounded sources complete they should get a
watermark of +inf, and then we should be able to take a savepoint and safely
restart the pipeline. However, we have sources that never get watermarks, and
we are confused as to what we are seeing and what we should expect.


Cam Mach
Software Engineer
E-mail: cammac...@gmail.com
Tel: 206 972 2768


Re: [Discuss] Tuning FLIP-49 configuration default values.

2020-01-13 Thread Stephan Ewen
Hi all!

Thanks a lot, Xintong, for this thorough analysis. Based on your analysis,
here are some thoughts:

+1 to change default JVM metaspace size from 128MB to 64MB
+1 to change default JVM overhead min size from 128MB to 196MB

Concerning the managed memory fraction, I am not sure I would change it,
for the following reasons:

  - We should assume RocksDB will be limited to managed memory by default.
This will either be active by default or we would encourage everyone to use
this by default, because otherwise it is super hard to reason about the
RocksDB footprint.
  - For standalone, a managed memory fraction of 0.3 is less than half of
the managed memory from 1.9.
  - I am not sure if the managed memory fraction is a value that all users
adjust immediately when scaling up the memory during their first try-out
phase. I would assume that most users initially only adjust
"memory.flink.size" or "memory.process.size". A value of 0.3 will lead to
having too large heaps and very little RocksDB / batch memory even when
scaling up during the initial exploration.
  - I agree, though, that 0.5 looks too aggressive, from your benchmarks.
So maybe keeping it at 0.4 could work?

And one question: Why do we set the Framework Heap by default? Is that so
we reduce the managed memory further if less than the framework heap would be
left from the JVM heap?

Best,
Stephan

On Thu, Jan 9, 2020 at 10:54 AM Xintong Song  wrote:

> Hi all,
>
> As described in FLINK-15145 [1], we decided to tune the default
> configuration values of FLIP-49 with more jobs and cases.
>
> After spending time analyzing and tuning the configurations, I've come
> with several findings. To be brief, I would suggest the following changes,
> and for more details please take a look at my tuning report [2].
>
>- Change default managed memory fraction from 0.4 to 0.3.
>- Change default JVM metaspace size from 128MB to 64MB.
>- Change default JVM overhead min size from 128MB to 196MB.
>
> Looking forward to your feedback.
>
> Thank you~
>
> Xintong Song
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-15145
>
> [2]
> https://docs.google.com/document/d/1-LravhQYUIkXb7rh0XnBB78vSvhp3ecLSAgsiabfVkk/edit?usp=sharing
>
>


[jira] [Created] (FLINK-15576) remove isTemporary property from CatalogFunction API

2020-01-13 Thread Bowen Li (Jira)
Bowen Li created FLINK-15576:


 Summary: remove isTemporary property from CatalogFunction API
 Key: FLINK-15576
 URL: https://issues.apache.org/jira/browse/FLINK-15576
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.10.0


According to FLIP-79, CatalogFunction shouldn't have an "isTemporary" property. 
Move it from CatalogFunction to Create/AlterCatalogFunctionOperation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15577) LoggicalWindowAggregate Rel nodes missing Window specs in digest

2020-01-13 Thread Benoit Hanotte (Jira)
Benoit Hanotte created FLINK-15577:
--

 Summary: LoggicalWindowAggregate Rel nodes missing Window specs in 
digest
 Key: FLINK-15577
 URL: https://issues.apache.org/jira/browse/FLINK-15577
 Project: Flink
  Issue Type: Bug
Reporter: Benoit Hanotte


The RelNode's digest (AbstractRelNode.getDigest()), along with its RowType, is used 
by the Calcite HepPlanner to avoid adding duplicate vertices to the graph. If 
an equivalent vertex is already present in the graph, then that vertex is used 
in place of the newly generated one: 
https://github.com/apache/calcite/blob/branch-1.21/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java#L828

This means that *the digest needs to contain all the information necessary to 
identify a vertex and distinguish it from similar - but not equivalent - 
vertices*.

In the case of `LogicalWindowAggregation` and `FlinkLogicalWindowAggregation`, 
the window specs are currently not in the digest, meaning that two aggregations 
with the same signatures and expressions but different windows are considered 
equivalent by the planner, which is not correct and will lead to an invalid 
Physical Plan.

For instance, the following query would give an invalid plan:

{code}
WITH window_1h AS (
SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) as 
`timestamp`
FROM my_table
GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
),
window_2h AS (
SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as 
`timestamp`
FROM my_table
GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
)
(SELECT * FROM window_1h)
UNION ALL
(SELECT * FROM window_2h)
{code}

The invalid plan generated by the planner is the following (*Please note the 
windows in the DataStreamGroupWindowAggregate being the same when they should 
be different*):

{code}
DataStreamUnion(all=[true], union all=[timestamp]): rowcount = 200.0, 
cumulative cost = {800.0 rows, 802.0 cpu, 0.0 io}, id = 176
  DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative 
cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 173
DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 
720.millis, 360.millis)], select=[start('w$) AS w$start, end('w$) AS 
w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 
100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 172
  DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative 
cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
  DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative 
cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 175
DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 
720.millis, 360.millis)], select=[start('w$) AS w$start, end('w$) AS 
w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 
100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 174
  DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative 
cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15578) Implement exactly-once JDBC sink

2020-01-13 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-15578:
-

 Summary: Implement exactly-once JDBC sink
 Key: FLINK-15578
 URL: https://issues.apache.org/jira/browse/FLINK-15578
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / JDBC
Reporter: Roman Khachatryan
 Fix For: 1.11.0


As per discussion in the dev mailing list, there are two options:
 # Write-ahead log
 # Two-phase commit (XA)

the latter being preferable.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [Discuss] Tuning FLIP-49 configuration default values.

2020-01-13 Thread Kurt Young
HI Xingtong,

IIRC, during our TPC-DS 10T benchmark, we suffered from the JM's metaspace
size and full GCs caused by lots of class loading for source input splits.
Could you check whether changing the default value from 128MB to 64MB will
make it worse?

Correct me if I misunderstood anything, also cc @Jingsong

Best,
Kurt


On Tue, Jan 14, 2020 at 3:44 AM Stephan Ewen  wrote:

> Hi all!
>
> Thanks a lot, Xintong, for this thorough analysis. Based on your analysis,
> here are some thoughts:
>
> +1 to change default JVM metaspace size from 128MB to 64MB
> +1 to change default JVM overhead min size from 128MB to 196MB
>
> Concerning the managed memory fraction, I am not sure I would change it,
> for the following reasons:
>
>   - We should assume RocksDB will be limited to managed memory by default.
> This will either be active by default or we would encourage everyone to use
> this by default, because otherwise it is super hard to reason about the
> RocksDB footprint.
>   - For standalone, a managed memory fraction of 0.3 is less than half of
> the managed memory from 1.9.
>   - I am not sure if the managed memory fraction is a value that all users
> adjust immediately when scaling up the memory during their first try-out
> phase. I would assume that most users initially only adjust
> "memory.flink.size" or "memory.process.size". A value of 0.3 will lead to
> having too large heaps and very little RocksDB / batch memory even when
> scaling up during the initial exploration.
>   - I agree, though, that 0.5 looks too aggressive, from your benchmarks.
> So maybe keeping it at 0.4 could work?
>
> And one question: Why do we set the Framework Heap by default? Is that so
> we reduce the managed memory further is less than framework heap would be
> left from the JVM heap?
>
> Best,
> Stephan
>
> On Thu, Jan 9, 2020 at 10:54 AM Xintong Song 
> wrote:
>
> > Hi all,
> >
> > As described in FLINK-15145 [1], we decided to tune the default
> > configuration values of FLIP-49 with more jobs and cases.
> >
> > After spending time analyzing and tuning the configurations, I've come
> > with several findings. To be brief, I would suggest the following
> changes,
> > and for more details please take a look at my tuning report [2].
> >
> >- Change default managed memory fraction from 0.4 to 0.3.
> >- Change default JVM metaspace size from 128MB to 64MB.
> >- Change default JVM overhead min size from 128MB to 196MB.
> >
> > Looking forward to your feedback.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-15145
> >
> > [2]
> >
> https://docs.google.com/document/d/1-LravhQYUIkXb7rh0XnBB78vSvhp3ecLSAgsiabfVkk/edit?usp=sharing
> >
> >
>


Re: Re: [VOTE] FLIP-92: Add N-Ary Stream Operator in Flink

2020-01-13 Thread Zhenghua Gao
+1 (non-binding). Thanks for driving this. It's an important improvement in
the discussed scenarios.

*Best Regards,*
*Zhenghua Gao*


On Mon, Jan 13, 2020 at 6:13 PM Haibo Sun  wrote:

> +1 (non-binding)
>
>
> Best,
> Haibo
>
> At 2020-01-13 11:36:12, "Yun Gao"  wrote:
> >+1 (non-binding).
> >
> >Very thanks for introducing this topic back, and it should be able to
> bring improvements in the discussed scenarios.
> >
> >Best,
> >Yun
> >
> >
> >--
> >From:Arvid Heise 
> >Send Time:2020 Jan. 10 (Fri.) 16:48
> >To:dev ; Zhijiang 
> >Subject:Re: [VOTE] FLIP-92: Add N-Ary Stream Operator in Flink
> >
> >non-binding +1
> >
> >On Fri, Jan 10, 2020 at 9:11 AM Zhijiang  .invalid>
> >wrote:
> >
> >> +1, it is really nice to have the N-Ary stream operator which is
> >> meaningful in some scenarios.
> >>
> >> best,
> >> Zhijiang
> >>
> >>
> >> --
> >> From:Jingsong Li 
> >> Send Time:2020 Jan. 10 (Fri.) 11:00
> >> To:dev 
> >> Subject:Re: [VOTE] FLIP-92: Add N-Ary Stream Operator in Flink
> >>
> >> +1 non-binding to the N-Ary Stream Operator. Thanks Piotr for driving.
> >> Looks like the previous FLIP-92 did not change the "Next FLIP Number" in
> >> FLIP page.
> >>
> >> Best,
> >> Jingsong Lee
> >>
> >> On Fri, Jan 10, 2020 at 8:40 AM Benchao Li  wrote:
> >>
> >> > Hi Piotr,
> >> >
> >> > It seems that we have the 'FLIP-92' already.
> >> > see:
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+JDBC+catalog+and+Postgres+catalog
> >> >
> >> >
> >> > Piotr Nowojski  于2020年1月9日周四 下午11:25写道:
> >> >
> >> > > Hi,
> >> > >
> >> > > I would like to start a vote for adding the N-Ary Stream Operator in
> >> > Flink
> >> > > as discussed in the discussion thread [1].
> >> > >
> >> > > This vote will be opened at least until Wednesday, January 15th 8:00
> >> UTC.
> >> > >
> >> > > Piotrek
> >> > >
> >> > > [1]
> >> > >
> >> >
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-N-Ary-Stream-Operator-td11341.html
> >> > > <
> >> > >
> >> >
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Add-N-Ary-Stream-Operator-td11341.html
> >> > > >
> >> >
> >> >
> >> >
> >> > --
> >> >
> >> > Benchao Li
> >> > School of Electronics Engineering and Computer Science, Peking
> University
> >> > Tel:+86-15650713730
> >> > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> >> >
> >>
> >>
> >> --
> >> Best, Jingsong Lee
> >>
> >>
>


Re: [Discuss] Tuning FLIP-49 configuration default values.

2020-01-13 Thread Xintong Song
Thanks for the feedback, Stephan and Kurt.

@Stephan

Regarding managed memory fraction,
- It makes sense to keep the default value 0.4 if we assume RocksDB memory
is limited by default.
- AFAIK, RocksDB currently does not limit its memory usage by default, and
I'm positive about changing that.
- Personally, I don't like the idea that the out-of-the-box experience (for
which we set the default fraction) relies on users manually turning another
switch on.

Regarding framework heap memory,
- The major reason we set it by default is, as you mentioned, to have
a safety net of minimal JVM heap size.
- Also, considering the in-progress FLIP-56 (dynamic slot allocation), we
want to reserve some heap memory that will not go into the slot profiles.
That's why we decided the default value according to the heap memory usage
of an empty task executor.

@Kurt
Regarding metaspace,
- This config option ("taskmanager.memory.jvm-metaspace") only takes effect
on TMs. Currently we do not set a metaspace size for the JM.
- If we have the same metaspace problem on TMs, then yes, changing it from
128MB to 64MB will make it worse. However, IMO a 10T TPC-DS benchmark should
not be considered an out-of-the-box experience, and it makes sense to tune the
configurations for it. I think the smaller metaspace size would be a better
choice for a first try-out, where a job should not be too complicated and
the TM size could be relatively small (e.g. 1g).

Thank you~

Xintong Song



On Tue, Jan 14, 2020 at 9:38 AM Kurt Young  wrote:

> HI Xingtong,
>
> IIRC during our tpc-ds 10T benchmark, we have suffered by JM's metaspace
> size and full gc which
> caused by lots of classloadings of source input split. Could you check
> whether changing the default
> value from 128MB to 64MB will make it worse?
>
> Correct me if I misunderstood anything, also cc @Jingsong
>
> Best,
> Kurt
>
>
> On Tue, Jan 14, 2020 at 3:44 AM Stephan Ewen  wrote:
>
>> Hi all!
>>
>> Thanks a lot, Xintong, for this thorough analysis. Based on your analysis,
>> here are some thoughts:
>>
>> +1 to change default JVM metaspace size from 128MB to 64MB
>> +1 to change default JVM overhead min size from 128MB to 196MB
>>
>> Concerning the managed memory fraction, I am not sure I would change it,
>> for the following reasons:
>>
>>   - We should assume RocksDB will be limited to managed memory by default.
>> This will either be active by default or we would encourage everyone to
>> use
>> this by default, because otherwise it is super hard to reason about the
>> RocksDB footprint.
>>   - For standalone, a managed memory fraction of 0.3 is less than half of
>> the managed memory from 1.9.
>>   - I am not sure if the managed memory fraction is a value that all users
>> adjust immediately when scaling up the memory during their first try-out
>> phase. I would assume that most users initially only adjust
>> "memory.flink.size" or "memory.process.size". A value of 0.3 will lead to
>> having too large heaps and very little RocksDB / batch memory even when
>> scaling up during the initial exploration.
>>   - I agree, though, that 0.5 looks too aggressive, from your benchmarks.
>> So maybe keeping it at 0.4 could work?
>>
>> And one question: Why do we set the Framework Heap by default? Is that so
>> we reduce the managed memory further is less than framework heap would be
>> left from the JVM heap?
>>
>> Best,
>> Stephan
>>
>> On Thu, Jan 9, 2020 at 10:54 AM Xintong Song 
>> wrote:
>>
>> > Hi all,
>> >
>> > As described in FLINK-15145 [1], we decided to tune the default
>> > configuration values of FLIP-49 with more jobs and cases.
>> >
>> > After spending time analyzing and tuning the configurations, I've come
>> > with several findings. To be brief, I would suggest the following
>> changes,
>> > and for more details please take a look at my tuning report [2].
>> >
>> >- Change default managed memory fraction from 0.4 to 0.3.
>> >- Change default JVM metaspace size from 128MB to 64MB.
>> >- Change default JVM overhead min size from 128MB to 196MB.
>> >
>> > Looking forward to your feedback.
>> >
>> > Thank you~
>> >
>> > Xintong Song
>> >
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-15145
>> >
>> > [2]
>> >
>> https://docs.google.com/document/d/1-LravhQYUIkXb7rh0XnBB78vSvhp3ecLSAgsiabfVkk/edit?usp=sharing
>> >
>> >
>>
>


Re: [DISCUSS] Some feedback after trying out the new FLIP-49 memory configurations

2020-01-13 Thread Xintong Song
True, even we have "process.size" rather than "flink.size" in the default
config file, user can still have "flink.size" in their custom config file.
If we consider "-tm" as a shortcut for users to override the TM memory
size, then it makes less sense to require users to remove "flink.size" from
their config file whenever they want to use "-tm".

I'm convinced that ignoring "flink.size" might not be a bad idea.
And I think we should also update the description of "-tm" (in
"FlinkYarnSessionCli") to explicitly mention that it would overwrite
"flink.size" and "process.size".

Thank you~

Xintong Song



On Tue, Jan 14, 2020 at 2:24 AM Stephan Ewen  wrote:

> Would be good to hear the thoughts of some more Yarn users, though.
>
> On Mon, Jan 13, 2020 at 7:23 PM Stephan Ewen  wrote:
>
> > I think we need an interpretation of "-tm" regardless of what is in the
> > default configuration, because we can always have a modified
> configuration
> > and then a user passes the "-tm" flag.
> >
> > I kind of like the first proposal: Interpret "-tm" as "override memory
> > size config and set the Yarn TM container size." It would mean exactly
> > ignoring "taskmanager.memory.flink.size" and using the "-tm" value as "
> > "taskmanager.memory.process.size".
> > That does not sound too bad to me.
> >
> > Best,
> > Stephan
> >
> >
> > On Mon, Jan 13, 2020 at 5:35 PM Andrey Zagrebin 
> > wrote:
> >
> >> Hi all,
> >>
> >> While working on changing process memory to Flink memory in default
> >> configuration, Xintong encountered a problem.
> >> When -tm option is used to rewrite container memory, basically process
> >> memory, it can collide with the default Flink memory.
> >> For legacy users it should not be a problem as we adjusted the legacy
> heap
> >> size option to be interpreted differently for standalone and container
> >> modes.
> >>
> >> One solution could be to say in -tm docs that we rewrite both options
> >> under
> >> the hood: process and Flink memory, basically unset Flink memory from
> yaml
> >> config.
> >> The downside is that this adds more magic.
> >>
> >> Alternatively, we can keep process memory in default config and, as
> >> mentioned before, increase it to maintain the user experience by
> matching
> >> the previous default setting for heap (now Flink in standalone) size.
> >> The Flink memory can be mentioned in process memory comment as a simpler
> >> alternative which does not require accounting for JVM overhead.
> >> The downside is again more confusion while trying out Flink and tuning
> >> memory at the same time.
> >> On the other hand, if memory already needs to be tuned it should
> >> quite quickly lead to the necessity of understanding the memory model in
> >> Flink.
> >>
> >> Best,
> >> Andrey
> >>
> >> On Thu, Jan 9, 2020 at 12:27 PM Stephan Ewen  wrote:
> >>
> >> > Great! Thanks, guys, for the continued effort on this topic!
> >> >
> >> > On Thu, Jan 9, 2020 at 5:27 AM Xintong Song 
> >> wrote:
> >> >
> >> > > Thanks all for the discussion. I believe we have get consensus on
> all
> >> the
> >> > > open questions discussed in this thread.
> >> > >
> >> > > Since Andrey already create a jira ticket for renaming shuffle
> memory
> >> > > config keys with "taskmanager.memory.network.*", I'll create ticket
> >> for
> >> > the
> >> > > other topic that puts flink.size in flink-conf.yaml.
> >> > >
> >> > > Thank you~
> >> > >
> >> > > Xintong Song
> >> > >
> >> > >
> >> > >
> >> > > On Wed, Jan 8, 2020 at 9:39 PM Andrey Zagrebin <
> azagre...@apache.org>
> >> > > wrote:
> >> > >
> >> > > > It also looks to me that we should only swap network and memory in
> >> the
> >> > > > option names: 'taskmanager.memory.network.*'.
> >> > > > There is no strong consensus towards using new 'shuffle' naming so
> >> we
> >> > can
> >> > > > just rename it to  'taskmanager.memory.network.*' as 'shuffle'
> >> naming
> >> > has
> >> > > > never been released.
> >> > > > When we have other shuffle services and start advertising more
> this
> >> > > concept
> >> > > > in Flink, we could revisit again the whole naming for this
> concept.
> >> > > > https://jira.apache.org/jira/browse/FLINK-15517
> >> > > >
> >> > >
> >> >
> >>
> >
>


Re: [DISCUSS] Add N-Ary Stream Operator

2020-01-13 Thread Yun Tang
Hi

I noticed that the previous design doc [1] also talked about introducing a new 
KeyedStreamOperatorNG. I wonder, is that a must-do for introducing the N-ary 
stream operator?


[1] 
https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI

Best
Yun Tang

From: Piotr Nowojski 
Sent: Thursday, January 9, 2020 23:27
To: dev 
Subject: Re: [DISCUSS] Add N-Ary Stream Operator

Hi,

I have started a vote on this topic [1], please cast your +1 or -1 there :)

Also I assigned FLIP-92 number to this design doc.

Piotrek

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-92-Add-N-Ary-Stream-Operator-in-Flink-td36539.html
 


> On 10 Dec 2019, at 07:10, Jingsong Li  wrote:
>
> Hi Piotr,
>
> Sorry for the misunderstanding, chaining does work with multiple output
> right now, I mean, it's also a very important feature, and it should work
> with N-ary selectable input operators.
> We all think that providing N-ary selectable input operator is a very
> important thing, it makes TwoInputOperator chaining possible in upper
> layer, and it makes things simpler.
>
> Looking forward to it very much.
>
> Best,
> Jingsong Lee
>
> On Thu, Dec 5, 2019 at 6:01 PM Piotr Nowojski  wrote:
>
>> Hi,
>>
>> Thanks for the clarifications Jingsong. Indeed, if chaining doesn’t work
>> with multiple output right now (doesn’t it?), that’s also a good future
>> story.
>>
>> Re Kurt:
>> I think this pattern could be easily handled if those two joins are
>> implemented as a single 3 input operator, that internally is composed of
>> those three operators.
>> 1. You can set the initial InputSelection to Build1 and Build2.
>> 2. When Build1 receives `endOfInput`, InputSelection switches to Probe1
>> and Build2.
>> 3. When Probe1 receives `endOfInput`, you do not forward the `endOfInput`
>> to the internal `HashAgg` operator
>> 4. When Build2 finally receives `endOfInput`, you can finally forward the
>> `endOfInput` to the internal `HashAgg`
>>
>> Exactly for reasons like that, I wanted to at least post pone handling
>> tree-like operator chains in the Flink. Logic like that is difficult to
>> express generically, since it requires the knowledge about the operators
>> behaviour. While when hardcoded for the specific project (Blink in this
>> case) and encapsulated behind N-ary selectable input operator, it’s very
>> easy to handle by the runtime. Sure, at the expense of a bit more
>> complexity in forcing the user to compose operators, that’s why I’m not
>> saying that we do not want to handle this at some point in the future, but
>> at least not in the first version.
>>
>> Piotrek
>>
>>> On 5 Dec 2019, at 10:11, Jingsong Li  wrote:
>>>
>>> Kurt mentioned a very interesting thing,
>>>
>>> If we want to better performance to read simultaneously, To this pattern:
>>> We need to control not only the read order of inputs, but also the
>> outputs
>>> of endInput.
>>> In this case, HashAggregate can only call its real endInput after the
>> input
>>> of build2 is finished, so the endInput of an operator is not necessarily
>>> determined by its input, but also by other associated inputs.
>>> I think we have the ability to do this in the n-input operator.
>>>
>>> Note that these behaviors should be determined at compile time.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Dec 5, 2019 at 4:42 PM Kurt Young  wrote:
>>>
 During implementing n-ary input operator in table, please keep
 this pattern in mind:

 Build1 ---+
           |
           +---> HashJoin1 ---> HashAgg ---+
           |                               |
 Probe1 ---+                               +---> HashJoin2
                                           |
                                Build2 ----+

 It's quite interesting that both `Build1`, `Build2` and `Probe1` can
 be read simultaneously. But we need to control `HashAgg`'s output
 before `Build2` finished. I don't have a clear solution for now, but
 it's a common pattern we will face.

 Best,
 Kurt


 On Thu, Dec 5, 2019 at 4:37 PM Jingsong Li 
>> wrote:

> Hi Piotr,
>
>> a) two input operator X -> one input operator Y -> one input operator
>> Z
> (ALLOWED)
>> b) n input operator X -> one input operator Y -> one input operator Z
> (ALLOWED)
>> c) two input operator X -> one input operator Y -> two input operator
>> Z
> (NOT ALLOWED as a single chain)
>
> NOT ALLOWED to c) sounds good to me. I understand that it is very
 difficult
> to propose a general support for any input selectable two input
>> operators
> chain with high performance.
> And it is not necessary for table layer too. b) has already excited us.
>
> Actually, we have supported n output chain too:
> d)

Re: Storing offsets in Kafka

2020-01-13 Thread Marvin777
Hi Jiangjie,

Yeah, I am using the second case (Flink 1.7.1, Kafka 0.10.2,
FlinkKafkaConsumer010).
But now there is a problem: the data is consumed normally, but offsets are no
longer being committed. The following exception is found:

[image: image.png]



Becket Qin  于2019年9月5日周四 上午11:32写道:

> Hi Dominik,
>
> There has not been any change to the offset committing logic in
> KafkaConsumer for a while. But the logic is a little complicated. The
> offset commit to Kafka is only enabled in the following two cases:
>
> 1. Flink checkpoint is enabled AND commitOffsetsOnCheckpoint is true
> (default value is true)
> 2. Flink checkpoint is disabled AND the vanilla KafkaConsumer has a)
> enable.auto.commit=true (default value is true); b)
> auto.commit.interval.ms>0
> (default value is 5000).
>
> Note that in case 1, if the job exits before the first checkpoint takes
> place, then there will be no offset committed.
>
> Can you check if your setting falls in one of the two cases?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
> On Wed, Sep 4, 2019 at 9:03 PM Dominik Wosiński  wrote:
>
> > Hey,
> > I was wondering whether something has changed for KafkaConsumer, since I
> am
> > using Kafka 2.0.0 with Flink and I wanted to use group offsets but there
> > seems to be no change in the topic where Kafka stores it's offsets, after
> > restart Flink uses the `auto.offset.reset` so it seems that there is no
> > offsets commit happening. The checkpoints are properly configured and I
> am
> > able to restore with Savepoint. But the group offsets are not working
> > properly. It there anything that has changed in this manner ?
> >
> > Best Regards,
> > Dom.
> >
>
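
For reference, a minimal sketch of case 1 above (offsets committed on Flink
checkpoints). The topic name, group id, and broker address are placeholders:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    public class OffsetCommitExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000);  // case 1 requires checkpointing to be enabled

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder
            props.setProperty("group.id", "my-group");                 // placeholder

            FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
            consumer.setCommitOffsetsOnCheckpoints(true);  // default is true, shown for clarity

            env.addSource(consumer).print();
            env.execute("offset-commit-example");
        }
    }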


[jira] [Created] (FLINK-15579) Can not use jdbc connector on Blink batch mode

2020-01-13 Thread Shu Li Zheng (Jira)
Shu Li Zheng created FLINK-15579:


 Summary: Can not use jdbc connector on Blink batch mode 
 Key: FLINK-15579
 URL: https://issues.apache.org/jira/browse/FLINK-15579
 Project: Flink
  Issue Type: Bug
Reporter: Shu Li Zheng


JDBCTableSourceSinkFactory.createStreamTableSink() creates a JDBCUpsertTableSink, 
but BatchExecSink cannot work with an UpsertStreamTableSink.

{code:scala}
  override protected def translateToPlanInternal(
  planner: BatchPlanner): Transformation[Any] = {
val resultTransformation = sink match {
  case _: RetractStreamTableSink[T] | _: UpsertStreamTableSink[T] =>
throw new TableException("RetractStreamTableSink and 
UpsertStreamTableSink is not" +
  " supported in Batch environment.")
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15580) Add baseurl to docs/build_docs.sh

2020-01-13 Thread Benchao Li (Jira)
Benchao Li created FLINK-15580:
--

 Summary: Add baseurl to docs/build_docs.sh
 Key: FLINK-15580
 URL: https://issues.apache.org/jira/browse/FLINK-15580
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.9.1, 1.10.0
Reporter: Benchao Li


As discussed in https://issues.apache.org/jira/browse/FLINK-15559

Currently, we do not set {{baseurl}}, which makes broken links caused by a 
missing {{baseurl}} hard to detect.

So I propose we add a {{baseurl}} to {{build_docs.sh}}'s -i & -p mode, for 
example  {{/projects/flink/local-baseurl}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15581) SpillingResettableMutableObjectIterator data overflow

2020-01-13 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-15581:
--

 Summary: SpillingResettableMutableObjectIterator data overflow
 Key: FLINK-15581
 URL: https://issues.apache.org/jira/browse/FLINK-15581
 Project: Flink
  Issue Type: Bug
  Components: API / DataSet
Affects Versions: 1.9.1, 1.8.3, 1.7.2, 1.6.4, 1.10.0
Reporter: Piotr Nowojski


As [reported by a user on the mailing 
list|https://lists.apache.org/thread.html/r1e3c53eaddfd8050c94ee4e521da4fc96a119662937cf801801bde52%40%3Cuser.flink.apache.org%3E]
{quote}
SpillingResettableMutableObjectIterator has a data overflow problem if the 
number of elements in a single input exceeds Integer.MAX_VALUE.

The reason is that inside the SpillingResettableMutableObjectIterator, it tracks the 
total number of elements and the number of elements currently read with two int-typed 
fields (elementCount and currentElementNum), and if the number of elements 
exceeds Integer.MAX_VALUE, it will overflow.

If there is an overflow, then in the next iteration, after resetting the input, 
the data will not be read at all, or only part of the data will be read.
{quote}
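
A minimal, hypothetical illustration of the overflow (the counters below mirror 
the iterator's fields, but this is not the actual Flink code):

{code:java}
public class CounterOverflowDemo {
    public static void main(String[] args) {
        long written = 3_000_000_000L;     // elements spilled, more than Integer.MAX_VALUE
        int elementCount = (int) written;  // the value an int field ends up holding
        System.out.println(elementCount);  // -1294967296: negative after wrapping

        // After reset(), a read loop guarded like the one below never executes,
        // so the next iteration sees no data (or only part of it).
        // Using long for both counters avoids the wrap-around.
        int currentElementNum = 0;
        while (currentElementNum < elementCount) {
            currentElementNum++;
        }
        System.out.println(currentElementNum); // 0
    }
}
{code}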



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Data overflow in SpillingResettableMutableObjectIterator

2020-01-13 Thread Piotr Nowojski
Hi Jian,

Thank your for reporting the issue. I see that you have already created a 
ticket for this [1].

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-15549 



> On 9 Jan 2020, at 09:10, Jian Cao  wrote:
> 
> Hi all:
> We are using Flink's iteration, and found that the 
> SpillingResettableMutableObjectIterator has a data overflow problem if the 
> number of elements in a single input exceeds Integer.MAX_VALUE.
> 
> The reason is that inside the SpillingResettableMutableObjectIterator, it tracks 
> the total number of elements and the number of elements currently read with 
> two int-typed fields (elementCount and currentElementNum), and if the number 
> of elements exceeds Integer.MAX_VALUE, it will overflow.
> 
> If there is an overflow, then in the next iteration, after resetting the input, 
> the data will not be read at all, or only part of the data will be read.
> 
> Therefore, I suggest changing the type of these two fields of 
> SpillingResettableMutableObjectIterator from int to long.
> 
> Best regards.