Here is the code snippet that shows how we put data into the cache.

public void method(argType arg) {
    while (true) {
        try (Transaction tx = ignite.transactions().txStart(
                TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE)) {
            // Placeholder call: IgniteCache.put takes a key and a value.
            workflowRunStateIgniteCache.put(arg);
            tx.commit();
            break; // committed, so stop retrying
        } catch (TransactionOptimisticException e) {
            // Optimistic conflict: loop around and try again.
            System.out.println("Transaction failed. Retrying...");
        }
    }
}
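
A minimal sketch of the same retry pattern with an upper bound on attempts, so that a transaction that keeps conflicting does not spin forever. It is plain Java with no Ignite dependency: `OptimisticConflict` is a hypothetical stand-in for `TransactionOptimisticException`, and `runWithRetry`, `maxAttempts` and the `Supplier` body are names assumed for illustration only, not part of our code or of the Ignite API.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class BoundedRetrySketch {

    /** Hypothetical stand-in for Ignite's TransactionOptimisticException. */
    static class OptimisticConflict extends RuntimeException {
        OptimisticConflict(String message) {
            super(message);
        }
    }

    /**
     * Runs the body until it completes without a conflict, or gives up
     * after maxAttempts tries and rethrows the last conflict.
     */
    static <T> T runWithRetry(int maxAttempts, Supplier<T> body) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        OptimisticConflict last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // In real code the body would open the transaction,
                // perform the put and commit.
                return body.get();
            } catch (OptimisticConflict e) {
                last = e;
                System.out.println("Transaction failed on attempt " + attempt + ".");
            }
        }
        // Every attempt ended in a conflict, so surface the last one.
        throw last;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated body: conflicts on the first two calls, then succeeds.
        String result = runWithRetry(5, () -> {
            if (calls.incrementAndGet() < 3) {
                throw new OptimisticConflict("simulated conflict");
            }
            return "committed";
        });
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```

Rethrowing after the last attempt is one possible choice; logging the failure and emitting a metric would work as well, depending on what the caller expects.
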
On Tue, Dec 12, 2017 at 8:19 PM, Nikolai Tikhonov <[email protected]>
wrote:
> Can you share a code snippet that shows how you put data into the cache?
>
> On Tue, Dec 12, 2017 at 12:26 PM, Harshil garg <[email protected]>
> wrote:
>
>> Sorry, I forgot to attach the XML used for configuring the cache.
>>
>> <?xml version="1.0" encoding="UTF-8"?>
>>
>> <!--
>> Licensed to the Apache Software Foundation (ASF) under one or more
>> contributor license agreements. See the NOTICE file distributed with
>> this work for additional information regarding copyright ownership.
>> The ASF licenses this file to You under the Apache License, Version 2.0
>> (the "License"); you may not use this file except in compliance with
>> the License. You may obtain a copy of the License at
>> http://www.apache.org/licenses/LICENSE-2.0
>> Unless required by applicable law or agreed to in writing, software
>> distributed under the License is distributed on an "AS IS" BASIS,
>> WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> See the License for the specific language governing permissions and
>> limitations under the License.
>> -->
>>
>> <!--
>> Ignite configuration with all defaults and enabled p2p deployment and
>> enabled events.
>> -->
>> <beans xmlns="http://www.springframework.org/schema/beans"
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>> xmlns:context="http://www.springframework.org/schema/context"
>> xsi:schemaLocation="
>> http://www.springframework.org/schema/beans
>> http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
>> http://www.springframework.org/schema/context
>> http://www.springframework.org/schema/context/spring-context-3.2.xsd">
>>
>> <context:property-placeholder location="classpath:app.properties"/>
>> <bean abstract="true" id="ignite-data.cfg"
>> class="org.apache.ignite.configuration.IgniteConfiguration">
>> <!-- Set to true to enable distributed class loading for examples,
>> default is false. -->
>> <property name="peerClassLoadingEnabled" value="true"/>
>>
>> <!-- Ignite predicate filter to separate data nodes from service
>> nodes -->
>> <property name="userAttributes">
>> <map key-type="java.lang.String" value-type="java.lang.Boolean">
>> <entry key="data.node" value="true"/>
>> </map>
>> </property>
>> <property name="cacheConfiguration">
>> <list>
>> <bean
>> class="org.apache.ignite.configuration.CacheConfiguration">
>> <!-- Set a cache name. -->
>> <property name="name" value="${cache.workflow.name}"/>
>>
>> <!-- Set cache mode. -->
>> <property name="cacheMode" value="PARTITIONED"/>
>>
>> <property name="backups" value="2"/>
>>
>> <property name="statisticsEnabled" value="true"/>
>>
>> <property name="nodeFilter">
>> <bean
>> class="com.mediaiq.caps.platform.choreography.commons.filter.DataNodeFilter"/>
>> </property>
>> </bean>
>> <bean
>> class="org.apache.ignite.configuration.CacheConfiguration">
>> <!-- Set a cache name. -->
>> <property name="name"
>> value="${cache.workflow-run.name}"/>
>>
>> <!-- Set cache mode. -->
>> <property name="cacheMode" value="REPLICATED"/>
>>
>> <!-- This is a transactional cache as many keys need to
>> be updated together -->
>> <property name="atomicityMode" value="TRANSACTIONAL"/>
>>
>> <property name="backups" value="1"/>
>>
>> <property name="statisticsEnabled" value="true"/>
>>
>> <property name="nodeFilter">
>> <bean
>> class="com.mediaiq.caps.platform.choreography.commons.filter.DataNodeFilter"/>
>> </property>
>> </bean>
>> <bean
>> class="org.apache.ignite.configuration.CacheConfiguration">
>> <!-- Set a cache name. -->
>> <property name="name"
>> value="${cache.workflow-pause.name}"/>
>>
>> <!-- Set cache mode. -->
>> <property name="cacheMode" value="PARTITIONED"/>
>>
>> <property name="backups" value="1"/>
>>
>> <property name="statisticsEnabled" value="true"/>
>>
>> <property name="nodeFilter">
>> <bean
>> class="com.mediaiq.caps.platform.choreography.commons.filter.DataNodeFilter"/>
>> </property>
>> </bean>
>> </list>
>> </property>
>> <!-- Explicitly configure TCP discovery SPI to provide list of
>> initial nodes. -->
>> <property name="discoverySpi">
>> <bean
>> class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
>> <property name="ipFinder">
>> <!--
>> Ignite provides several options for automatic
>> discovery that can be used
>> instead of static IP-based discovery. For
>> information on all options refer
>> to our documentation:
>> http://apacheignite.readme.io/docs/cluster-config
>> -->
>> <!-- Uncomment static IP finder to enable static-based
>> discovery of initial nodes. -->
>> <!--<bean
>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
>> <bean
>> class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
>> <property name="addresses">
>> <list>
>> <!-- In distributed environment, replace
>> with actual host IP address. -->
>> <value>127.0.0.1:47500..47509</value>
>> </list>
>> </property>
>> </bean>
>>
>> </property>
>> </bean>
>> </property>
>> </bean>
>> <bean parent="ignite-data.cfg"/>
>> </beans>
>>
>>
>> On Tue, Dec 12, 2017 at 2:55 PM, Harshil garg <[email protected]>
>> wrote:
>>
>>> Hi Nikolai,
>>>
>>> I haven't removed any data from other nodes. How can data be removed
>>> manually from the cache?
>>> I have three caches deployed on the data node, which is configured using
>>> the XML below.
>>>
>>> Data is being saved in workflowRunStateCache, which I verified in the
>>> web console. But when I try to access this cache from a different
>>> server node, the cache comes back completely empty.
>>>
>>> What could be the possible reasons?
>>> I need some solid input on this.
>>>
>>> Any help will be highly appreciated.
>>>
>>> [snip: same XML configuration as quoted above]
>>>
>>>
>>>
>>>
>>> On Mon, Dec 11, 2017 at 8:29 PM, Nikolai Tikhonov <[email protected]>
>>> wrote:
>>>
>>>> Hello!
>>>>
>>>> This looks weird to me. You should see the same data set from all nodes
>>>> of the cluster. I think you either removed data from other nodes or
>>>> performed operations on another cache. Can you share a simple Maven
>>>> project that reproduces the problem?
>>>>
>>>> On Mon, Dec 11, 2017 at 5:22 PM, Harshil garg <[email protected]>
>>>> wrote:
>>>>
>>>>> I am trying to access Ignite cache data from other nodes. I am able
>>>>> to access the Ignite cache, but the cache is completely empty, and so a
>>>>> NullPointerException is thrown when I try to do cache.get(key).
>>>>>
>>>>> I have tried both REPLICATED and PARTITIONED modes for the
>>>>> workflowRunState cache.
>>>>>
>>>>> Here is the XML configuration:
>>>>>
>>>>> [snip: same XML configuration as quoted above]
>>>>>
>>>>> All these caches are deployed on the data node.
>>>>>
>>>>> After some operations, I had populated data in
>>>>> workflowRunStateCache, which I verified in the web console as well.
>>>>>
>>>>> But when I try to access the same cache from a different server node,
>>>>> no data is available in it. In the following code I am trying to access
>>>>> workflowRunStateCache from a different server node. It shows
>>>>> containsKey as false and throws a NullPointerException in debug mode
>>>>> when I call
>>>>> workflowRunStateCache.get();
>>>>>
>>>>> while (true) {
>>>>>     try (Transaction tx = ignite.transactions().txStart(
>>>>>             TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE)) {
>>>>>         System.out.println("Conatins Key" +
>>>>>                 workflowRunStateIgniteCache.containsKey(updatedKeys.get(0)));
>>>>>         System.out.println("Conatins Key" + workflowRunStateIgniteCache);
>>>>>         Boolean flowProcessable = updatedKeys.stream()
>>>>>                 // check if there is at least one event in each cache entry to be processed
>>>>>                 .map(updatedKey ->
>>>>>                         workflowRunStateIgniteCache.get(updatedKey).getFlowRunEvents().size() > 0)
>>>>>                 .reduce(true, (a, b) -> a && b).booleanValue();
>>>>>
>>>>>         List<Event> inputEvents = null;
>>>>>
>>>>>         if (flowProcessable) {
>>>>>             inputEvents = updatedKeys
>>>>>                     .stream()
>>>>>                     .map(updatedKey -> {
>>>>>                         try {
>>>>>                             return workflowRunStateIgniteCache.get(updatedKey).getFlowRunEvents().take();
>>>>>                         } catch (InterruptedException e) {
>>>>>                             e.printStackTrace();
>>>>>                         }
>>>>>                         return null;
>>>>>                     }).collect(Collectors.toList());
>>>>>         }
>>>>>
>>>>>         tx.commit();
>>>>>
>>>>>         break;
>>>>>     } catch (TransactionOptimisticException e) {
>>>>>         // todo: emit a monitoring metric TRANSACTIONS_FAILED here
>>>>>         System.out.println("Transaction failed. Retrying...");
>>>>>     }
>>>>> }
>>>>>
>>>>> System.out logs:
>>>>>
>>>>> Conatins Keyfalse
>>>>>
>>>>>
>>>>
>>>
>>
>
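
On the NullPointerException in the original message: `IgniteCache.get` returns `null` for a key that is not in the cache, so chaining `.getFlowRunEvents()` onto the result fails as soon as `containsKey` is false. Below is a minimal sketch of a null-safe version of the "is every entry processable" check. It uses a plain `java.util.Map` as a stand-in for the cache, and `RunState`, `allProcessable` and the sample keys are hypothetical names, not the real classes. It does not explain why the cache looks empty from the other node; it only avoids the exception.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class NullSafeLookupSketch {

    /** Hypothetical stand-in for the cached value type. */
    static class RunState {
        private final List<String> flowRunEvents = new ArrayList<>();

        List<String> getFlowRunEvents() {
            return flowRunEvents;
        }
    }

    /**
     * Returns true only if every key is present and its entry holds at
     * least one event. A missing key yields false instead of an NPE.
     */
    static boolean allProcessable(Map<String, RunState> cache, List<String> keys) {
        return keys.stream().allMatch(key -> {
            RunState state = cache.get(key); // null when the key is absent
            return state != null && !state.getFlowRunEvents().isEmpty();
        });
    }

    public static void main(String[] args) {
        Map<String, RunState> cache = new HashMap<>();
        RunState state = new RunState();
        state.getFlowRunEvents().add("event-1");
        cache.put("run-1", state);

        // Key present with one event.
        System.out.println(allProcessable(cache, Arrays.asList("run-1")));
        // Second key is missing from the map.
        System.out.println(allProcessable(cache, Arrays.asList("run-1", "run-2")));
    }
}
```

Like `reduce(true, (a, b) -> a && b)`, `allMatch` returns true for an empty key list, so that edge case behaves the same as in the quoted code.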