Hi all,
I am trying to use the slot group feature, by having 'default' group and 
additional 'market' group.
The purpose is to divide the resources equally between two sources and their 
following operators.
I've set the slotGroup on the source of the market data.
Can I assume that all following operators created from this source will use 
same slot group of 'market'?
(The operators created for market stream are pretty complex, with connect and 
split).
In Web UI I saw there are 16 slots, but didn't see indication per operator to 
which group it was assigned. How can I know?

Relevant Code:

env.setParallelism(8);
conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 16); \\ to allow 
Parallelism of 8 per group

// Market source and operators:

KeyedStream<SpotTickEvent, Tuple> windowedStreamA = sourceProvider.provide(env)
        .name(spotSourceProvider.getName())
        .slotSharingGroup(SourceMsgType.MARKET.slotGroup())
        .flatMap(new ParserMapper(new MarketMessageParser()))
        .name(ParserMapper.class.getSimpleName())
        .filter(new USDFilter())
        .name(USDFilter.class.getSimpleName())
        .keyBy(MarketEvent.CURRENCY_FIELD)
        .timeWindow(Time.of(windowSizeMs, TimeUnit.MILLISECONDS))
        .process(new LastInWindowPriceChangeFunction()))
        .name(LastInWindowPriceChangeFunction.class.getSimpleName())
        .keyBy(SpotTickEvent.CURRENCY_FIELD);


marketConnectedStream = windowedStreamA.connect(windowedStreamB)
            .flatMap(new MarketCoMapper()))
            .name(MarketCoMapper.class.getSimpleName())



SplitStream<MarketAWithMarketB> stocksWithSpotsStreams = marketConnectedStream
        .split( market -> ImmutableList.of("splitA"," splitB") );

DataStream< MarketAWithMarketB> splitA = stocksWithSpotsStreams.select("splitA 
");


Thanks and regards,
Tovi


Reply via email to