I tried another approach, but it does not work either:

def evaluationStream(
    indicatorsStream: DataStream[Indicators],
    scenarios: Set[Scenario]): DataStream[Evaluation] =
  indicatorsStream.map(new IndicatorsToTxEval(scenarios))

class IndicatorsToTxEval(
    scenarios: Set[Scenario])
  extends MapFunction[Indicators, Evaluation] {

  override def map(indicators: Indicators): Evaluation =
    Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
}
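
For anyone who wants to experiment with the same shape outside Flink, here is a minimal plain-Scala sketch. It is only an assumption-laden stand-in: the fields of Indicators, Scenario and Evaluation, the body of evaluateScenarios, and the names EvalSketch and IndicatorsToEval are all hypothetical, and it does not reproduce the warning. It only shows scenarios being handed to the function through its constructor.

```scala
// Hypothetical stand-ins: the real types are not shown in this thread.
final case class Indicators(id: String, amount: Double)
final case class Scenario(name: String, threshold: Double)
final case class Evaluation(id: String, triggered: Set[String])

object EvalSketch {
  // Assumed behaviour: a scenario triggers when the amount exceeds its threshold.
  def evaluateScenarios(indicators: Indicators, scenarios: Set[Scenario]): Set[String] =
    scenarios.filter(s => indicators.amount > s.threshold).map(_.name)

  // Same shape as IndicatorsToTxEval, but a plain Scala function rather than
  // a Flink MapFunction. It is marked Serializable because Flink's
  // MapFunction interface is Serializable as well.
  class IndicatorsToEval(scenarios: Set[Scenario])
      extends (Indicators => Evaluation) with Serializable {
    override def apply(indicators: Indicators): Evaluation =
      Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
  }

  def main(args: Array[String]): Unit = {
    // Scenarios are built at runtime and passed in, not read from a static member.
    val toEval = new IndicatorsToEval(Set(Scenario("large", 1000.0)))
    println(Seq(Indicators("tx-1", 2500.0)).map(toEval))
  }
}
```

Because the scenarios arrive through the constructor, they could just as well come from a config file as from a literal Set.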

Best regards/祝好,

Chang Liu 刘畅


> On 22 Jan 2019, at 13:33, Chang Liu <fluency...@gmail.com> wrote:
> 
> OK, I think I found where the issue is, but I don’t understand why.
> 
> I have a method:
> 
> def evaluationStream(
>     indicatorsStream: DataStream[Indicators],
>     scenarios: Set[Scenario]): DataStream[Evaluation] =
>   indicatorsStream.map { indicators =>
>     Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
>   }
> 
> And this is how I am using it:
> 
> lazy val indicatorsStream: DataStream[Indicators] = ...
> 
> lazy val scenarios: Set[Scenario] = loadScenarios(...)
> 
> lazy val evalStream: DataStream[Evaluation] = 
> evaluationStream(indicatorsStream, scenarios).print()
> 
> 
> The problem is caused by scenarios, which is passed as an argument to the 
> method evaluationStream. Written this way, it does not work.
> 
> It will work if I do it in the following way:
> 
> lazy val scenarios: Set[Scenario] = Set(S1, S2, ...)
> 
> def evaluationStream(indicatorsStream: DataStream[Indicators]): 
> DataStream[Evaluation] =
>   indicatorsStream.map { indicators =>
>     Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
>   }
> 
> where scenarios is not passed as a method argument but is a static object 
> variable.
> 
> But this is not what I want. I would like scenarios to be configurable, 
> loaded from a config file rather than defined as a static object variable.
> 
> Any idea why this is happening? I have other code where I also pass 
> arguments and use them as part of my data flow, and that works just 
> fine.
> 
> Many Thanks.
> 
> Best regards/祝好,
> 
> Chang Liu 刘畅
> 
> 
>> On 22 Jan 2019, at 10:47, Chang Liu <fluency...@gmail.com> wrote:
>> 
>> Dear community,
>> 
>> I am having a problem releasing the job.
>> 
>> 2019-01-22 10:42:50.098  WARN [Source: Custom Source -> Kafka -> 
>> ConstructTxSepa -> FilterOutFailures -> ObtainActualTxSepa -> TxSepaStream 
>> -> TxStream -> IndicatorsEvalStream -> TxEvalStream -> Sink: Print to Std. 
>> Out (2/4)] [FileCache] - improper use of releaseJob() without a matching 
>> number of createTmpFiles() calls for jobId 9e1557723a065925e01c7749899547fb
>>  
>> I searched online but only found this: 
>> https://stackoverflow.com/questions/52135604/fix-improper-use-of-releasejob-without-a-matching-number-of-createtmpfiles
>> 
>> However, these warnings keep popping up and the job cannot be released, 
>> so my data flow is not working.
>> 
>> If I remove my last operator, it works just fine, even though that 
>> operator is just doing a map operation. I am wondering what could be 
>> the cause of this issue?
>> 
>> Many Thanks :)
>> 
>> Best regards/祝好,
>> 
>> Chang Liu 刘畅
>> 
>> 
> 
