Hi,

You can manually invoke the IgniteDataStreamer.flush() method; if it finishes
without exceptions, all the data has been loaded. Alternatively, you can use
IgniteDataStreamer.close(false) to close the streamer without canceling
ongoing streams; it also streams any remaining data before closing.
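
A minimal sketch of the flush-then-close pattern described above. To keep it
runnable without a cluster, it uses a hypothetical Streamer interface and an
in-memory fake (neither is part of Ignite) that mirror only the addData(),
flush() and close(boolean) methods of IgniteDataStreamer; with Ignite on the
classpath you would presumably pass the real streamer instead. The names
PreloadSketch and preload are illustrative assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the three IgniteDataStreamer methods used here.
interface Streamer<K, V> {
    void addData(K key, V val);
    void flush();               // returns normally only if buffered data was sent
    void close(boolean cancel); // cancel=false: send remaining data, then close
}

// In-memory fake so the sketch runs without a cluster (not an Ignite class).
final class InMemoryStreamer<K, V> implements Streamer<K, V> {
    final Map<K, V> buffered = new LinkedHashMap<>();
    final Map<K, V> delivered = new LinkedHashMap<>();
    boolean closed;

    @Override public void addData(K key, V val) { buffered.put(key, val); }

    @Override public void flush() {
        delivered.putAll(buffered);
        buffered.clear();
    }

    @Override public void close(boolean cancel) {
        if (cancel) buffered.clear(); else flush();
        closed = true;
    }
}

final class PreloadSketch {
    // Returns true only if flush() and close(false) completed without an exception.
    static <K, V> boolean preload(Streamer<K, V> streamer, Map<K, V> data) {
        try {
            data.forEach(streamer::addData);
            streamer.flush();
            streamer.close(false);
            return true;
        } catch (RuntimeException e) {
            // With Ignite this would typically be a javax.cache.CacheException.
            System.err.println("Preload failed: " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        InMemoryStreamer<String, Integer> streamer = new InMemoryStreamer<>();
        Map<String, Integer> data = new LinkedHashMap<>();
        data.put("a", 1);
        data.put("b", 2);

        // Start application logic only after the preload is confirmed.
        if (preload(streamer, data)) {
            System.out.println("Loaded " + streamer.delivered.size() + " entries; starting application logic.");
        }
    }
}
```

The point of the boolean result is that application startup can be gated on
it: a normal return from flush() and close(false) is the signal that nothing
is left in the streamer's buffers.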

Evgenii

2017-06-20 14:19 GMT+03:00 Alper Tekinalp <[email protected]>:

> Hi Evgenii.
>
> Is there a way to know whether the data streamer has finished all data
> transfer? We load our internal caches through data streamers when starting
> our application, and I want to be sure that all the data is loaded before
> starting the application logic.
>
> On Fri, Apr 21, 2017 at 5:22 PM, Evgenii Zhuravlev <
> [email protected]> wrote:
>
>> You don't need to handle it. As mentioned in the exception message,
>> "DataStreamer will retry data transfer at stable topology". It will retry
>> right after the topology is updated. If you don't have any other
>> exceptions, data loading finished successfully.
>>
>> 2017-04-21 15:54 GMT+03:00 Alper Tekinalp <[email protected]>:
>>
>>> Hi,
>>>
>>> So how should I handle that exception? Should I just ignore it? Is
>>> there a way to know when the data streamer has successfully delivered
>>> the data after getting that error? I just want to know whether data
>>> loading/rebalancing finished successfully.
>>>
>>> On Fri, Apr 21, 2017 at 2:10 PM, Evgenii Zhuravlev
>>> <[email protected]> wrote:
>>> > Hi,
>>> >
>>> > The minor topology version changes when you create or remove caches,
>>> > and on late affinity assignment.
>>> >
>>> > Yes, you see this error in the log because the minor topology version
>>> > changed while the data streamer was running, but you didn't lose any data.
>>> >
>>> > According to javadoc:
>>> >
>>> >> Note that there is no guarantee that data will be delivered after this
>>> >> concrete attempt (e.g., it can fail when topology is changing), but it
>>> >> won't be lost anyway.
>>> >
>>> > Evgenii
>>> >
>>> >
>>> > 2017-04-21 13:24 GMT+03:00 Alper Tekinalp <[email protected]>:
>>> >>
>>> >> Hi.
>>> >>
>>> >> First of all, what is the minor topology version and when does it change?
>>> >>
>>> >> Below we got an error while loading data with data streamer:
>>> >>
>>> >> 18/Apr/2017 13:50:45   INFO   34391455 [exchange-worker-#31%null%] org.apache.ignite.internal.processors.cache.GridCacheProcessor(L:475) - Started cache [name=RECORD_CACHE_XX, mode=PARTITIONED]
>>> >> ..
>>> >> 18/Apr/2017 13:50:49   ERROR  34394884 [DeploymentWorker-0] com.intellica.evam.engine.dynamic.helpers.DeploymentHelper(L:359) - Scenario data could not be preloaded in distributed deployment for scenario [XX].
>>> >> javax.cache.CacheException: class org.apache.ignite.IgniteCheckedException: Failed to finish operation (too many remaps): 32
>>> >>     at org.apache.ignite.internal.processors.cache.GridCacheUtils.convertToCacheException(GridCacheUtils.java:1465)
>>> >>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.close(DataStreamerImpl.java:1160)
>>> >>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.close(DataStreamerImpl.java:1244)
>>> >>     at com.intellica.evam.engine.db.dao.ScenarioRecordDao.preloadScenarioRecords(ScenarioRecordDao.java:134)
>>> >>     at com.intellica.evam.engine.dynamic.helpers.cache.PreloadHelper.preloadScenarioRecordData(PreloadHelper.java:58)
>>> >>     at com.intellica.evam.engine.dynamic.helpers.cache.PreloadHelper.preloadScenarioData(PreloadHelper.java:24)
>>> >>     at com.intellica.evam.engine.dynamic.helpers.DeploymentHelper.distributedDeploy(DeploymentHelper.java:827)
>>> >>     at com.intellica.evam.engine.dynamic.DeploymentWorker.handleDeploymentEvent(DeploymentWorker.java:83)
>>> >>     at com.intellica.evam.engine.dynamic.DeploymentWorker.run(DeploymentWorker.java:62)
>>> >> Caused by: class org.apache.ignite.IgniteCheckedException: Failed to finish operation (too many remaps): 32
>>> >>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5.apply(DataStreamerImpl.java:863)
>>> >>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5.apply(DataStreamerImpl.java:828)
>>> >>     at org.apache.ignite.internal.util.future.GridFutureAdapter$ArrayListener.apply(GridFutureAdapter.java:456)
>>> >>     at org.apache.ignite.internal.util.future.GridFutureAdapter$ArrayListener.apply(GridFutureAdapter.java:439)
>>> >>     at org.apache.ignite.internal.util.future.GridFutureAdapter.notifyListener(GridFutureAdapter.java:271)
>>> >>     at org.apache.ignite.internal.util.future.GridFutureAdapter.notifyListeners(GridFutureAdapter.java:259)
>>> >>     at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:389)
>>> >>     at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:355)
>>> >>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$Buffer.onResponse(DataStreamerImpl.java:1789)
>>> >>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$3.onMessage(DataStreamerImpl.java:335)
>>> >>     at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1215)
>>> >>     at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:843)
>>> >>     at org.apache.ignite.internal.managers.communication.GridIoManager.access$2100(GridIoManager.java:108)
>>> >>     at org.apache.ignite.internal.managers.communication.GridIoManager$6.run(GridIoManager.java:783)
>>> >>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> >>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> >>     at java.lang.Thread.run(Thread.java:745)
>>> >> Caused by: class org.apache.ignite.IgniteCheckedException: DataStreamer request failed [node=6fe302a3-f353-4235-af5a-3708bf240750]
>>> >>     at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$Buffer.onResponse(DataStreamerImpl.java:1777)
>>> >>     ... 8 more
>>> >> Caused by: class org.apache.ignite.IgniteCheckedException: DataStreamer will retry data transfer at stable topology [reqTop=AffinityTopologyVersion [topVer=11, minorTopVer=14], topVer=AffinityTopologyVersion [topVer=11, minorTopVer=15], node=remote]
>>> >>     at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.localUpdate(DataStreamProcessor.java:339)
>>> >>     at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:297)
>>> >>     at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:56)
>>> >>     at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:86)
>>> >>     ... 7 more
>>> >> 18/Apr/2017 13:50:49   INFO   34394950 [exchange-worker-#31%null%] org.apache.ignite.internal.processors.cache.GridCacheProcessor(L:475) - Started cache [name=FEC_XX, mode=PARTITIONED]
>>> >>
>>> >>
>>> >> I guess the reason is that while one of the server nodes is loading data
>>> >> through a data streamer into cache RECORD_CACHE_XX, another server creates
>>> >> cache FEC_XX, and cache creation increases the minor topology version.
>>> >>
>>> >> Is my understanding correct?
>>> >> If it is, what is the reason?
>>> >>
>>> >> --
>>> >> Alper Tekinalp
>>> >>
>>> >> Software Developer
>>> >> Evam Streaming Analytics
>>> >>
>>> >> Atatürk Mah. Turgut Özal Bulv.
>>> >> Gardenya 5 Plaza K:6 Ataşehir
>>> >> 34758 İSTANBUL
>>> >>
>>> >> Tel:  +90 216 455 01 53 Fax: +90 216 455 01 54
>>> >> www.evam.com.tr
>>> >
>>> >
>>>
>>
>>
>
