Re: 取消订阅 (Unsubscribe)

2023-04-18 Thread Leonard Xu

You can send an email with any content to  user-unsubscr...@flink.apache.org  to unsubscribe from the user@flink.apache.org
mailing list. Subscribing to and unsubscribing from the other mailing lists works the same way; for mailing list subscription management, see [1].

Best regards,
Leonard Xu
[1] https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list

> On Apr 18, 2023, at 2:53 PM, wangw...@sina.cn wrote:
> 
> Unsubscribe




Re: Task Failure Strategy for Adaptive Scheduler

2023-04-18 Thread David Morávek
> Our DAG has multiple sources which are not connected to each other.

To double-check, are you saying the job consists of multiple disjoint DAGs?

>  Do you think somehow the adaptive scheduler supports region pipelines
for streaming jobs ?

It's doable but might not be straightforward since the AS recycles
ExecutionGraph during restart. It has been a low priority so far because
it's mainly valuable for batch jobs, but we might reconsider it if there
are enough use cases.
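
(As background for readers, a minimal sketch of how the scheduler and failover strategy are typically selected; these are standard Flink configuration keys, but whether a given key can be set per job like this or has to go into flink-conf.yaml depends on the deployment, and reactive mode in particular is a cluster-level setting.)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SchedulerSelectionSketch {

    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Default scheduler restarts only the failed pipelined region; for a fully
        // connected streaming job that region is still the whole pipeline.
        conf.setString("jobmanager.execution.failover-strategy", "region");

        // Adaptive scheduler (the scheduler that reactive mode builds on).
        conf.setString("jobmanager.scheduler", "adaptive");

        // Reactive mode itself is configured on the cluster, not per job:
        //   scheduler-mode: reactive   (in flink-conf.yaml of an application-mode cluster)

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        System.out.println("Default parallelism: " + env.getParallelism());
    }
}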

Best,
D.

On Sat, Apr 15, 2023 at 8:15 AM Talat Uyarer 
wrote:

> Thanks David and others.
>
> Our DAG has multiple sources which are not connected to each other. If one
> of them fails, I believe Flink can restart a single region for
> defaultscheduler. but it is not the same case for adaptive scheduler. Do
> you think somehow the adaptive scheduler supports region pipelines for
> streaming jobs ? If it can, local recovery makes sense at that time I
> believe.
>
> Thanks
>
>
> On Wed, Apr 12, 2023 at 2:15 AM David Morávek 
> wrote:
>
>> Hi Talat,
>>
>> For most streaming pipelines, we have to restart the whole pipeline no
>> matter the scheduler used because they're a single pipelined region. One
>> limitation of AdaptiveScheduler is the lack of support for local recovery.
>> This will be addressed in Flink 1.18 [1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-21450
>> 
>>
>> Best,
>> D.
>>
>> On Tue, Apr 11, 2023 at 4:35 AM Weihua Hu  wrote:
>>
>>> Hi,
>>>
>>> AFAIK, the reactive mode always restarts the whole pipeline now.
>>>
>>> Best,
>>> Weihua
>>>
>>>
>>> On Tue, Apr 11, 2023 at 8:38 AM Talat Uyarer via user <
>>> user@flink.apache.org> wrote:
>>>
 Hi All,

 We use Flink 1.13 with reactive mode for our streaming jobs. When we
 have an issue/exception on our pipeline. Flink rescheduled all tasks. Is
 there any way to reschedule only task that had exceptions ?

 Thanks

>>>


Re: Opensearch connector and oauth

2023-04-18 Thread Martijn Visser
Hi Michael,

I'm looping in Andrey since he has worked a lot on the Opensearch
connector. A contribution is very welcome in case this can be improved.

Best regards,

Martijn



On Tue, Apr 18, 2023 at 8:45 AM Michael Hempel Jørgensen 
wrote:

> Hi,
>
> we need to use OAuth2 (Client Credentials Flow) in Flink to authenticate
> and authorise against different services, initially Kafka and Opensearch.
> We have it working with Kafka; however, it doesn't seem to be possible
> with the Opensearch Flink Connector (
> https://github.com/apache/flink-connector-opensearch).
>
> It looks like the connector only supports basic auth (user/pass) at the
> moment, and doesn't have an option for manually setting custom headers
> either. Is this correct?
>
> Is there any work in progress towards making this possible?
>
> We are willing to help contribute to code/discussions if necessary to get
> this implemented somehow.
>
> Regards,
> Michael
>
> --
> Michael Hempel-Jørgensen
> Software Pilot
> www.trifork.com
>
>
>


Re: Opensearch connector and oauth

2023-04-18 Thread Andriy Redko
Hi Michael,
 
Indeed, the Opensearch REST client supports only basic auth at the moment. I
don't know which sink you are using, but the
streaming/connectors/opensearch/OpensearchSink provides the capability to
configure the underlying client through RestClientFactory [1], so you could
add request/response interceptors to propagate OAuth2 tokens etc. This is not
possible with the other sinks yet, but I think it should not be difficult to
implement. If this is the case for you, could you please open a JIRA ticket?

Thank you.


[1] 
https://github.com/apache/flink-connector-opensearch/blob/main/flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/RestClientFactory.java
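
For illustration, a minimal sketch of what such a factory could look like, assuming RestClientFactory [1] exposes the same single configureRestClientBuilder hook as the older Elasticsearch-style sinks; the token supplier is a hypothetical placeholder for whatever client-credentials flow is used, and the factory would then be handed to the streaming sink's builder (which should expose a setRestClientFactory method):

import org.apache.flink.streaming.connectors.opensearch.RestClientFactory;

import org.apache.http.HttpRequestInterceptor;
import org.opensearch.client.RestClientBuilder;

import java.io.Serializable;

public class OAuthRestClientFactory implements RestClientFactory {

    /** Hypothetical serializable token supplier, e.g. backed by an OAuth2 client-credentials call. */
    public interface TokenSupplier extends Serializable {
        String get();
    }

    private final TokenSupplier tokenSupplier;

    public OAuthRestClientFactory(TokenSupplier tokenSupplier) {
        this.tokenSupplier = tokenSupplier;
    }

    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        // Attach a Bearer token to every outgoing request; the supplier can refresh it as needed.
        HttpRequestInterceptor authInterceptor =
                (request, context) ->
                        request.setHeader("Authorization", "Bearer " + tokenSupplier.get());
        restClientBuilder.setHttpClientConfigCallback(
                httpClientBuilder -> httpClientBuilder.addInterceptorLast(authInterceptor));
    }
}

For the newer unified sink in flink-connector-opensearch, an equivalent extension point is exactly the kind of thing that would need to be contributed.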

Best Regards,
Andriy Redko

Tuesday, April 18, 2023, 3:47:22 AM, you wrote:

> Hi Michael,

> I'm looping in Andrey since he has worked a lot on the Opensearch connector. 
> A contribution is very welcome in case this can be improved. 

> Best regards,

> Martijn

> On Tue, Apr 18, 2023 at 8:45 AM Michael Hempel Jørgensen  
> wrote:

>> Hi,

>> we need to use OAuth2 (Client Credentials Flow) in Flink to authenticate and 
>> authorise against different services, initially Kafka and Opensearch. We 
>> have it working with Kafka using however it doesn't seem to be possible with 
>> the Opensearch Flink Connector 
>> (https://github.com/apache/flink-connector-opensearch).

>> It looks like the connector only supports basic auth (user/pass) at the 
>> moment, and doesn't have an option for manually setting custom headers 
>> either. Is this correct?

>> Is there any work in progress towards making this possible?

>> We are willing to help contribute to code/discussions if necessary to get 
>> this implemented somehow.

>> Regards,
>> Michael



Re: Task Failure Strategy for Adaptive Scheduler

2023-04-18 Thread Talat Uyarer via user
Hi David,

Yes, we have multiple disjoint DAGs in one job; we want better CPU
utilization. Open-source Flink has a scheduling issue with those kinds of
jobs, and I made a fix on 1.13 with the adaptive scheduler, so now tasks are
scheduled evenly across all DAGs. However, when we get an exception in one
DAG we don't want it to affect the others; we want to restart only the DAG
that hit the exception. I believe the pipelined-region model that the
default scheduler uses is a good fit for this.

Do you have anything in mind, other than pipelined regions, that would
address restarting only the failed part?

Thanks



On Tue, Apr 18, 2023 at 12:19 AM David Morávek 
wrote:

> > Our DAG has multiple sources which are not connected to each other.
>
> To double-check, are you saying the job consists of multiple disjoint DAGs?
>
> >  Do you think somehow the adaptive scheduler supports region pipelines
> for streaming jobs ?
>
> It's doable but might not be straightforward since the AS recycles
> ExecutionGraph during restart. It has been a low priority so far because
> it's mainly valuable for batch jobs, but we might reconsider it if there
> are enough use cases.
>
> Best,
> D.
>
> On Sat, Apr 15, 2023 at 8:15 AM Talat Uyarer 
> wrote:
>
>> Thanks David and others.
>>
>> Our DAG has multiple sources which are not connected to each other. If
>> one of them fails, I believe Flink can restart a single region for
>> defaultscheduler. but it is not the same case for adaptive scheduler. Do
>> you think somehow the adaptive scheduler supports region pipelines for
>> streaming jobs ? If it can, local recovery makes sense at that time I
>> believe.
>>
>> Thanks
>>
>>
>> On Wed, Apr 12, 2023 at 2:15 AM David Morávek 
>> wrote:
>>
>>> Hi Talat,
>>>
>>> For most streaming pipelines, we have to restart the whole pipeline no
>>> matter the scheduler used because they're a single pipelined region. One
>>> limitation of AdaptiveScheduler is the lack of support for local recovery.
>>> This will be addressed in Flink 1.18 [1].
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-21450
>>> 
>>>
>>> Best,
>>> D.
>>>
>>> On Tue, Apr 11, 2023 at 4:35 AM Weihua Hu 
>>> wrote:
>>>
 Hi,

 AFAIK, the reactive mode always restarts the whole pipeline now.

 Best,
 Weihua


 On Tue, Apr 11, 2023 at 8:38 AM Talat Uyarer via user <
 user@flink.apache.org> wrote:

> Hi All,
>
> We use Flink 1.13 with reactive mode for our streaming jobs. When we
> have an issue/exception on our pipeline. Flink rescheduled all tasks. Is
> there any way to reschedule only task that had exceptions ?
>
> Thanks
>



Debug CEP Patterns

2023-04-18 Thread Ana Gómez González
Hello!

What's the best way of debugging a CEP pattern/stream?
Basically, I have a Flink (Scala) program that consumes events from
RabbitMQ. The source is working ok because I can see the inputStream
printed. Then, I defined one pattern, created a patternStream, and finally
an output stream with a created PatternProcessFunction.

I want to make sure my pattern is defined correctly, and the same for my
process function, because so far I'm not getting any elements in my output
stream even though I'm manually inserting events into the source that should
cause a match. Is there a way I can debug this using IntelliJ (I set some
breakpoints, but of course that doesn't work) or with any other method? I
also tried some classic System.out.println calls, with no success.
Can tests be written and run to check the expected behavior?

I'm leaving most of my code here in case you detect some structural or
conceptual error (for example, in an earlier version I was not setting the
time characteristics for the stream, so it didn't work...).

package org.angoglez

import dto.events.{HourlyReading, OutputEvent}
import dto.schemas.GenericJsonDeserializationSchema
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.scala._
import org.apache.flink.cep.scala.CEP
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
import patterns.WaterPatterns
import patterns.functions.HourlyReadingProcessFunction
import source.RabbitMQSource

import java.time.{Duration, Instant, ZoneId, ZoneOffset}

object MainRunner {
  def main(args: Array[String]): Unit = {

    // ENVIRONMENT, CONFIG, RABBIT SOURCE, CREATION OF DATASTREAM
    val conf = new Configuration()
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.enableCheckpointing(100)

    val connectionConfig: RMQConnectionConfig = new RMQConnectionConfig.Builder()
      .setHost("localhost")
      .setPort(5672)
      .setVirtualHost("")
      .setUserName("")
      .setPassword("")
      .build

    type eventType = HourlyReading
    // Not gonna include the DeserializationSchema here, it works properly 'cause I can see it printed.
    val deserializationSchema = new GenericJsonDeserializationSchema[eventType]

    val rabbitMQSource = new RabbitMQSource[eventType](connectionConfig, "tests-flink", deserializationSchema)

    val serializableTimestampAssigner = new SerializableTimestampAssigner[eventType] {
      def extractTimestamp(element: eventType, recordTimestamp: Long): Long =
        element.obtainTimestamp.toInstant(ZoneId.systemDefault.getRules.getOffset(Instant.now())).toEpochMilli
    }

    val inputStream: DataStream[eventType] = env
      .addSource(rabbitMQSource.obtainRabbitSource) // same as above, the RMQSource works well, not added here.
      .assignTimestampsAndWatermarks {
        WatermarkStrategy
          .forBoundedOutOfOrderness(Duration.ofSeconds(5))
          .withTimestampAssigner(serializableTimestampAssigner)
      }

    inputStream.print() // printed ok

    // PATTERN DEFINITION AND

    val pattern =
      Pattern
        .begin[HourlyReading]("a1-nighttimeflowfraud")
        .where(event => FunctionUtils.isBetweenHoursRange(event.dateTime, 1, 6))
        .next("a2-nighttimeflowfraud")
        .where(event => FunctionUtils.isBetweenHoursRange(event.dateTime, 1, 6))
        .where { (event: HourlyReading, context: Context[HourlyReading]) =>
          val previousPattern = context.getEventsForPattern("a1-nighttimeflowfraud").toList.head
          (previousPattern.serialNumber == event.serialNumber) &&
            (previousPattern.unit == "M3" && event.unit == "M3") &&
            (event.volume - previousPattern.volume < 2 || event.volume - previousPattern.volume > 3)
        }

    val patternStream = CEP.pattern(inputStream, pattern)

    val outputStream: DataStream[OutputEvent] = patternStream.process(new HourlyReadingProcessFunction) // below

    outputStream.print() // nothing here

    env.execute
  }
}


/// PATTERN PROCESS FUNCTION CLASS ///

class HourlyReadingProcessFunction extends PatternProcessFunction[HourlyReading, OutputEvent] {
  def processMatch(
      patterns: util.Map[String, util.List[HourlyReading]],
      ctx: PatternProcessFunction.Context,
      out: Collector[OutputEvent]
  ): Unit = {

    System.out.println("PROCESSING MATCH")

    val pattern1: HourlyReading = patterns.get("a1-nighttimeflowfraud").get(0)
    val pattern2: HourlyReading = patterns.get("a2-nighttimeflowfraud").get(0)

    System.out.println(pattern1)
    System.out.println(pattern2)
    // case class OutputEvent(str: String)
    out.collect(OutputEvent(s"Event observed: ${pattern1.dateTime.toString} - ${pattern2.dateTime.toString}"))
  }
}





Thanks a lot in advance!!


*Ana Gómez González*

 


Re: Debug CEP Patterns

2023-04-18 Thread Ana Gómez González
Here's a link for the previous code in a gist so you don't struggle with
the format. Sorry.

https://gist.github.com/angoglez/d9eb6e12f259aba306387b5c23488fb4

>


Flink not releasing the reference to a deleted jar file

2023-04-18 Thread neha goyal
Hello,

I am attaching sample code and a screenshot showing that Flink is holding a
reference to a jar file even after I close the StreamExecutionEnvironment.

Because of this, the deleted file does not get cleaned up from the disk and
we are getting disk-space alerts. When we restart our application, these
files are cleared from the disk.
What is the way to gracefully shut down the Flink environment so that it
releases all of its resource references?

public class TestResourceRelease {

    public void check() {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        try {
            StreamTableEnvironment env = StreamTableEnvironment.create(execEnv);
            env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate AS 'com.my.udf.v2.EpochToDate' USING JAR 'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'");
            TypeInformation[] typeInformationArray = getTypeInfoArray();
            String[] columnName = new String[]{"x", "y"};
            KafkaSource source = KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new JsonRowDeserializationSchema(Types.ROW_NAMED(columnName, typeInformationArray)))
                    .setProperty("bootstrap.servers", "localhost:9092")
                    .setTopics("test").build();

            DataStream stream = execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
            env.registerDataStream("test", stream);

            Table table = env.fromDataStream(stream);
            env.registerTable("my_test", table);
            Table table1 = env.sqlQuery("SELECT `EpochToDate`(`x`) AS `order_time`, `y` FROM `my_test`");
            System.out.println("created the table");
        }
        catch (Exception e) {
            System.out.println(e);
        }
        finally {
            try {
                execEnv.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
            File file = new File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
            file.delete();
        }
    }

    public static TypeInformation[] getTypeInfoArray() {
        TypeInformation[] typeInformations = new TypeInformation[2];
        typeInformations[0] = org.apache.flink.table.api.Types.LONG();
        typeInformations[1] = org.apache.flink.table.api.Types.LONG();
        return typeInformations;
    }

}
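
As a side note for debugging this kind of leak, here is a minimal diagnostic sketch, assuming a Linux host, where a file deleted while still open shows up under /proc/self/fd with a link target ending in "(deleted)"; it lists the jar handles the JVM still holds:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class OpenJarHandles {

    public static void main(String[] args) throws IOException {
        Path fdDir = Paths.get("/proc/self/fd");
        try (Stream<Path> fds = Files.list(fdDir)) {
            fds.forEach(fd -> {
                try {
                    String target = Files.readSymbolicLink(fd).toString();
                    // Deleted-but-open files keep consuming disk space until the handle is released.
                    if (target.endsWith(".jar") || target.contains("(deleted)")) {
                        System.out.println(fd.getFileName() + " -> " + target);
                    }
                } catch (IOException ignored) {
                    // The descriptor may have been closed between listing and resolving it.
                }
            });
        }
    }
}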


Re: Opensearch connector and oauth

2023-04-18 Thread Michael Hempel Jørgensen
Hi Andriy,


we currently use the OpensearchSink [1] connector, as we assumed that was the
one to use going forward. We are not quite sure what the difference between
the two is, or whether either will become deprecated soon.


Regards,

Michael


[1] 
https://github.com/apache/flink-connector-opensearch/blob/main/flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchSink.java




From: Andriy Redko 
Sent: Tuesday, 18 April 2023 4:16:45 PM
To: Martijn Visser; Michael Hempel Jørgensen
Cc: user@flink.apache.org
Subject: Re: Opensearch connector and oauth

Hi Michael,

Indeed the Opensearch REST client supports only basic auth at the moment. I 
don't know which sink are you using, but the
streaming/connectors/opensearch/OpensearchSink provides the capability to 
configure the underlying client using
RestClientFactory [1] so you could add the request / response interceptors to 
propagate OAuth2 tokens etc. This
is not possible with other sinks yet but I think it should not be difficult to 
implement. If this is the case
for you, could you please open the JIRA ticket?

Thank you.


[1] 
https://github.com/apache/flink-connector-opensearch/blob/main/flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/RestClientFactory.java

Best Regards,
Andriy Redko

Tuesday, April 18, 2023, 3:47:22 AM, you wrote:

> Hi Michael,

> I'm looping in Andrey since he has worked a lot on the Opensearch connector. 
> A contribution is very welcome in case this can be improved.

> Best regards,

> Martijn

> On Tue, Apr 18, 2023 at 8:45 AM Michael Hempel Jørgensen  
> wrote:

>> Hi,

>> we need to use OAuth2 (Client Credentials Flow) in Flink to authenticate and 
>> authorise against different services, initially Kafka and Opensearch. We 
>> have it working with Kafka using however it doesn't seem to be possible with 
>> the Opensearch Flink Connector 
>> (https://github.com/apache/flink-connector-opensearch).

>> It looks like the connector only supports basic auth (user/pass) at the 
>> moment, and doesn't have an option for manually setting custom headers 
>> either. Is this correct?

>> Is there any work in progress towards making this possible?

>> We are willing to help contribute to code/discussions if necessary to get 
>> this implemented somehow.

>> Regards,
>> Michael



Re: Flink not releasing the reference to a deleted jar file

2023-04-18 Thread neha goyal
Adding to the above query: I have tried dropping the tables and the
function as well, but no luck.

On Wed, Apr 19, 2023 at 11:01 AM neha goyal  wrote:

> Hello,
>
> I am attaching a sample code and screenshot where Flink is holding the
> reference to a jar file even after I close the streamExecutionEnvironment.
>
> Due to this, the deleted file is not getting cleaned up from the disk and
> we are getting disc space alerts. When we restart our application then
> these files get cleared from the disk.
> What is the way to gracefully shut down the Flink environment so that it
> releases all the resources' references?
>
> public class TestResourceRelease {
>
> public void check(){
> StreamExecutionEnvironment execEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> try{
> StreamTableEnvironment env = 
> StreamTableEnvironment.create(execEnv);
> env.executeSql("CREATE FUNCTION IF NOT EXISTS EpochToDate AS 
> 'com.my.udf.v2.EpochToDate' USING JAR 
> 'file:///tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar'");
> TypeInformation[] typeInformationArray = getTypeInfoArray();
> String[] columnName = new String[]{"x", "y"};
> KafkaSource source = 
> KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())
> .setValueOnlyDeserializer(new 
> JsonRowDeserializationSchema(Types.ROW_NAMED(columnName, 
> typeInformationArray)))
> .setProperty("bootstrap.servers", "localhost:9092")
> .setTopics("test").build();
>
> DataStream stream = execEnv.fromSource(source, 
> WatermarkStrategy.noWatermarks(), "Kafka Source");
> env.registerDataStream("test", stream);
>
> Table table = env.fromDataStream(stream);
> env.registerTable("my_test", table);
> Table table1 = env.sqlQuery("SELECT `EpochToDate`(`x`) AS 
> `order_time`, `y` FROM `my_test`");
> System.out.println("created the table");
> }
> catch (Exception e){
> System.out.println(e);
>
> }
> finally {
> try {
> execEnv.close();
> } catch (Exception e) {
> e.printStackTrace();
> }
> File file = new 
> File("/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar");
> file.delete();
> }
>
> }
>
> public static TypeInformation[] getTypeInfoArray(){
> TypeInformation[] typeInformations = new TypeInformation[2];
> typeInformations[0] = org.apache.flink.table.api.Types.LONG();
> typeInformations[1] = org.apache.flink.table.api.Types.LONG();
> return typeInformations;
> }
>
> }
>
>


Re: Debug CEP Patterns

2023-04-18 Thread Biao Geng
Hi Ana,
Thanks for the code. I just want to share my own experience when debugging
CEP patterns:
1. Adding breakpoints in IntelliJ should work, but you need to set the
breakpoint inside the filter functions, e.g. on the call to
isBetweenHoursRange or on line 65 of your gist. The reason is that the
pattern object is built only once, before any events arrive (env.execute has
not been called yet), whereas the filter method of the pattern's condition
objects is executed for each event to decide whether the NFA should advance
to the next stage.
2. You can add some System.out.println calls in the conditions so you know
which condition is matched and which is not. This is helpful in most cases.

For writing tests, you can refer to the code in
org.apache.flink.cep.CEPITCase; a quick local smoke test along the lines of
the sketch below also works.
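
For illustration, a minimal self-contained smoke test, using the Java CEP API, a bounded fromElements input, and a hypothetical Event POJO; it takes RabbitMQ and watermarking out of the equation and makes it obvious whether the pattern matches at all:

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

public class PatternSmokeTest {

    /** Hypothetical event type, only for illustration. */
    public static class Event {
        public int hour;
        public Event() {}
        public Event(int hour) { this.hour = hour; }
        @Override
        public String toString() { return "Event(" + hour + ")"; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // A bounded, hand-crafted input removes the real source from the equation.
        DataStream<Event> input = env.fromElements(new Event(2), new Event(3), new Event(10));

        Pattern<Event, Event> pattern =
                Pattern.<Event>begin("first")
                        .where(new SimpleCondition<Event>() {
                            @Override
                            public boolean filter(Event e) {
                                return e.hour >= 1 && e.hour <= 6;
                            }
                        })
                        .next("second")
                        .where(new SimpleCondition<Event>() {
                            @Override
                            public boolean filter(Event e) {
                                return e.hour >= 1 && e.hour <= 6;
                            }
                        });

        DataStream<String> matches =
                CEP.pattern(input, pattern)
                        .inProcessingTime() // avoid needing watermarks for the smoke test
                        .process(new PatternProcessFunction<Event, String>() {
                            @Override
                            public void processMatch(Map<String, List<Event>> match,
                                                     Context ctx,
                                                     Collector<String> out) {
                                out.collect(match.get("first").get(0) + " -> " + match.get("second").get(0));
                            }
                        });

        // executeAndCollect() runs the job and returns the results directly.
        try (CloseableIterator<String> it = matches.executeAndCollect()) {
            it.forEachRemaining(System.out::println);
        }
    }
}

If this prints matches but the real job stays silent, the problem is usually in the event-time/watermark setup or in the conditions rather than in the CEP wiring itself.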
My final thought is that your lines 63 and 64 use two where clauses, so these
two conditions will be joined with "and" (i.e., a single event must fulfill
both conditions). You can check whether that is what you really want; the
short sketch below makes the combination explicit.
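
To make that combination explicit, a small sketch (again the Java API with a hypothetical Event POJO) showing that consecutive where calls AND their conditions, while or is needed for OR semantics:

import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

public class WhereCombinationSketch {

    /** Hypothetical event type, only for illustration. */
    public static class Event {
        public int hour;
        public String unit;
    }

    public static void main(String[] args) {
        // Two consecutive where() calls: a single event must satisfy BOTH conditions.
        Pattern<Event, Event> andPattern =
                Pattern.<Event>begin("a")
                        .where(new SimpleCondition<Event>() {
                            @Override
                            public boolean filter(Event e) {
                                return e.hour >= 1 && e.hour <= 6;
                            }
                        })
                        .where(new SimpleCondition<Event>() {
                            @Override
                            public boolean filter(Event e) {
                                return "M3".equals(e.unit);
                            }
                        });

        // If OR semantics are intended, the second condition has to be added with or().
        Pattern<Event, Event> orPattern =
                Pattern.<Event>begin("a")
                        .where(new SimpleCondition<Event>() {
                            @Override
                            public boolean filter(Event e) {
                                return e.hour >= 1 && e.hour <= 6;
                            }
                        })
                        .or(new SimpleCondition<Event>() {
                            @Override
                            public boolean filter(Event e) {
                                return "M3".equals(e.unit);
                            }
                        });

        System.out.println("Patterns built: " + andPattern + ", " + orPattern);
    }
}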

Best,
Biao Geng

Ana Gómez González  于2023年4月19日周三 05:09写道:

> Here's a link for the previous code in a gist so you don't struggle with
> the format. Sorry.
>
> https://gist.github.com/angoglez/d9eb6e12f259aba306387b5c23488fb4
>
>>