I have tried another way, but it does not work either:

def evaluationStream(
    indicatorsStream: DataStream[Indicators],
    scenarios: Set[Scenario]): DataStream[Evaluation] =
  indicatorsStream.map(new IndicatorsToTxEval(scenarios))

class IndicatorsToTxEval(scenarios: Set[Scenario])
    extends MapFunction[Indicators, Evaluation] {

  override def map(indicators: Indicators): Evaluation =
    Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
}

Best regards,
Chang Liu 刘畅

> On 22 Jan 2019, at 13:33, Chang Liu <fluency...@gmail.com> wrote:
>
> OK, I think I found where the issue is, but I don't understand why.
>
> I have a method:
>
> def evaluationStream(
>     indicatorsStream: DataStream[Indicators],
>     scenarios: Set[Scenario]): DataStream[Evaluation] =
>   indicatorsStream.map { indicators =>
>     Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
>   }
>
> And this is how I am using it:
>
> lazy val indicatorsStream: DataStream[Indicators] = ...
>
> lazy val scenarios: Set[Scenario] = loadScenarios(...)
>
> lazy val evalStream: DataStream[Evaluation] =
>   evaluationStream(indicatorsStream, scenarios).print()
>
> The problem is caused by scenarios, which is passed as an argument to the
> method evaluationStream. Passed that way, it does not work.
>
> It works if I do it in the following way:
>
> lazy val scenarios: Set[Scenario] = Set(S1, S2, ...)
>
> def evaluationStream(indicatorsStream: DataStream[Indicators]):
>     DataStream[Evaluation] =
>   indicatorsStream.map { indicators =>
>     Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
>   }
>
> where scenarios is not passed as a method argument but is a static object
> variable.
>
> But this is not what I want. I would like to have configurable scenarios
> that I can load from a config file instead of a static object variable.
>
> Any idea why this is happening? I also have other code where I pass
> arguments and use them as part of my data flow, and it works fine.
>
> Many thanks.
>
> Best regards,
> Chang Liu 刘畅
>
>> On 22 Jan 2019, at 10:47, Chang Liu <fluency...@gmail.com> wrote:
>>
>> Dear community,
>>
>> I am having a problem releasing the job.
>>
>> 2019-01-22 10:42:50.098 WARN [Source: Custom Source -> Kafka ->
>> ConstructTxSepa -> FilterOutFailures -> ObtainActualTxSepa -> TxSepaStream
>> -> TxStream -> IndicatorsEvalStream -> TxEvalStream -> Sink: Print to Std.
>> Out (2/4)] [FileCache] - improper use of releaseJob() without a matching
>> number of createTmpFiles() calls for jobId 9e1557723a065925e01c7749899547fb
>>
>> I searched online but only found this:
>> https://stackoverflow.com/questions/52135604/fix-improper-use-of-releasejob-without-a-matching-number-of-createtmpfiles
>>
>> However, these warnings keep popping up and the job cannot be released,
>> so my data flow is not working.
>>
>> If I remove my last operator, it works just fine, even though that
>> operator is just doing a map operation. I am wondering what could be
>> the cause of this issue?
>>
>> Many thanks :)
>>
>> Best regards,
>> Chang Liu 刘畅