Re: Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-14 Thread Yun Gao
Hi Sagar,

  I rechecked and found that the new Kafka source is not formally published yet, 
so a stable approach for now may be to add the FlinkKafkaConsumer as a 
BOUNDED source first. Sorry for the inconvenience. 

Best,
 Yun

--
Sender:Yun Gao
Date:2021/01/14 15:26:54
Recipient:Ardhani Narasimha; 
sagar
Cc:Flink User Mail List
Theme:Re: Re: Using Kafka as bounded source with DataStream API in batch mode 
(Flink 1.12)

Hi Sagar,

  I think the problem is that the legacy sources implemented by extending 
SourceFunction are all defined as CONTINUOUS_UNBOUNDED when added via 
env.addSource(). Although there is a hacky way to add the legacy sources as a 
BOUNDED source [1], I think you may first give the new version of the 
KafkaSource [2] a try. The new KafkaSource is implemented with the new Source 
API [3], which provides unified support for the streaming and batch modes.

Best,
 Yun




[1] 
https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64
[2]  
https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
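
For reference, below is a minimal Scala sketch of what the bounded KafkaSource
usage in [2] boils down to. It assumes the Flink 1.12 builder API; since the new
source is not formally published yet, class names such as KafkaRecordDeserializer
and the exact builder methods should be double-checked against the connector
version you build on, and the broker address, topic and group id are placeholders.

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.kafka.common.serialization.StringDeserializer

object BoundedKafkaBatchJob {
  def main(args: Array[String]): Unit = {
    // Java StreamExecutionEnvironment, mirroring the Java test referenced in [2].
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // BATCH runtime mode requires every source to be BOUNDED.
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)

    // The new KafkaSource declares itself BOUNDED via setBounded(): it reads up
    // to the latest offsets present at job start and then finishes.
    val source = KafkaSource.builder[String]()
      .setBootstrapServers("localhost:9092")   // placeholder: your brokers
      .setTopics("input-topic")                // placeholder: your topic
      .setGroupId("batch-job")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setBounded(OffsetsInitializer.latest())
      // assumption: KafkaRecordDeserializer is the 1.12 name of the wrapper
      .setDeserializer(KafkaRecordDeserializer.valueOnly(classOf[StringDeserializer]))
      .build()

    env
      .fromSource(source, WatermarkStrategy.noWatermarks[String](), "bounded-kafka")
      .print()

    env.execute("Bounded Kafka source in BATCH mode")
  }
}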




 --Original Mail --
Sender:Ardhani Narasimha 
Send Date:Thu Jan 14 15:11:35 2021
Recipients:sagar 
CC:Flink User Mail List 
Subject:Re: Using Kafka as bounded source with DataStream API in batch mode 
(Flink 1.12)

Interesting use case. 

Can you please elaborate more on this.
On what criteria do you want to batch? Time? Count? Or Size?

On Thu, 14 Jan 2021 at 12:15 PM, sagar  wrote:

Hi Team,

I am getting the following error while running the DataStream API in batch 
mode with a Kafka source.
I am using FlinkKafkaConsumer to consume the data.

Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source with 
the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, 
please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) 
~[flink-core-1.12.0.jar:1.12.0]

In my batch program I want to work with four to five different streams in 
batch mode, as the data source is bounded.

I don't find any clear example of how to do this with a Kafka source in Flink 1.12.

I don't want to use the JDBC source as the underlying database table may change. Please 
give me an example of how to achieve the above use case.

Also, for any large bounded source, are there any alternatives to achieve this?



-- 
---Regards---

  Sagar Bandal



Re: Getting Exception in thread "main" java.util.concurrent.ExecutionException: scala.tools.reflect.ToolBoxError: reflective compilation has failed: cannot initialize the compiler

2021-01-14 Thread Arvid Heise
Hi Avi,

apparently the maximum version that Flink supports for scala is 2.12.7 [1].
Do you have a specific reason to use a higher version?

[1] https://issues.apache.org/jira/browse/FLINK-12461

On Thu, Jan 14, 2021 at 5:11 AM Avi Levi  wrote:

> Hi Arvid,
> Please find attached full build.gradle file
>
> On Tue, Jan 12, 2021 at 8:18 PM Arvid Heise  wrote:
>
>> Can you post the full dependencies of sbt/maven/gradle whatever?
>>
>> On Tue, Jan 12, 2021 at 3:54 AM Avi Levi  wrote:
>>
>>> Hi Arvid,
>>> using :
>>>
>>> flinkVersion = '1.12.0'
>>> scalaBinaryVersion = '2.12'
>>>
>>> I simplified the example to (same exception)  :
>>>
>>> object Flinktest extends App {
>>>   private val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>   env.fromElements("A", "B","c")
>>> .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
>>> .process{new ProcessAllWindowFunction[String, List[String], TimeWindow] 
>>> {
>>>   override def process(context: Context, elements: Iterable[String], 
>>> out: Collector[List[String]]): Unit = {
>>> out.collect(elements.toList)
>>>   }
>>> }
>>> }
>>> .print()
>>>
>>> env.execute("Sample")
>>> }
>>>
>>>
>>>
>>>
>>> On Tue, Jan 5, 2021 at 1:53 PM Arvid Heise  wrote:
>>>
 Hi Avi,

 without being a scala-guy, I'm guessing that you are mixing scala
 versions. Could you check that your user code uses the same scala version
 as Flink (1.11 or 1.12)? I have also heard of issues with different minor
 versions of scala, so make sure to use the exact same version (e.g.
 2.11.12).

 On Mon, Dec 28, 2020 at 3:54 PM Avi Levi  wrote:

> I am trying to aggregate all records in a time window. This is my
> ProcessAllWindowFunction :
>
> case class SimpleAggregate(elms: List[String])
>
> class AggregateLogs extends ProcessAllWindowFunction[String, 
> SimpleAggregate, TimeWindow ] {
>   override def process(context: Context, elements: Iterable[String], out: 
> Collector[SimpleAggregate]): Unit = {
> val es: List[String] = elements.toList
> val record = SimpleAggregate(es)
> out.collect(record)
>   }
> }
>
> But I am getting this exception why ?
>
> Exception in thread "main" java.util.concurrent.ExecutionException:
> scala.tools.reflect.ToolBoxError: reflective compilation has failed: 
> cannot
> initialize the compiler due to java.lang.BootstrapMethodError:
> java.lang.NoSuchMethodError:
> scala.collection.immutable.List.$anonfun$flatMap$1$adapted(Lscala/runtime/BooleanRef;Lscala/runtime/ObjectRef;Lscala/runtime/ObjectRef;Ljava/lang/Object;)Ljava/lang/Object;
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2348)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2320)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.<init>(TraversableSerializer.scala:41)
> at
> com.neosec.handlefinancial.HandleFinancialJob$$anon$1$$anon$2$$anon$3.<init>(HandleFinancialJob.scala:52)
> at
> com.neosec.handlefinancial.HandleFinancialJob$$anon$1$$anon$2.createSerializer(HandleFinancialJob.scala:52)
> at
> com.neosec.handlefinancial.HandleFinancialJob$$anon$1$$anon$2.createSerializer(HandleFinancialJob.scala:52)
> at
> com.neosec.handlefinancial.HandleFinancialJob$$anon$1.$anonfun$createSerializer$1(HandleFinancialJob.scala:52)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
> at
> com.neosec.handlefinancial.HandleFinancialJob$$anon$1.createSerializer(HandleFinancialJob.scala:52)
> at
> o

Deterministic rescale for test

2021-01-14 Thread Martin Frank Hansen
Hi,

I am trying to make a test suite for our Flink jobs, and am having
problems making the input data deterministic.

We are reading a file input with parallelism 1 and want to rescale to a
higher parallelism, such that the ordering of the data is the same every
time.

I have tried using rebalance and rescale, but they seem to randomly distribute
data between partitions. We don't need something optimized, we just need
the same distribution for every run.
Is this possible?

Some code:

val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(parallelism)
val rawStream: DataStream[String] = env.readTextFile(s3Path).setParallelism(1)

rawStream.rescale
...

best regards

-- 

Martin Frank Hansen


Re: StreamingFileSink with ParquetAvroWriters

2021-01-14 Thread Dawid Wysakowicz
Hi Jan

Could you make sure you are packaging that dependency with your job jar?
There are instructions how to configure your build setup[1]. Especially
the part how to build a jar with dependencies might come in handy[2].

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#appendix-template-for-building-a-jar-with-dependencies

On 13/01/2021 17:49, Jan Oelschlegel wrote:
>
> Hi,
>
>  
>
> i’m using Flink (v.1.11.2) and would like to use the StreamingFileSink
> for writing into HDFS in Parquet format.
>
>  
>
> As it says in the documentation I have added the dependencies:
>
>  
>
> <dependency>
>    <groupId>org.apache.flink</groupId>
>    <artifactId>flink-parquet_${scala.binary.version}</artifactId>
>    <version>${flink.version}</version>
> </dependency>
>
>  
>
> And this is my file sink definition:
>
>  
>
> val sink: StreamingFileSink[Event] = StreamingFileSink
>   .forBulkFormat(
>     new Path("hdfs://namenode.local:8020/user/datastream/"),
>     ParquetAvroWriters.forReflectRecord(classOf[Event]))
>   .build()
>
>  
>
>  
>
> If I execute this in cluster I get the following error:
>
>  
>
> java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
>
> at
> org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84)
>
> at
> org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
>
> at
> org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
>
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
>
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>
>  
>
>  
>
> Looks like there are some dependencies missing. How can I fix this?
>
>  
>
>  
>
> Jan O.
>




Re: Elasticsearch config maxes can not be disabled

2021-01-14 Thread Dawid Wysakowicz
Hi,

First of all, what Flink versions are you using?

You are right, it is a mistake in the documentation of
sink.bulk-flush.max-actions. It should say: Can be set to '-1' to
disable it. I created a ticket [1] to track that. And as far as I can
tell (I quickly checked), it should work. As for
sink.bulk-flush.max-size, you should be able to disable it with a value
of '0'.

Could you share with us how you use the connector? Could you also
share the full stack trace for the exception you're getting? Are you
creating the table with a CREATE statement?

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-20979

On 13/01/2021 20:10, Rex Fenley wrote:
> Hello,
>
> It doesn't seem like we can disable max actions and max size for
> Elasticsearch connector.
>
> Docs:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/elasticsearch.html#sink-bulk-flush-max-actions
>
>
>   sink.bulk-flush.max-actions | optional | 1000 | Integer | Maximum number of
> buffered actions per bulk request. Can be set to '0' to disable it.
>
>   sink.bulk-flush.max-size | optional | 2mb | MemorySize | Maximum size in memory of
> buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to
> disable it.
>
> Reality:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Max number of buffered actions must be larger
> than 0.
>
> ES code looks like -1 is actually the value for disabling, but when I
> use -1:
> Caused by: java.lang.IllegalArgumentException: Could not parse value
> '-1' for key 'sink.bulk-flush.max-size'.
>
> How can I disable these two settings?
>
> Thanks!
>
> -- 
>
> Rex Fenley | Software Engineer - Mobile and Backend
>
>
> Remind.com
>




Fwd: Error querying flink state

2021-01-14 Thread Falak Kansal
Hi,

I have set up a flink cluster on my local machine. I created a flink job (
TrackMaximumTemperature) and made the state queryable. I am using
*github/streamingwithflink/chapter7/QueryableState.scala* example from
*https://github.com/streaming-with-flink
* repository. Please find the
file attached.

Now I have the running job id, and when I go and try to access the state, it
throws an exception. I see the job is running and I am using the correct
jobId. Also, checkpointing is enabled in the original job and I have set the
properties related to checkpointing in flink-conf.yaml. Am I
missing something? Any leads will be appreciated. Thank you :)


*Exception stack trace:*
Caused by:
org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could
not retrieve location of state=maxTemperature of
job=9a528bf3e1b650aed7e0b1e26d038ad5. Potential reasons are: i) the state
is not ready, or ii) the job does not exist.
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
package io.github.streamingwithflink.chapter7

import java.util.concurrent.CompletableFuture

import io.github.streamingwithflink.util.{SensorReading, SensorSource, 
SensorTimeAssigner}
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.queryablestate.client.QueryableStateClient
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

object TrackMaximumTemperature {

  /** main() defines and executes the DataStream program */
  def main(args: Array[String]) {

// set up the streaming execution environment
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()

// checkpoint every 10 seconds
env.getCheckpointConfig.setCheckpointInterval(10 * 1000)

// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// configure watermark interval
env.getConfig.setAutoWatermarkInterval(1000L)

// ingest sensor stream
val sensorData: DataStream[SensorReading] = env
  // SensorSource generates random temperature readings
  .addSource(new SensorSource)
  // assign timestamps and watermarks which are required for event time
  .assignTimestampsAndWatermarks(new SensorTimeAssigner)

val tenSecsMaxTemps: DataStream[(String, Double)] = sensorData
  // project to sensor id and temperature
  .map(r => (r.id, r.temperature))
  // compute every 10 seconds the max temperature per sensor
  .keyBy(_._1)
  .timeWindow(Time.seconds(10))
  .max(1)

// store latest value for each sensor in a queryable state
tenSecsMaxTemps
  .keyBy(_._1)
  .asQueryableState("maxTemperature")

// execute application
env.execute("Track max temperature")
  }
}

object TemperatureDashboard {

  // queryable state proxy connection information.
  // can be looked up in logs of running QueryableStateJob
  val proxyHost = "localhost"
  val proxyPort = 9069
  // jobId of running QueryableStateJob.
  // can be looked up in logs of running job or the web UI
  val jobId = "9a528bf3e1b650aed7e0b1e26d038ad5"

  // how many sensors to query
  val numSensors = 5
  // how often to query
  val refreshInterval = 1

  def main(args: Array[String]): Unit = {

// configure client with host and port of queryable state proxy
val client = new QueryableStateClient(proxyHost, proxyPort)

val futures = new Array[CompletableFuture[ValueState[(String, 
Double)]]](numSensors)
val results = new Array[Double](numSensors)

// print header line of dashboard table
val header = (for (i <- 0 until numSensors) yield "sensor_" + (i + 
1)).mkString("\t| ")
println(header)

// loop forever
while (true) {

  // send out async queries
  for (i <- 0 until numSensors) {
futures(i) = queryState("sensor_" + (i + 1), client)
  }
  // wait for results
  for (i <- 0 until numSensors) {
resul

Re: Flink[Python] questions

2021-01-14 Thread Shuiqiang Chen
Hi Dc,

Thank you for your feedback.

1. Currently, only built-in types are supported in the Python DataStream API.
However, as a workaround you can apply a Row type to represent a custom Python
class, where the field names stand for the names of the member variables and
the field types stand for their types.

2. Could you please provide the full executed command line and which kind
of cluster you are running (standalone/yarn/k8s)? Various command lines to
submit a PyFlink job are shown in
https://ci.apache.org/projects/flink/flink-docs-master/deployment/cli.html#submitting-pyflink-jobs
.

The attachment is an example code for a Python DataStream API job, for your
information.

Best,
Shuiqiang

Dc Zhao (BLOOMBERG/ 120 PARK)  于2021年1月14日周四
下午1:00写道:

> Hi Flink Community:
> We are using the pyflink to develop a POC for our project. We encountered
> some questions while using the flink.
>
> We are using Flink version 1.12, Python 3.7, and the DataStream API
>
> 1. Do you have examples of providing a python customized class as a
> `result type`? Based on the documentation research, we found out only
> built-in types are supported in Python. Also, what is the payload size
> limitation inside the flink, do we have a recommendation for that?
>
> 2. Do you have examples of submitting DataStream API code to the cluster with
> `flink run --python`? We tried to do that, however the process hangs on a `socket
> read from the java gateway`, and due to the lack of logs we are
> not sure what is missing while submitting the job.
>

>

>

>
> Regards
> Dc
>
>
> << {CH} {TS} Anything that can possibly go wrong, it does. >>
>
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, ProcessFunction


def datastream_processfunction_example():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.get_config().set_auto_watermark_interval(2000)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
data_stream = env.from_collection([(1, '1603708211000'),
   (2, '1603708224000'),
   (3, '1603708226000'),
   (4, '1603708289000')],
  type_info=Types.ROW([Types.INT(), Types.STRING()]))

class MyTimestampAssigner(TimestampAssigner):

def extract_timestamp(self, value, record_timestamp) -> int:
return int(value[1])

class MyProcessFunction(ProcessFunction):

def process_element(self, value, ctx):
current_timestamp = ctx.timestamp()
current_watermark = ctx.timer_service().current_watermark()
yield "current timestamp: {}, current watermark: {}, current_value: {}" \
.format(str(current_timestamp), str(current_watermark), str(value))

def on_timer(self, timestamp, ctx, out):
pass

watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(MyTimestampAssigner())
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.process(MyProcessFunction(), output_type=Types.STRING()).print()
env.execute("Python DataStream Example")


if __name__ == '__main__':
datastream_processfunction_example()


Re: Dead code in ES Sink

2021-01-14 Thread Aljoscha Krettek

On 2021/01/13 07:50, Rex Fenley wrote:

Are you saying that this option does get passed along to Elasticsearch
still or that it's just arbitrarily validated? According to [1] it's been
deprecated in ES 6 and removed in ES 7.

[1] https://github.com/elastic/elasticsearch/pull/38085


Sorry, I wasn't being very clear. I meant that we just pass it on to ES.  
In light of it being deprecated, I think it makes sense to keep it as 
long as we support ES 6. What do you think?


Side note: we still have an ES 5 connector... 😅 There was a discussion 
about dropping it, but it wasn't conclusive. [1]


[1] 
https://lists.apache.org/thread.html/rb957e7d7d5fb9bbe25e5fbc56662749ee1bc551d36e26c58644f60d4%40%3Cdev.flink.apache.org%3E


Best,
Aljoscha


Re: Getting Exception in thread "main" java.util.concurrent.ExecutionException: scala.tools.reflect.ToolBoxError: reflective compilation has failed: cannot initialize the compiler

2021-01-14 Thread Arvid Heise
Hi Avi,

could you run
gradle dependencies
and report back to me?

Also did you ensure to run
gradle clean
before? The gradle version you are using is ancient, so I'm not sure if
it's picking up the change correctly.

On Thu, Jan 14, 2021 at 10:55 AM Avi Levi  wrote:

> No, I don't.  I actually tried also with 2.12.7 and got the same result
>
> On Thu, Jan 14, 2021 at 11:07 AM Arvid Heise  wrote:
>
>> Hi Avi,
>>
>> apparently the maximum version that Flink supports for scala is 2.12.7
>> [1]. Do you have a specific reason to use a higher version?
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-12461
>>
>> On Thu, Jan 14, 2021 at 5:11 AM Avi Levi  wrote:
>>
>>> Hi Arvid,
>>> Please find attached full build.gradle file
>>>
>>> On Tue, Jan 12, 2021 at 8:18 PM Arvid Heise  wrote:
>>>
 Can you post the full dependencies of sbt/maven/gradle whatever?

 On Tue, Jan 12, 2021 at 3:54 AM Avi Levi  wrote:

> Hi Arvid,
> using :
>
> flinkVersion = '1.12.0'
> scalaBinaryVersion = '2.12'
>
> I simplified the example to (same exception)  :
>
> object Flinktest extends App {
>   private val env = StreamExecutionEnvironment.getExecutionEnvironment
>   env.fromElements("A", "B","c")
> .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
> .process{new ProcessAllWindowFunction[String, List[String], 
> TimeWindow] {
>   override def process(context: Context, elements: Iterable[String], 
> out: Collector[List[String]]): Unit = {
> out.collect(elements.toList)
>   }
> }
> }
> .print()
>
> env.execute("Sample")
> }
>
>
>
>
> On Tue, Jan 5, 2021 at 1:53 PM Arvid Heise 
> wrote:
>
>> Hi Avi,
>>
>> without being a scala-guy, I'm guessing that you are mixing scala
>> versions. Could you check that your user code uses the same scala version
>> as Flink (1.11 or 1.12)? I have also heard of issues with different minor
>> versions of scala, so make sure to use the exact same version (e.g.
>> 2.11.12).
>>
>> On Mon, Dec 28, 2020 at 3:54 PM Avi Levi  wrote:
>>
>>> I am trying to aggregate all records in a time window. This is my
>>> ProcessAllWindowFunction :
>>>
>>> case class SimpleAggregate(elms: List[String])
>>>
>>> class AggregateLogs extends ProcessAllWindowFunction[String, 
>>> SimpleAggregate, TimeWindow ] {
>>>   override def process(context: Context, elements: Iterable[String], 
>>> out: Collector[SimpleAggregate]): Unit = {
>>> val es: List[String] = elements.toList
>>> val record = SimpleAggregate(es)
>>> out.collect(record)
>>>   }
>>> }
>>>
>>> But I am getting this exception why ?
>>>
>>> Exception in thread "main" java.util.concurrent.ExecutionException:
>>> scala.tools.reflect.ToolBoxError: reflective compilation has failed: 
>>> cannot
>>> initialize the compiler due to java.lang.BootstrapMethodError:
>>> java.lang.NoSuchMethodError:
>>> scala.collection.immutable.List.$anonfun$flatMap$1$adapted(Lscala/runtime/BooleanRef;Lscala/runtime/ObjectRef;Lscala/runtime/ObjectRef;Ljava/lang/Object;)Ljava/lang/Object;
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2348)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2320)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
>>> at
>>> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
>>> at
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
>>> at
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
>>> at
>>> org.apache.flink.api.scala.typeutils.TraversableSerializer.<init>(TraversableSerializer.scala:41)
>>> at
>>> com.neosec.handlefinancial.HandleFinancialJob$

Re: Re: Re: Using Kafka as bounded source with DataStream API in batch mode (Flink 1.12)

2021-01-14 Thread sagar
Thanks Yun



On Thu, Jan 14, 2021 at 1:58 PM Yun Gao  wrote:

> Hi Sagar,
>
>   I rechecked and found that the new Kafka source is not formally published
> yet, so a stable approach for now may be to add the FlinkKafkaConsumer
> as a BOUNDED source first. Sorry for the inconvenience.
>
> Best,
>  Yun
>
> --
> Sender:Yun Gao
> Date:2021/01/14 15:26:54
> Recipient:Ardhani Narasimha; sagar<
> sagarban...@gmail.com>
> Cc:Flink User Mail List
> Theme:Re: Re: Using Kafka as bounded source with DataStream API in batch
> mode (Flink 1.12)
>
> Hi Sagar,
>
>   I think the problem is that the legacy sources implemented by
> extending SourceFunction are all defined as CONTINUOUS_UNBOUNDED when added via
> env.addSource(). Although there is a hacky way to add the legacy sources as a
> BOUNDED source [1], I think you may first give the new version of the
> KafkaSource [2] a try. The new KafkaSource is implemented with the
> new Source API [3], which provides unified support for the streaming and
> batch modes.
>
> Best,
>  Yun
>
>
>
>
> [1]
> https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/BatchExecutionFileSinkITCase.java#L64
> [2]
> https://github.com/apache/flink/blob/fc00492654a3707dc5ad54fc3dd33453bb568ed1/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java#L69
> [3]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
>
>
> --Original Mail --
> *Sender:*Ardhani Narasimha 
> *Send Date:*Thu Jan 14 15:11:35 2021
> *Recipients:*sagar 
> *CC:*Flink User Mail List 
> *Subject:*Re: Using Kafka as bounded source with DataStream API in batch
> mode (Flink 1.12)
>
>> Interesting use case.
>>
>> Can you please elaborate more on this.
>> On what criteria do you want to batch? Time? Count? Or Size?
>>
>> On Thu, 14 Jan 2021 at 12:15 PM, sagar  wrote:
>>
>>> Hi Team,
>>>
>>> I am getting the following error while running DataStream API in with
>>> batch mode with kafka source.
>>> I am using FlinkKafkaConsumer to consume the data.
>>>
>>> Caused by: java.lang.IllegalStateException: Detected an UNBOUNDED source
>>> with the 'execution.runtime-mode' set to 'BATCH'. This combination is not
>>> allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
>>> at
>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:198)
>>> ~[flink-core-1.12.0.jar:1.12.0]
>>>
>>> In my batch program I wanted to work with four to five different stream
>>> in batch mode as data source is bounded
>>>
>>> I don't find any clear example of how to do it with kafka souce with
>>> Flink 1.12
>>>
>>> I don't want to use JDBC source as underlying database table may change.
>>> please give me some example on how to achieve the above use case.
>>>
>>> Also for any large bounded source are there any alternatives to
>>> achieve this?
>>>
>>>
>>>
>>> --
>>> ---Regards---
>>>
>>>   Sagar Bandal
>>>
>>
>
>

-- 
---Regards---

  Sagar Bandal



Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread meneldor
Hello,

What is the correct way to use Python dicts as a ROW type in PyFlink? I'm
trying this:

output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp' ],
 [Types.STRING(), Types.STRING(),
Types.LONG() ])

class MyProcessFunction(KeyedProcessFunction):
def process_element(self, value, ctx:
'KeyedProcessFunction.Context', out: Collector):
result = {"id": ctx.get_current_key()[0], "data":
"some_string", "timestamp": }
out.collect(result)
current_watermark = ctx.timer_service().current_watermark()
ctx.timer_service().register_event_time_timer(current_watermark + 1500)

def on_timer(self, timestamp, ctx:
'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
logging.info(timestamp)
out.collect("On timer timestamp: " + str(timestamp))

ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(),
Types.STRING()])) \
   .process(MyProcessFunction(), output_type=output_type_info)


I just hardcoded the values in MyProcessFunction to be sure that the input
data doesn't mess up the fields. So the data is correct, but PyFlink throws an
exception:

at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
> at
> org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
> at
> org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
> ... 10 more

However, it works with primitive types like Types.STRING(). According
to the documentation, the ROW type corresponds to Python's dict
type.


Regards


Pyflink Join with versioned view / table

2021-01-14 Thread Barth, Torben
Dear List,

I have trouble implementing a join between two streaming tables in the Python Table 
API.

The left table of my join should be enriched with the latest 
value of the right_table. The right_table is updated only rarely (maybe after 
15 minutes). When implementing the join I get updates only when the right table 
changes. I want to trigger updates for the joined table every time I 
receive a record on the left side. The record should be enriched with the most 
recent result of the right side. I have not found a way to implement this with the 
desired result.

I tried an implementation using a versioned view. Here is a short example:

left_table
root
|-- measurement_time: TIMESTAMP(3) *ROWTIME*
|-- x: DOUBLE
|-- y: DOUBLE
|-- proctime: TIMESTAMP(3) NOT NULL *PROCTIME* AS PROCTIME()
|-- WATERMARK FOR measurement_time: TIMESTAMP(3) AS `measurement_time`

right_table
|-- some_value: INT
|-- id: INT
|-- modtime: TIMESTAMP(3) *ROWTIME*
 The "id" is always defined as 1.
 I perform the following operations

t_env.create_temporary_view("left_table", left_table.add_columns("1.cast(INT) 
AS left_artificial_key"))
t_env.create_temporary_view("right_table", right_table)

sql_view = """
-- Define a versioned view
CREATE VIEW versioned_right AS
SELECT id, some_value, modtime
  FROM (
 SELECT *,
 ROW_NUMBER() OVER (PARTITION BY id
ORDER BY modtime DESC) AS rownum
 FROM right_table)
WHERE rownum = 1
"""

view = t_env.execute_sql(sql_view)

sql = """
   SELECT
   left_table.*, versioned_right.some_value
FROM left_table
LEFT JOIN versioned_right FOR SYSTEM_TIME AS OF 
left_table.measurement_time
ON abt.left_artificial_key = versioned_right.id
"""

joined = t_env.sql_query(sql)


I observed the same behavior when using a lateral join.

Does anybody have an idea how the join could be implemented in the correct way?

Any comments or ideas are very welcome.

Thanks
Torben



Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread Shuiqiang Chen
Hi meneldor,

The main cause of the error is that there is a bug in
`ctx.timer_service().current_watermark()`. At the beginning of the stream,
when the first record comes into KeyedProcessFunction.process_element(),
the current_watermark will be Long.MIN_VALUE on the Java side, while on
the Python side it becomes LONG.MAX_VALUE, which is 9223372036854775807.

>>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)

Here, 9223372036854775807 + 1500 is 9223372036854777307, which will be
automatically converted to a long integer in Python but will cause Long
value overflow in Java when deserializing the registered timer value. I
will create an issue to fix the bug.

Let’s return to your initial question: in PyFlink you can create Row
type data as below:

>>> row_data = Row(id=‘my id’, data=’some data’, timestamp=)

And I wonder which release version of Flink the code snippet you provided is
based on? The latest API for KeyedProcessFunction.process_element() and
KeyedProcessFunction.on_timer() does not provide a `collector` to collect
output data but uses `yield`, which is a more Pythonic approach.

Please refer to the following code:

def keyed_process_function_example():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.get_config().set_auto_watermark_interval(2000)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
data_stream = env.from_collection([(1, 'hello', '1603708211000'),
   (2, 'hi', '1603708224000'),
   (3, 'hello', '1603708226000'),
   (4, 'hi', '1603708289000')],

type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))

class MyTimestampAssigner(TimestampAssigner):

def extract_timestamp(self, value, record_timestamp) -> int:
return int(value[2])

class MyProcessFunction(KeyedProcessFunction):

def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
yield Row(id=ctx.get_current_key()[1], data='some_string',
timestamp=)
# current_watermark = ctx.timer_service().current_watermark()
ctx.timer_service().register_event_time_timer(ctx.timestamp()
+ 1500)

def on_timer(self, timestamp: int, ctx:
'KeyedProcessFunction.OnTimerContext'):
yield Row(id=ctx.get_current_key()[1], data='current on
timer timestamp: ' + str(timestamp),
  timestamp=timestamp)

output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
[Types.STRING(), Types.STRING(), Types.INT()])
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(MyTimestampAssigner())
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: (x[0], x[1]),
key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
.process(MyProcessFunction(), output_type=output_type_info).print()
env.execute('test keyed process function')


Best,
Shuiqiang





meneldor  于2021年1月14日周四 下午10:45写道:

> Hello,
>
> What is the correct way to use Python dict's as ROW type in pyflink? Im
> trying this:
>
> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp' ],
>  [Types.STRING(), Types.STRING(), 
> Types.LONG() ])
>
> class MyProcessFunction(KeyedProcessFunction):
> def process_element(self, value, ctx: 'KeyedProcessFunction.Context', 
> out: Collector):
> result = {"id": ctx.get_current_key()[0], "data": "some_string", 
> "timestamp": }
> out.collect(result)
> current_watermark = ctx.timer_service().current_watermark()
> ctx.timer_service().register_event_time_timer(current_watermark + 
> 1500)
>
> def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', 
> out: 'Collector'):
> logging.info(timestamp)
> out.collect("On timer timestamp: " + str(timestamp))
>
> ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), 
> Types.STRING()])) \
>.process(MyProcessFunction(), output_type=output_type_info)
>
>
> I just hardcoded the values in MyProcessFunction to be sure that the input
> data doesnt mess the fields. So the data is correct but PyFlink trews an
> exception:
>
> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
>> at
>> org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>> at
>> org.apache.flink.streaming.api.operators.py

Re: Elasticsearch config maxes can not be disabled

2021-01-14 Thread Rex Fenley
Flink 1.11.2

CREATE TABLE sink_es (
...
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = '${sys:proxyEnv.ELASTICSEARCH_HOSTS}',
'index' = '${sys:graph.flink.index_name}',
'format' = 'json',
'sink.bulk-flush.max-actions' = '0',
'sink.bulk-flush.max-size' = '0',
'sink.bulk-flush.interval' = '1s',
'sink.bulk-flush.backoff.delay' = '1s',
'sink.bulk-flush.backoff.max-retries' = '4',
'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
)

On Thu, Jan 14, 2021 at 4:16 AM Dawid Wysakowicz 
wrote:

> Hi,
>
> First of all, what Flink versions are you using?
>
> You are right it is a mistake in the documentation of the
> sink.bulk-flush.max-actions. It should say: Can be set to '-1' to disable
> it. I created a ticket[1] to track that. And as far as I can tell and I
> quickly checked that it should work. As for the sink.bulk-flush.max-size
> you should be able to disable it with a value of '0'.
>
> Could you share with us how do you use the connector? Could you also share
> the full stack trace for the exception you're getting? Are you creating the
> table with a CREATE statement?
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-20979
> On 13/01/2021 20:10, Rex Fenley wrote:
>
> Hello,
>
> It doesn't seem like we can disable max actions and max size for
> Elasticsearch connector.
>
> Docs:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/elasticsearch.html#sink-bulk-flush-max-actions
> sink.bulk-flush.max-actions optional 1000 Integer Maximum number of
> buffered actions per bulk request. Can be set to '0' to disable it.
> sink.bulk-flush.max-size optional 2mb MemorySize Maximum size in memory
> of buffered actions per bulk request. Must be in MB granularity. Can be set
> to '0' to disable it.
> Reality:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Max number of buffered actions must be larger than
> 0.
>
> ES code looks like -1 is actually the value for disabling, but when I use
> -1:
> Caused by: java.lang.IllegalArgumentException: Could not parse value '-1'
> for key 'sink.bulk-flush.max-size'.
>
> How can I disable these two settings?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com
>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com



Re: error accessing S3 bucket 1.12

2021-01-14 Thread Dawid Wysakowicz
Hi Billy,

I think you might be hitting the same problem as described in this
thread [1]. Does your bucket name meet all the requirements described
here [2] (e.g. does it contain an underscore, which is not allowed)?

Best,

Dawid

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Unable-to-set-S3-like-object-storage-for-state-backend-td28362.html

[2] https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html

On 13/01/2021 19:20, Billy Bain wrote:
> I'm trying to use readTextFile() to access files in S3. I have
> verified the s3 key and secret are clean and the s3 path is similar to
> com.somepath/somefile. (the names changed to protect the guilty)
>
> Any idea what I'm missing? 
>
> 2021-01-13 12:12:43,836 DEBUG
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
> [] - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path:
> s3://com.somepath/somefile
> 2021-01-13 12:12:43,843 DEBUG
> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory [] -
> Creating S3 file system backed by Hadoop s3a file system
> 2021-01-13 12:12:43,844 DEBUG
> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory [] - Loading
> Hadoop configuration for Hadoop s3a file system
> 2021-01-13 12:12:43,926 DEBUG
> org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader [] - Adding
> Flink config entry for s3.access-key as fs.s3a.access-key to Hadoop config
> 2021-01-13 12:12:43,926 DEBUG
> org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader [] - Adding
> Flink config entry for s3.secret-key as fs.s3a.secret-key to Hadoop config
> 2021-01-13 12:12:43,944 DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask [] - Invoking
> Split Reader: Custom File Source -> (Timestamps/Watermarks, Map ->
> Filter -> Sink: Unnamed) (1/1)#0
> 2021-01-13 12:12:43,944 DEBUG
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] -
> Creating operator state backend for
> TimestampsAndWatermarksOperator_1cf40e099136da16c66c61032de62905_(1/1)
> with empty state.
> 2021-01-13 12:12:43,946 DEBUG
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] -
> Creating operator state backend for
> StreamSink_d91236bbbed306c2379eac4982246f1f_(1/1) with empty state.
> 2021-01-13 12:12:43,955 DEBUG org.apache.hadoop.conf.Configuration []
> - Reloading 1 existing configurations
> 2021-01-13 12:12:43,961 DEBUG
> org.apache.flink.fs.s3hadoop.S3FileSystemFactory [] - Using scheme
> s3://com.somepath/somefile for s3a file system backing the S3 File System
> 2021-01-13 12:12:43,965 DEBUG
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
> [] - Closed File Monitoring Source for path: s3://com.somepath/somefile.
> 2021-01-13 12:12:43,967 WARN org.apache.flink.runtime.taskmanager.Task
> [] - Source: Custom File Source (1/1)#0
> (1d75ae07abbd65f296c55a61a400c59f) switched from RUNNING to FAILED.
> java.io.IOException: null uri host. This can be
> caused by unencoded / in the password string
> at
> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:163)
> ~[blob_p-e297dae3da73ba51c20f14193b5ae6e09694422a-293a7d95166eee9a9b2329b71764cf67:?]
> at
> org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:61)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:468)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:196)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> Caused by: java.lang.NullPointerException: null uri host. This can be
> caused by unencoded / in the password string
> at java.util.Objects.requireNonNull(Objects.java:246) ~[?:?]
> at
> org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:69)
> ~[blob_p-e297dae3da73ba51c20f14193b5ae6e09694422a-293a7d95166eee9a9b2329b71764cf67:?]
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:467)
> ~[blob_p-e297dae3da73ba51c20f14193b5ae6e09694422a-293a7d95166eee9a9b2329b71764cf67:?]
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:234)
> ~[blob_p-e297dae3da73ba51c20f14193b5ae6e09694422a-293a7d95166eee9a9b2329b71764cf67:?]
> at
> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystem

Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread meneldor
Thank you for the answer, Shuiqiang!
I'm using the latest apache-flink version:

> Requirement already up-to-date: apache-flink in
> ./venv/lib/python3.7/site-packages (1.12.0)

however, the method signature is using a collector:

[image: image.png]
I'm using the *setup-pyflink-virtual-env.sh* shell script from the
docs (which uses pip).

Regards

On Thu, Jan 14, 2021 at 6:47 PM Shuiqiang Chen  wrote:

> Hi meneldor,
>
> The main cause of the error is that there is a bug in
> `ctx.timer_service().current_watermark()`. At the beginning the stream,
> when the first record come into the KeyedProcessFunction.process_element()
> , the current_watermark will be the Long.MIN_VALUE at Java side, while at
> the Python side, it becomes LONG.MAX_VALUE which is 9223372036854775807.
>
> >>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>
> Here, 9223372036854775807 + 1500 is 9223372036854777307 which will be
> automatically converted to a long interger in python but will cause Long
> value overflow in Java when deserializing the registered timer value. I
> will craete a issue to fix the bug.
>
> Let’s return to your initial question, at PyFlink you could create a Row
> Type data as bellow:
>
> >>> row_data = Row(id=‘my id’, data=’some data’, timestamp=)
>
> And I wonder which release version of flink the code snippet you provided
> based on? The latest API for KeyedProcessFunction.process_element() and
> KeyedProcessFunction.on_timer() will not provid a `collector` to collect
> output data but use `yield` which is a more pythonic approach.
>
> Please refer to the following code:
>
> def keyed_process_function_example():
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> env.get_config().set_auto_watermark_interval(2000)
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> data_stream = env.from_collection([(1, 'hello', '1603708211000'),
>(2, 'hi', '1603708224000'),
>(3, 'hello', '1603708226000'),
>(4, 'hi', '1603708289000')],
>   type_info=Types.ROW([Types.INT(), 
> Types.STRING(), Types.STRING()]))
>
> class MyTimestampAssigner(TimestampAssigner):
>
> def extract_timestamp(self, value, record_timestamp) -> int:
> return int(value[2])
>
> class MyProcessFunction(KeyedProcessFunction):
>
> def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
> yield Row(id=ctx.get_current_key()[1], data='some_string', 
> timestamp=)
> # current_watermark = ctx.timer_service().current_watermark()
> ctx.timer_service().register_event_time_timer(ctx.timestamp() + 
> 1500)
>
> def on_timer(self, timestamp: int, ctx: 
> 'KeyedProcessFunction.OnTimerContext'):
> yield Row(id=ctx.get_current_key()[1], data='current on timer 
> timestamp: ' + str(timestamp),
>   timestamp=timestamp)
>
> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'], 
> [Types.STRING(), Types.STRING(), Types.INT()])
> watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
> .with_timestamp_assigner(MyTimestampAssigner())
> data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
> .key_by(lambda x: (x[0], x[1]), 
> key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
> .process(MyProcessFunction(), output_type=output_type_info).print()
> env.execute('test keyed process function')
>
>
> Best,
> Shuiqiang
>
>
>
>
>
> meneldor  于2021年1月14日周四 下午10:45写道:
>
>> Hello,
>>
>> What is the correct way to use Python dict's as ROW type in pyflink? Im
>> trying this:
>>
>> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp' ],
>>  [Types.STRING(), Types.STRING(), 
>> Types.LONG() ])
>>
>> class MyProcessFunction(KeyedProcessFunction):
>> def process_element(self, value, ctx: 'KeyedProcessFunction.Context', 
>> out: Collector):
>> result = {"id": ctx.get_current_key()[0], "data": "some_string", 
>> "timestamp": }
>> out.collect(result)
>> current_watermark = ctx.timer_service().current_watermark()
>> ctx.timer_service().register_event_time_timer(current_watermark + 
>> 1500)
>>
>> def on_timer(self, timestamp, ctx: 
>> 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
>> logging.info(timestamp)
>> out.collect("On timer timestamp: " + str(timestamp))
>>
>> ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), 
>> Types.STRING()])) \
>>.process(MyProcessFunction(), output_type=output_type_info)
>>
>>
>> I just hardcoded the values in MyProcessFunction to be sure that the
>> input data doesnt mess the fields. So the data is correct but PyFlink trews
>> an exception:
>>
>> at java.io.Da

Re: Deterministic rescale for test

2021-01-14 Thread Jaffe, Julian
Martin,

You can use `.partitionCustom` and provide a partitioner if you want to control 
explicitly how elements are distributed to downstream tasks.
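
A minimal Scala sketch of that approach, under the assumption that the target
channel can be derived purely from the record itself (here simply from the
line's hash). Because the partitioner is a pure function of the data, every run
distributes the records to the same downstream subtasks, and with a single
upstream task the per-channel order is preserved as well.

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

// Deterministic partitioner: the target channel depends only on the record,
// never on scheduling or load.
class StableStringPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    (key.hashCode % numPartitions + numPartitions) % numPartitions
}

object DeterministicRescaleExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)

    // Stand-in for env.readTextFile(s3Path).setParallelism(1) from the question.
    val rawStream: DataStream[String] =
      env.fromElements("a", "b", "c", "d", "e", "f")

    rawStream
      // partitionCustom replaces rescale/rebalance with a repeatable distribution.
      .partitionCustom(new StableStringPartitioner, (line: String) => line)
      .map(_.toUpperCase)
      .print()

    env.execute("deterministic partitioning test")
  }
}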

From: Martin Frank Hansen 
Reply-To: "m...@berlingskemedia.dk" 
Date: Thursday, January 14, 2021 at 1:48 AM
To: user 
Subject: Deterministic rescale for test

Hi,

I am trying to make a test-suite for our Flink jobs, and are having problems 
making the input-data deterministic.

We are reading a file-input with parallelism 1 and want to rescale to a higher 
parallelism, such that the ordering of the data is the same every time.

I have tried using rebalance, rescale but it seems to randomly distribute data 
between partitions. We don't need something optimized, we just need the same 
distribution for every run.
Is this possible?

Some code:

val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(parallelism)
val rawStream: DataStream[String] = env.readTextFile(s3Path).setParallelism(1)

rawStream.rescale

...
best regards

--

Martin Frank Hansen




Re: Enrich stream with SQL api

2021-01-14 Thread Dawid Wysakowicz
Hi Marek,

I am afraid I don't have a good answer for your question. The problem
indeed is that the JDBC source can work only as a bounded source. As you
correctly pointed out, as of now mixing bounded with unbounded sources
does not work with checkpointing, which we want to address in the
FLIP-147 (that you linked as well).

I agree one solution would be to change the implementation of
JDBCDynamicTableSource so that it produces an UNBOUNDED source.
Unfortunately it is not the most straightforward task.

Another solution would be to actually use CDC. I think you could use
one of the connectors from here[1], which use the embedded Debezium
engine, so you would not need to set up any external tools, but
just embed the CDC in Flink. Of course, I may be mistaken here, as I
haven't tried those connectors myself.
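
As a sketch only (untested): a CDC-backed definition of the `dim` table might look roughly like the following, assuming the MySQL connector from [1] is on the classpath; the connection values are placeholders and the exact option names should be checked against the connector version you use.

```
// Registering `dim` through the flink-cdc-connectors MySQL connector [1] would turn it
// into an unbounded, continuously updated changelog source instead of a bounded scan.
// The connector name and options below are assumptions based on [1], not verified here.
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object CdcDimTableSketch {
  def main(args: Array[String]): Unit = {
    val env  = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    tEnv.executeSql(
      """
        |CREATE TABLE dim (
        |  ID BIGINT,
        |  ENRICH STRING,
        |  FROM_DATE TIMESTAMP(6),
        |  TO_DATE TIMESTAMP(6),
        |  PRIMARY KEY (ID) NOT ENFORCED,
        |  WATERMARK FOR TO_DATE AS TO_DATE
        |) WITH (
        |  'connector' = 'mysql-cdc',
        |  'hostname' = '...',
        |  'port' = '3306',
        |  'username' = '...',
        |  'password' = '...',
        |  'database-name' = '...',
        |  'table-name' = '...'
        |)
        |""".stripMargin)
  }
}
```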

Unfortunately I don't have any other ideas right now. Maybe someone else
can chime in @Timo @Jark

Lastly, I think once you solve the problem of a finishing source you
could also consider using the temporal join[2] instead of an interval join.
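
A rough, untested illustration of that temporal join [2], reusing the `tEnv` and the versioned `dim` table from the sketch above; `kafka_events` is a placeholder for the Kafka-backed table and is assumed to expose START_TIME as its event-time attribute:

```
// Event-time temporal join [2]: requires `dim` to be a versioned table (primary key +
// event-time attribute, as declared above) and an event-time attribute on the probe side.
val enriched = tEnv.sqlQuery(
  """
    |SELECT T1.START_TIME, D1.ENRICH, T1.FIELD
    |FROM kafka_events AS T1
    |JOIN dim FOR SYSTEM_TIME AS OF T1.START_TIME AS D1
    |  ON T1.ENRICH_ID = D1.ID
    |""".stripMargin)
```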

Best,

Dawid

[1] https://github.com/ververica/flink-cdc-connectors

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#temporal-joins

On 12/01/2021 16:40, Marek Maj wrote:
> Hello,
> I am trying to use the Flink SQL API to join two tables. My stream data
> source is Kafka (defined through a catalog and schema registry) and my
> enrichment data is located in a relational database (JDBC connector). I
> think this setup reflects a quite common use case.
>
> Enrichment table definition looks like this:
> CREATE TABLE dim (
>   ID BIGINT,
>   ENRICH STRING,
>   FROM_DATE TIMESTAMP(6),
>   TO_DATE TIMESTAMP(6),
>   WATERMARK FOR TO_DATE AS TO_DATE
> ) WITH (
>    'connector' = 'jdbc',
>    'url' = ‘…’,
>    'table-name' = ‘…’,
>    'username' = ‘…’,
>    'password' = ‘…’
> )
>
> And this is join I use against stream coming from kafka (table with
> watermark spec), trunc is udf:
> SELECT TRUNC(T1.START_TIME,'HH24') as `START_TIME`,
> D1.ENRICH as `ENRICH`,
> T1.FIELD as `FIELD`,
> FROM `kafka.topic` T1, dim D1
> WHERE T1.ENRICH_ID = D1.ID 
> AND T1.START_TIME between D1.TO_DATE - INTERVAL ‘1’ DAY AND D1.TO_DATE
> AND T1.START_TIME >= D1.FROM_DATE
>
> Result job graph contains two table source scan operators together
> with interval join operator.
>  
> The problem I am trying to solve is how to change the character of the
> enrichment table. Currently, the related operator task reads the whole table
> when the job starts and then finishes. Ideally, I
> would like to have a continuously updated enrichment table. Is it
> possible to achieve this without CDC, for example by querying the whole database
> periodically or using some kind of cache for keys? We can assume that the
> enrichment table is append-only; there are no deletes or updates, only
> inserts for new time intervals.
>
> If updates are not possible, how can I deal with the finished task? Due to
> a known issue [1], all checkpoints are aborted. Maybe I could live
> with restarting the job to get new enrichment data, as it is not refreshed
> so frequently, but checkpointing is a must.
>
> flink version 1.12
>
> regards
> Marek
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished




Re: Flink SQL - IntervalJoin doesn't support consuming update and delete - trying to deduplicate rows

2021-01-14 Thread Dan Hill
Hey, sorry for the late reply.  I'm using v1.11.1.

Cool.  I did a non-SQL way of using the first row.  I'll try to see if I
can do this in the SQL version.
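
For reference, a sketch of what the SQL version of that first-row deduplication (the approach Jark describes below) could look like; `events` is a placeholder for the already-registered source table, while the partition and order columns mirror the Rank node in the error message above:

```
// "Deduplication keeping the first row" via the ROW_NUMBER pattern in Flink SQL.
// According to the suggestion below, keeping the first row per key should yield an
// append-only stream that the interval join can then consume.
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object FirstRowDedupSketch {
  def main(args: Array[String]): Unit = {
    val env  = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // Assumes a table named `events` with these columns is already registered.
    val deduped = tEnv.sqlQuery(
      """
        |SELECT platform_id, user_id, log_user_id, client_log_ts, event_api_ts, ts
        |FROM (
        |  SELECT *,
        |    ROW_NUMBER() OVER (PARTITION BY log_user_id ORDER BY ts ASC) AS row_num
        |  FROM events
        |)
        |WHERE row_num = 1
        |""".stripMargin)
    // `deduped` can then be used as the append-only input of the interval join.
  }
}
```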

On Wed, Jan 13, 2021 at 11:26 PM Jark Wu  wrote:

> Hi Dan,
>
> Sorry for the late reply.
>
> I guess you applied a "deduplication with keeping last row" before the
> interval join?
> That will produce an updating stream and interval join only supports
> append-only input.
> You can try to apply "deduplication with keeping *first* row" before the
> interval join.
> That should produce an append-only stream and interval join can consume
> from it.
>
> Best,
> Jark
>
>
>
> On Tue, 5 Jan 2021 at 20:07, Arvid Heise  wrote:
>
>> Hi Dan,
>>
>> Which Flink version are you using? I know that there has been quite a bit
>> of optimization of deduplication in 1.12, which would reduce the required
>> state tremendously.
>> I'm pulling in Jark who knows more.
>>
>> On Thu, Dec 31, 2020 at 6:54 AM Dan Hill  wrote:
>>
>>> Hi!
>>>
>>> I'm using Flink SQL to do an interval join.  Rows in one of the tables
>>> are not unique.  I'm fine using either the first or last row.  When I try
>>> to deduplicate
>>> 
>>>  and
>>> then interval join, I get the following error.
>>>
>>> IntervalJoin doesn't support consuming update and delete changes which
>>> is produced by node Rank(strategy=[UndefinedStrategy],
>>> rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1],
>>> partitionBy=[log_user_id], orderBy=[ts ASC], select=[platform_id, user_id,
>>> log_user_id, client_log_ts, event_api_ts, ts])
>>>
>>> Is there a way to combine these in this order?  I could do the
>>> deduplication afterwards but this will result in more state.
>>>
>>> - Dan
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> 
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>


Re: Declaring and using ROW data type in PyFlink DataStream

2021-01-14 Thread Xingbo Huang
Hi meneldor,

I guess Shuiqiang is not using PyFlink 1.12.0 to develop the example.
The signature of the `process_element` method has been changed in the new
version[1]. In PyFlink 1.12.0, you can use `collector.collect` to send out
your results.

[1] https://issues.apache.org/jira/browse/FLINK-20647

Best,
Xingbo

meneldor  wrote on Fri, Jan 15, 2021 at 1:20 AM:

> Thank you for the answer, Shuiqiang!
> I'm using the latest apache-flink version:
>
>> Requirement already up-to-date: apache-flink in
>> ./venv/lib/python3.7/site-packages (1.12.0)
>
> however the method signature is using a collector:
>
> [image: image.png]
>  I'm using the *setup-pyflink-virtual-env.sh* shell script from the
> docs (which uses pip).
>
> Regards
>
> On Thu, Jan 14, 2021 at 6:47 PM Shuiqiang Chen 
> wrote:
>
>> Hi meneldor,
>>
>> The main cause of the error is that there is a bug in
>> `ctx.timer_service().current_watermark()`. At the beginning of the stream,
>> when the first record comes into KeyedProcessFunction.process_element(),
>> the current_watermark will be Long.MIN_VALUE on the Java side, while on
>> the Python side it becomes Long.MAX_VALUE, which is 9223372036854775807.
>>
>> >>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>>
>> Here, 9223372036854775807 + 1500 is 9223372036854777307, which will be
>> automatically converted to a long integer in Python but will cause a Long
>> value overflow in Java when deserializing the registered timer value. I
>> will create an issue to fix the bug.
>>
>> Let's return to your initial question: in PyFlink you could create Row
>> type data as below:
>>
>> >>> row_data = Row(id=‘my id’, data=’some data’, timestamp=)
>>
>> And I wonder which release version of Flink the code snippet you provided
>> is based on? The latest API for KeyedProcessFunction.process_element() and
>> KeyedProcessFunction.on_timer() will not provide a `collector` to collect
>> output data but uses `yield`, which is a more Pythonic approach.
>>
>> Please refer to the following code:
>>
>> def keyed_process_function_example():
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> env.get_config().set_auto_watermark_interval(2000)
>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>> data_stream = env.from_collection([(1, 'hello', '1603708211000'),
>>(2, 'hi', '1603708224000'),
>>(3, 'hello', '1603708226000'),
>>(4, 'hi', '1603708289000')],
>>   type_info=Types.ROW([Types.INT(), 
>> Types.STRING(), Types.STRING()]))
>>
>> class MyTimestampAssigner(TimestampAssigner):
>>
>> def extract_timestamp(self, value, record_timestamp) -> int:
>> return int(value[2])
>>
>> class MyProcessFunction(KeyedProcessFunction):
>>
>> def process_element(self, value, ctx: 
>> 'KeyedProcessFunction.Context'):
>> yield Row(id=ctx.get_current_key()[1], data='some_string', 
>> timestamp=)
>> # current_watermark = ctx.timer_service().current_watermark()
>> ctx.timer_service().register_event_time_timer(ctx.timestamp() + 
>> 1500)
>>
>> def on_timer(self, timestamp: int, ctx: 
>> 'KeyedProcessFunction.OnTimerContext'):
>> yield Row(id=ctx.get_current_key()[1], data='current on timer 
>> timestamp: ' + str(timestamp),
>>   timestamp=timestamp)
>>
>> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'], 
>> [Types.STRING(), Types.STRING(), Types.INT()])
>> watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>> .with_timestamp_assigner(MyTimestampAssigner())
>> data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>> .key_by(lambda x: (x[0], x[1]), 
>> key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
>> .process(MyProcessFunction(), output_type=output_type_info).print()
>> env.execute('test keyed process function')
>>
>>
>> Best,
>> Shuiqiang
>>
>>
>>
>>
>>
>> meneldor  wrote on Thu, Jan 14, 2021 at 10:45 PM:
>>
>>> Hello,
>>>
>>> What is the correct way to use Python dicts as a ROW type in PyFlink? I'm
>>> trying this:
>>>
>>> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp' ],
>>>  [Types.STRING(), Types.STRING(), 
>>> Types.LONG() ])
>>>
>>> class MyProcessFunction(KeyedProcessFunction):
>>> def process_element(self, value, ctx: 'KeyedProcessFunction.Context', 
>>> out: Collector):
>>> result = {"id": ctx.get_current_key()[0], "data": "some_string", 
>>> "timestamp": }
>>> out.collect(result)
>>> current_watermark = ctx.timer_service().current_watermark()
>>> ctx.timer_service().register_event_time_timer(current_watermark + 
>>> 1500)
>>>
>>> def on_timer(self, timestamp, ctx: 
>>> 'KeyedProcessFunction.OnTimerC

Gauges generating same graph

2021-01-14 Thread Manish G
Hi All,

I have a few RichFlatMapFunction classes, and I have a gauge added to each one
of them. For a particular use case I am updating these gauges
incrementally. I have a class member variable in each of these classes
which keeps increasing as the flatMap function in these classes is called, and
then I update the corresponding gauge with this updated value.

I observe that some of these gauges show almost exactly the same graph in
Grafana. Can it be due to this approach?
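
For reference, a minimal Scala sketch of the pattern described above (an illustration, not the actual job code): a per-instance field incremented in flatMap and exposed through a gauge. Note that each parallel subtask registers its own gauge instance, so the resulting graphs also depend on how the reporter tags and how the dashboard aggregates those per-subtask series.

```
// A RichFlatMapFunction that counts processed elements in a field and exposes the
// field through a gauge registered in open(). Names here are placeholders.
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Gauge
import org.apache.flink.util.Collector

class CountingFlatMap extends RichFlatMapFunction[String, String] {
  @transient private var processed: Long = 0L

  override def open(parameters: Configuration): Unit = {
    // Each parallel subtask registers its own gauge instance under this name.
    getRuntimeContext
      .getMetricGroup
      .gauge[Long, Gauge[Long]]("processedCount", new Gauge[Long] {
        override def getValue: Long = processed
      })
  }

  override def flatMap(value: String, out: Collector[String]): Unit = {
    processed += 1          // the gauge reports whatever this field holds when scraped
    out.collect(value)
  }
}
```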


Re: Flink app logs to Elastic Search

2021-01-14 Thread bat man
I was able to make it work with a fresh Elastic installation. Now the
taskmanager and jobmanager logs are available in Elastic.
Thanks for the pointers.

-Hemant.

On Wed, Jan 13, 2021 at 6:21 PM Aljoscha Krettek 
wrote:

> On 2021/01/11 01:29, bat man wrote:
> >Yes, no entries to the elastic search. No indices were created in elastic.
> >Jar is getting picked up which I can see from yarn logs. Pre-defined text
> >based logging is also available.
>
> Hmm, I can't imagine much that could go wrong. Maybe there is some
> interference from other configuration files. Could you try and make sure
> that you only have the configuration and logging system in the classpath
> that you want to use?
>
> Best,
> Aljoscha
>


Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-14 Thread Yun Gao
  Hi all,

We had some offline discussion together with @Arvid, @Roman and @Aljoscha, and I'd
like to post some points we discussed:

1) For the problem that a "new" root task coincidentally finished before getting 
triggered successfully, we have listed two options in FLIP-147[1]. For the first 
version, we now tend to go with the first option, i.e. the JM would re-compute and 
re-trigger new sources when it realizes some tasks were not triggered successfully. 
This option would avoid the complexity of adding a new RPC and duplicating task 
states, and in the average case it would not cause too much overhead.

2) For how to support operators like the Sink Committer waiting for one complete 
checkpoint before exiting, it is more an issue of how to use checkpoints after 
tasks finished than of how to achieve checkpoints after tasks finished, thus we 
would prefer not to include this part in the current discussion. We would discuss 
and solve this issue separately after FLIP-147 is done.

Best,
 Yun


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-TriggeringCheckpointsAfterTasksFinished
--
From:Yun Gao 
Send Time:2021 Jan. 13 (Wed.) 16:09
To:dev ; user 
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

  Hi all,
I updated the FLIP[1] to reflect the major discussed points in the ML thread:

 1) For "new" root tasks that finished before receiving the trigger message, we 
previously proposed to let the JM re-compute and re-trigger the descendant tasks, 
but after the discussion we realized that it might cause overhead to the JobMaster 
in cascade-finish and large-parallelism cases. Another option might be to let the 
StreamTask do one synchronization with the CheckpointCoordinator before it gets 
finished, to become aware of the missed pending checkpoints; since at that point 
the EndOfPartitions are not emitted yet, it could still broadcast barriers to its 
descendant tasks. I updated the details in this section[2] of the FLIP.

2) For the barrier alignment, we now change to inserting faked barriers in the 
input channels to avoid interference with the checkpoint alignment algorithms. One 
remaining issue is that for the unaligned checkpoint mode we could not snapshot an 
upstream task's result partition if the task has already finished. One option to 
address this issue is to make the upstream tasks wait for the buffers to get 
flushed before exiting, and we would include this in future versions. I updated 
this part in this section[3] of the FLIP.

3) Some operators like the Sink Committer need to wait for one complete checkpoint 
before exiting. To support operators that need to wait for some finalization 
condition, like the Sink Committer and Async I/O, we could introduce a new 
interface to mark this kind of operator, and let the runtime wait until the 
operators reach their condition. I updated this part in this section[4] of the FLIP.

Could you have another look at the FLIP and the pending issues? Any feedback is 
warmly welcomed and appreciated. Many thanks!

Best,
 Yun

[1] https://cwiki.apache.org/confluence/x/mw-ZCQ
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-TriggeringCheckpointsAfterTasksFinished
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-ExtendthetaskBarrierAlignment
[4] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-SupportOperatorsWaitingForCheckpointCompleteBeforeFinish


--
From:Yun Gao 
Send Time:2021 Jan. 12 (Tue.) 10:30
To:Khachatryan Roman 
Cc:dev ; user 
Subject:Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Roman, 

 Many thanks for the feedback and suggestions!

> I think UC will be the common case with multiple sources each with 
DoP > 1.
> IIUC, waiting for EoP will be needed on each subtask each time one of 
its source subtasks finishes.

Yes, waiting for EoP would be required for each input channel if we do 
not block the finished upstream task specially.

   > Yes, but checkpoint completion notification will not be sent until all 
the EOPs are processed.
  The downstream tasks that get triggered indeed must wait for the received EoPs 
from all the input channels. I initially compared it with the completely aligned 
case, and now the remaining execution graph after the triggered task could still 
take a normal unaligned checkpoint (e.g. if A -> B -> C -> D, A gets finished and 
B gets triggered, then B -> C -> D could still take a normal unaligned checkpoint). 
But still it coul

Re: Deterministic rescale for test

2021-01-14 Thread Martin Frank Hansen
Hi Jaffe,

Thanks for your reply, I will try to use a custom Partitioner.

On Thu, Jan 14, 2021 at 19:39, Jaffe, Julian <
julianja...@activision.com> wrote:

> Martin,
>
>
>
> You can use `.partitionCustom` and provide a partitioner if you want to
> control explicitly how elements are distributed to downstream tasks.
>
>
>
> *From: *Martin Frank Hansen 
> *Reply-To: *"m...@berlingskemedia.dk" 
> *Date: *Thursday, January 14, 2021 at 1:48 AM
> *To: *user 
> *Subject: *Deterministic rescale for test
>
>
>
> Hi,
>
> I am trying to make a test suite for our Flink jobs, and am having
> problems making the input data deterministic.
>
> We are reading a file-input with parallelism 1 and want to rescale to a
> higher parallelism, such that the ordering of the data is the same every
> time.
>
> I have tried using rebalance and rescale, but they seem to randomly distribute
> data between partitions. We don't need something optimized, we just need
> the same distribution for every run.
>
> Is this possible?
>
>
> Some code:
>
> val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.setParallelism(parallelism)
> val rawStream: DataStream[String] = env.readTextFile(s3Path).setParallelism(1)
>
> rawStream.rescale
>
> ...
>
> best regards
>
>
>
> --
>
> *Martin Frank Hansen*
>
>
>


-- 

Martin Frank Hansen


Re: Simplest way to deploy flink job on k8s for e2e testing purposes

2021-01-14 Thread Salva Alcántara
Can anyone explain why I am getting this error?

"Exception in thread "main" java.lang.IllegalStateException: No
ExecutorFactory found to execute the application."

I have tried a slightly different approach by running the jar that `sbt
assembly` produces inside a container that looks like this (Dockerfile):

```
FROM flink:1.11.2-scala_2.12-java11
COPY ./path/to/my.jar my.jar
```

I have tried with different versions of Flink (1.10.2 for example) and I am
getting the same error. This should be pretty straightforward, but I cannot
make it work. My main looks like this:

```
object MyApp extends App {
  ...
  val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment

  ...
  env.execute
}
```

and it fails when it reaches the call to `getExecutionEnvironment`...
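
One thing worth checking: since Flink 1.11 the pipeline executors are discovered via ServiceLoader from flink-clients, which is no longer a transitive dependency of the streaming API modules, so a fat jar built without it (or with the META-INF/services entries lost during merging) fails with exactly this "No ExecutorFactory found" error. A hedged build.sbt sketch under those assumptions (sbt-assembly in use, jar run directly with `java -jar`; version and scopes are illustrative):

```
// build.sbt fragment (sketch, assumptions noted above).
val flinkVersion = "1.11.2"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  // Needed on the runtime classpath for the executor to be discovered.
  "org.apache.flink" %% "flink-clients"         % flinkVersion
)

// If a custom merge strategy is used, keep the ServiceLoader entries intact,
// otherwise the executor factories cannot be found at runtime.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
```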




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/