Hi,

Which version of Flink are you using? The issue https://issues.apache.org/jira/browse/FLINK-10283 shows that a similar problem was fixed in 1.6.1 and 1.7. If you are using a newer version and still encounter the problem, you can reopen the issue and describe how it still affects you.

Best,
Stefan

> On 22. Jan 2019, at 13:49, Chang Liu <fluency...@gmail.com> wrote:
>
> I have tried another way, but it does not work either:
>
> def evaluationStream(
>     indicatorsStream: DataStream[Indicators],
>     scenarios: Set[Scenario]): DataStream[Evaluation] =
>   indicatorsStream.map(new IndicatorsToTxEval(scenarios))
>
> class IndicatorsToTxEval(
>     scenarios: Set[Scenario])
>   extends MapFunction[Indicators, Evaluation] {
>
>   override def map(indicators: Indicators): Evaluation =
>     Evaluation(indicators.id,
>       evaluateScenarios(indicators, scenarios))
> }
>
> Best regards/祝好,
>
> Chang Liu 刘畅
>
>
>> On 22 Jan 2019, at 13:33, Chang Liu <fluency...@gmail.com> wrote:
>>
>> OK, I think I found where the issue is, but I don't understand why.
>>
>> I have a method:
>>
>> def evaluationStream(
>>     indicatorsStream: DataStream[Indicators],
>>     scenarios: Set[Scenario]): DataStream[Evaluation] =
>>   indicatorsStream.map { indicators =>
>>     Evaluation(indicators.id,
>>       evaluateScenarios(indicators, scenarios))
>>   }
>>
>> And this is how I am using it:
>>
>> lazy val indicatorsStream: DataStream[Indicators] = ...
>>
>> lazy val scenarios: Set[Scenario] = loadScenarios(...)
>>
>> lazy val evalStream: DataStream[Evaluation] =
>>   evaluationStream(indicatorsStream, scenarios).print()
>>
>>
>> The problem is caused by scenarios, which is passed as an argument to
>> the method evaluationStream. It is not working this way.
>>
>> It will work if I do it in the following way:
>>
>> lazy val scenarios: Set[Scenario] = Set(S1, S2, ...)
>>
>> def evaluationStream(indicatorsStream: DataStream[Indicators]):
>>     DataStream[Evaluation] =
>>   indicatorsStream.map { indicators =>
>>     Evaluation(indicators.id,
>>       evaluateScenarios(indicators, scenarios))
>>   }
>>
>> where scenarios is not passed as a method argument but is a static
>> object variable.
>>
>> But this is not what I want. I would like to have configurable scenarios
>> that I can load from a config file instead of a static object variable.
>>
>> Any idea why this is happening? I also have other code where I pass
>> arguments and use them as part of my data flow, and it works just fine.
>>
>> Many Thanks.
>>
>> Best regards/祝好,
>>
>> Chang Liu 刘畅
>>
>>
>>> On 22 Jan 2019, at 10:47, Chang Liu <fluency...@gmail.com> wrote:
>>>
>>> Dear community,
>>>
>>> I am having a problem releasing the job.
>>>
>>> 2019-01-22 10:42:50.098 WARN [Source: Custom Source -> Kafka ->
>>> ConstructTxSepa -> FilterOutFailures -> ObtainActualTxSepa -> TxSepaStream
>>> -> TxStream -> IndicatorsEvalStream -> TxEvalStream -> Sink: Print to Std.
>>> Out (2/4)] [FileCache] - improper use of releaseJob() without a matching
>>> number of createTmpFiles() calls for jobId 9e1557723a065925e01c7749899547fb
>>>
>>> I searched online but only found this:
>>> https://stackoverflow.com/questions/52135604/fix-improper-use-of-releasejob-without-a-matching-number-of-createtmpfiles
>>>
>>> However, these warnings keep popping up and the job cannot be released,
>>> so my data flow is not working.
>>>
>>> If I remove my last operator, it works just fine, even though that
>>> operator is just doing a map operation. I am wondering what could be
>>> the cause of this issue?
>>>
>>> Many Thanks :)
>>>
>>> Best regards/祝好,
>>>
>>> Chang Liu 刘畅