Archive | October 2015

Using infinispan as embedded and clustered in-memory store for your Java SE application

infinispan is a distributed in-memory key/value data store with the option to query the inserted data using an internal DSL. In the last article we saw how to query in-memory data structures using Apache Calcite; in this article we explore how to do the same with infinispan.

The maven dependencies we need for our small example project are listed in the following:



The embedded distribution of infinispan lets us integrate the in-memory data store into our standard Java SE application. Please note that you also need the embedded version of the query module, as there is also a standard query module named infinispan-query available. Using the standard module will lead to class loading issues at runtime.

The cache is defined within a configuration file called infinispan.xml. The name of this file is passed to the constructor of DefaultCacheManager:

private Cache<Object, Object> createCache() throws IOException {
	// nodeName is substituted for the ${nodeName} placeholder in infinispan.xml
	System.setProperty("nodeName", nodeName);
	EmbeddedCacheManager cacheManager = new DefaultCacheManager("infinispan.xml");
	Cache<Object, Object> cache = cacheManager.getCache("repl");
	// LOGGER is assumed to be a java.util.logging.Logger field of the enclosing class
	LOGGER.info(String.format("Started cache %s on node %s with members: %s", "dist", nodeName, cache.getAdvancedCache().getRpcManager().getMembers()));
	return cache;
}

In the example code above we choose to use a cache named repl. As the name indicates, this is a replicated cache: all values stored in one instance are replicated to all other available instances. Whether this replication happens synchronously or asynchronously can be configured as shown in the following snippet from infinispan.xml:

<cache-container default-cache="default">
	<transport stack="udp" node-name="${nodeName}" />
	<replicated-cache name="repl" mode="SYNC" />
</cache-container>

infinispan uses the well-known jgroups library for the communication between the cluster nodes. The node name and the protocol stack (here: UDP) are configured using the XML element transport.

Starting the first node (here called A) produces the following output:

Received new cluster view for channel ISPN: [A-28854|0] (1) [A-28854]
Started cache dist on node A with members: [A-28854]

We can see that member A has joined the cluster. Now we start node B:

Received new cluster view for channel ISPN: [A-28854|1] (2) [A-28854, B-24196]
Started cache dist on node B with members: [A-28854, B-24196]

Clearly the cluster now has two members: A and B. We also see that a rebalancing process starts once the second node comes up:

Starting cluster-wide rebalance for cache repl, topology CacheTopology{id=1, rebalanceId=1, currentCH=ReplicatedConsistentHash{ns = 60, owners = (1)[A-28854: 60]}, pendingCH=ReplicatedConsistentHash{ns = 60, owners = (2)[A-28854: 30, B-24196: 30]}, unionCH=null, actualMembers=[A-28854, B-24196]}
Finished cluster-wide rebalance for cache repl, topology id = 1

Now that both nodes are up and running, we can add some data to the cache:

for (int i = 0; i < 500000; i++) {
	Person person = new Person(dataFactory.getNextFirstName(), dataFactory.getNextLastName());
	person.addAddress(new Address(dataFactory.getNextCity()));
	cache.put(nodeName + "-" + i, person);
}

The two classes Person and Address are simple POJOs with the fields firstName and lastName for Person and city for Address. Once both nodes have put 500,000 persons each into the cache, it holds a total of 1M entries.
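For illustration, the two POJOs could look roughly like the following sketch. The article only names the fields firstName, lastName and city as well as the method addAddress(); everything else is an assumption: the list field name addresses, the getters, and implementing Serializable so that instances can be shipped between the cluster nodes. The classes are declared without public only so that both fit into one file.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Assumption: Serializable is used so that values can be transferred between nodes.
class Address implements Serializable {
	private static final long serialVersionUID = 1L;
	private final String city;

	public Address(String city) {
		this.city = city;
	}

	public String getCity() {
		return city;
	}
}

class Person implements Serializable {
	private static final long serialVersionUID = 1L;
	private final String firstName;
	private final String lastName;
	// Assumption: the field name "addresses" is not given in the article.
	private final List<Address> addresses = new ArrayList<>();

	public Person(String firstName, String lastName) {
		this.firstName = firstName;
		this.lastName = lastName;
	}

	public void addAddress(Address address) {
		addresses.add(address);
	}

	public String getFirstName() {
		return firstName;
	}

	public String getLastName() {
		return lastName;
	}

	// Read-only view, so callers have to go through addAddress() to modify the list.
	public List<Address> getAddresses() {
		return Collections.unmodifiableList(addresses);
	}
}
```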

Now that the cache is filled with a respectable amount of data, it would be interesting to see how it performs. This can be done by obtaining a QueryFactory and by using its fluent API:

QueryFactory queryFactory = Search.getQueryFactory(cache);
Query query = queryFactory.from(Person.class).
List<Object> list = query.list();

All we have to do is to specify the base class of our query (here Person) and add the predicates using having() and eq(). Different predicates can be joined by using a conjunction like and(). We can even query nested attributes, for example the city of a person's address.

When I run this query on a replicated cache with 1M entries, I get an average execution time of about 543ms. As all data resides in memory, this is not really surprising.
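How such an average can be obtained is not shown here, so the following is only a minimal sketch of one possible approach, using nothing but System.nanoTime(). The class name QueryTimer, the method averageMillis() and the idea of discarding a few warm-up runs are assumptions made for illustration and are not part of the example project.

```java
class QueryTimer {
	// Runs the task a few times without measuring (warm-up for the JIT),
	// then measures `runs` executions and returns the mean duration in milliseconds.
	static double averageMillis(Runnable task, int warmups, int runs) {
		if (runs <= 0) {
			throw new IllegalArgumentException("runs must be positive");
		}
		for (int i = 0; i < warmups; i++) {
			task.run();
		}
		long totalNanos = 0;
		for (int i = 0; i < runs; i++) {
			long start = System.nanoTime();
			task.run();
			totalNanos += System.nanoTime() - start;
		}
		return totalNanos / (double) runs / 1_000_000.0;
	}
}
```

With the query object from above, a call like QueryTimer.averageMillis(() -> query.list(), 5, 20) would return the mean time of 20 measured executions. A single measurement is of limited value on the JVM, which is why the warm-up runs are discarded.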

How does the query perform on a distributed cache? Within a distributed cache all entries are spread over the cluster nodes using a hash value to determine on which node to store an entry. A distributed cache can be configured with the XML element of the same name:

<distributed-cache name="dist" owners="1" segments="2"/>

In this simple example we are dividing the hash key space into two segments, meaning that each key falls into exactly one of the two segments. Each hash segment is mapped to a list of nodes called owners. Here we are using only one owner, as we are working with two nodes. Hence each node manages one segment of the key space without any replicas.
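The idea behind segments and owners can be sketched in a few lines of plain Java. Please note that this is a deliberately simplified model and not how infinispan computes it internally: infinispan uses its own hash function and consistent hash implementation, whereas the sketch below just takes hashCode() modulo the number of segments and assigns segment i to node i. The class SegmentSketch and its methods are hypothetical.

```java
import java.util.List;

class SegmentSketch {
	// Maps a key to one of numSegments segments.
	// floorMod keeps the result non-negative even for negative hash codes.
	static int segmentOf(Object key, int numSegments) {
		return Math.floorMod(key.hashCode(), numSegments);
	}

	// With owners="1" every segment has exactly one owner.
	// Assumption: segment i is simply owned by node (i mod number of nodes).
	static String ownerOf(Object key, int numSegments, List<String> nodes) {
		int segment = segmentOf(key, numSegments);
		return nodes.get(segment % nodes.size());
	}
}
```

For two nodes A and B and two segments, SegmentSketch.ownerOf("A-42", 2, Arrays.asList("A", "B")) returns either A or B, and it returns the same node every time for the same key. This also shows that the node prefix in the key says nothing about where the entry ends up.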

Now that the data is evenly distributed over the available nodes, the query takes less time to execute: 338ms on average. The gain in performance results of course from the fact that now both nodes participate in the query and contribute their part of the result set.

Conclusion: With a few lines of code it is possible to set up an embedded in-memory key/value data store in your Java SE application that can even be clustered over different nodes. The internal DSL makes it easy to query even nested data structures in respectable time.