Hi,

Which version of Flink are you using? The issue 
https://issues.apache.org/jira/browse/FLINK-10283 
shows that a similar problem was fixed in 1.6.1 and 1.7. If you are using a 
newer version and still encounter the problem, please reopen the issue and 
describe how it still affects you.
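
Independent of the version: since the job works when scenarios is a static 
object member but not when it is passed as an argument, it may be worth ruling 
out a serialization problem. Flink ships user functions to the task managers 
using Java serialization, so whatever a function captures (here, scenarios) has 
to be serializable. This is only a guess, not a confirmed cause. Below is a 
minimal sketch of such a check using plain Java serialization and no Flink 
classes. The Scenario case class and its fields, as well as the names 
SerializationCheck and trySerialize, are made up for illustration.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.Try

// Hypothetical stand-in for the Scenario type from the thread; the real class is not shown.
final case class Scenario(name: String, threshold: Double)

object SerializationCheck {
  // Attempts plain Java serialization of `value`.
  // Returns the number of bytes written on success, or a Failure wrapping
  // the exception (typically java.io.NotSerializableException) otherwise.
  def trySerialize(value: AnyRef): Try[Int] = Try {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    buffer.size()
  }

  def main(args: Array[String]): Unit = {
    // A set of case class instances with serializable fields should succeed.
    val scenarios: Set[Scenario] = Set(Scenario("S1", 0.5), Scenario("S2", 0.9))
    println(trySerialize(scenarios))

    // A set holding a non-serializable element should fail.
    println(trySerialize(Set(new Object)))
  }
}
```

If the check fails for your real Scenario type, the exception message names 
the class that could not be serialized.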

Best,
Stefan 

> On 22. Jan 2019, at 13:49, Chang Liu <fluency...@gmail.com> wrote:
> 
> I have tried another way, but it does not work either:
> 
> def evaluationStream(
>     indicatorsStream: DataStream[Indicators],
>     scenarios: Set[Scenario]): DataStream[Evaluation] =
>   indicatorsStream.map(new IndicatorsToTxEval(scenarios))
> 
> class IndicatorsToTxEval(
>     scenarios: Set[Scenario])
>   extends MapFunction[Indicators, Evaluation] {
> 
>   override def map(indicators: Indicators): Evaluation =
>     Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
> }
> 
> Best regards/祝好,
> 
> Chang Liu 刘畅
> 
> 
>> On 22 Jan 2019, at 13:33, Chang Liu <fluency...@gmail.com> wrote:
>> 
>> Ok, I think I found where the issue is, but I don’t understand why.
>> 
>> I have a method:
>> 
>> def evaluationStream(
>>     indicatorsStream: DataStream[Indicators],
>>     scenarios: Set[Scenario]): DataStream[Evaluation] =
>>   indicatorsStream.map { indicators =>
>>     Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
>>   }
>> 
>> And this is how I am using it:
>> 
>> lazy val indicatorsStream: DataStream[Indicators] = ...
>> 
>> lazy val scenarios: Set[Scenario] = loadScenarios(...)
>> 
>> lazy val evalStream: DataStream[Evaluation] =
>>   evaluationStream(indicatorsStream, scenarios)
>> 
>> evalStream.print()
>> 
>> 
>> The problem is caused by scenarios, which is passed as an argument to 
>> the method evaluationStream. With this version it does not work.
>> 
>> It will work if I do it in the following way:
>> 
>> lazy val scenarios: Set[Scenario] = Set(S1, S2, ...)
>> 
>> def evaluationStream(
>>     indicatorsStream: DataStream[Indicators]): DataStream[Evaluation] =
>>   indicatorsStream.map { indicators =>
>>     Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
>>   }
>> 
>> where scenarios is not passed as a method argument but is a static 
>> object variable.
>> 
>> But this is not what I want. I would like scenarios to be configurable, 
>> loaded from a config file rather than defined as a static object variable.
>> 
>> Any idea why this is happening? I have other code where I also pass 
>> arguments and use them as part of my data flow, and it works fine.
>> 
>> Many Thanks.
>> 
>> Best regards/祝好,
>> 
>> Chang Liu 刘畅
>> 
>> 
>>> On 22 Jan 2019, at 10:47, Chang Liu <fluency...@gmail.com> wrote:
>>> 
>>> Dear community,
>>> 
>>> I am having a problem releasing the job.
>>> 
>>> 2019-01-22 10:42:50.098  WARN [Source: Custom Source -> Kafka -> 
>>> ConstructTxSepa -> FilterOutFailures -> ObtainActualTxSepa -> TxSepaStream 
>>> -> TxStream -> IndicatorsEvalStream -> TxEvalStream -> Sink: Print to Std. 
>>> Out (2/4)] [FileCache] - improper use of releaseJob() without a matching 
>>> number of createTmpFiles() calls for jobId 9e1557723a065925e01c7749899547fb
>>>  
>>> I searched online but only found this: 
>>> https://stackoverflow.com/questions/52135604/fix-improper-use-of-releasejob-without-a-matching-number-of-createtmpfiles
>>> 
>>> However, these warnings keep popping up and the job cannot be 
>>> released, so my data flow is not working.
>>> 
>>> If I remove my last operator, it works just fine, even though that 
>>> operator is just doing a map operation. I am wondering what could be 
>>> the cause of this issue?
>>> 
>>> Many Thanks :)
>>> 
>>> Best regards/祝好,
>>> 
>>> Chang Liu 刘畅
>>> 
>>> 
>> 
> 
