
Using Infinispan as an embedded, clustered in-memory store for your Java SE application

Infinispan is a distributed in-memory key/value data store that lets you query the inserted data using an internal DSL. In the last article we saw how to query in-memory data structures using Apache Calcite, so in this article we explore how to do the same with Infinispan.

The Maven dependencies we need for our small example project are the following:

<properties>
	<infinispan.version>7.2.5.Final</infinispan.version>
</properties>

<dependencies>
	<dependency>
		<groupId>org.infinispan</groupId>
		<artifactId>infinispan-embedded</artifactId>
		<version>${infinispan.version}</version>
	</dependency>
	<dependency>
		<groupId>org.infinispan</groupId>
		<artifactId>infinispan-embedded-query</artifactId>
		<version>${infinispan.version}</version>
	</dependency>
</dependencies>

The embedded distribution of Infinispan lets us integrate the in-memory data store into a standard Java SE application. Note that you also need the embedded version of the query module, as there is also a standard query module named infinispan-query; using the standard module together with infinispan-embedded leads to class-loading issues at runtime.

The cache is defined in a configuration file called infinispan.xml, whose name is passed into the constructor of the DefaultCacheManager:

private Cache<Object, Object> createCache() throws IOException {
	// the node name is referenced as ${nodeName} in infinispan.xml
	System.setProperty("nodeName", nodeName);
	EmbeddedCacheManager cacheManager = new DefaultCacheManager("infinispan.xml");
	Cache<Object, Object> cache = cacheManager.getCache("repl");
	LOGGER.info(String.format("Started cache %s on node %s with members: %s", "repl", nodeName, cache.getAdvancedCache().getRpcManager().getMembers()));
	return cache;
}
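
The surrounding class is not shown in the post; a node could be started along the following lines. The class name InfinispanNode is made up for illustration, and the node name is passed as a program argument:

public static void main(String[] args) throws IOException {
	// hypothetical entry point: each JVM is started with its node name
	// ("A", "B", ...) as the first program argument, stored in the nodeName field
	InfinispanNode node = new InfinispanNode(args[0]);
	node.createCache();
}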

In the createCache() method shown above we chose to use a cache named repl. As the name indicates, this is a replicated cache where all values stored in one instance are replicated to all other available instances. Whether this replication happens synchronously or asynchronously can be configured, as the following snippet from infinispan.xml shows:

<cache-container default-cache="default">
	<transport stack="udp" node-name="${nodeName}" />
	<replicated-cache name="repl" mode="SYNC" />
</cache-container>

Infinispan uses the well-known JGroups library as the communication protocol between the cluster nodes. The node name and the protocol stack (here: UDP) are configured using the XML element transport.
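
The post only shows the cache-container part of the configuration; a complete infinispan.xml might look roughly like the following sketch. The stack-file path pointing at the default UDP stack bundled with the infinispan-embedded artifact is an assumption, not necessarily the author's exact file:

<infinispan>
	<jgroups>
		<!-- default UDP stack shipped inside the infinispan-embedded jar -->
		<stack-file name="udp" path="default-configs/default-jgroups-udp.xml"/>
	</jgroups>
	<cache-container default-cache="default">
		<transport stack="udp" node-name="${nodeName}"/>
		<local-cache name="default"/>
		<replicated-cache name="repl" mode="SYNC"/>
	</cache-container>
</infinispan>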

Starting the first node (here called A) produces the following output:

Received new cluster view for channel ISPN: [A-28854|0] (1) [A-28854]
Started cache repl on node A with members: [A-28854]

We can see that member A has joined the cluster. Now we start node B:

Received new cluster view for channel ISPN: [A-28854|1] (2) [A-28854, B-24196]
Started cache repl on node B with members: [A-28854, B-24196]

The cluster now clearly has two members: A and B. We also see that a rebalancing process starts once the second node comes up:

Starting cluster-wide rebalance for cache repl, topology CacheTopology{id=1, rebalanceId=1, currentCH=ReplicatedConsistentHash{ns = 60, owners = (1)[A-28854: 60]}, pendingCH=ReplicatedConsistentHash{ns = 60, owners = (2)[A-28854: 30, B-24196: 30]}, unionCH=null, actualMembers=[A-28854, B-24196]}
Finished cluster-wide rebalance for cache repl, topology id = 1

Now that both nodes are up and running, we can add some data to the cache:

// each node inserts 500,000 randomly generated persons under keys
// that are unique across the cluster thanks to the nodeName prefix
for (int i = 0; i < 500000; i++) {
	Person person = new Person(dataFactory.getNextFirstName(), dataFactory.getNextLastName());
	person.addAddress(new Address(dataFactory.getNextCity()));
	cache.put(nodeName + "-" + i, person);
}

The two classes Person and Address are simple POJOs, with the fields firstName and lastName for Person and city for Address. When both nodes put 500,000 persons each into the cache, we end up with a total of one million entries.
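
The post does not show the two classes; a minimal sketch could look like this. Since the entries are replicated to other nodes, the classes are assumed to be serializable:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class Person implements Serializable {
	public String firstName;
	public String lastName;
	// queried below via the nested attribute "addresses.city"
	public List<Address> addresses = new ArrayList<>();

	public Person(String firstName, String lastName) {
		this.firstName = firstName;
		this.lastName = lastName;
	}

	public void addAddress(Address address) {
		addresses.add(address);
	}
}

public class Address implements Serializable {
	public String city;

	public Address(String city) {
		this.city = city;
	}
}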

Now that the cache is filled with a respectable amount of data, it is interesting to see how queries against it perform. Queries are issued by obtaining a QueryFactory and using its fluent API:

QueryFactory queryFactory = Search.getQueryFactory(cache);
Query query = queryFactory.from(Person.class)
		.having("firstName").eq(dataFactory.getNextFirstName())
		.and()
		.having("lastName").eq(dataFactory.getNextLastName())
		.and()
		.having("addresses.city").eq(dataFactory.getNextCity())
		.toBuilder().build();
List<Object> list = query.list();

All we have to do is specify the base class of our query (here Person) and add predicates using having() and eq(). Multiple predicates are combined with a conjunction like and(), and we can even query nested attributes, as addresses.city in the example above shows.
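
The returned list contains the matching Person instances. A small usage sketch, assuming the public-field POJOs outlined above:

// each entry of the result list is one Person matching all three predicates
for (Object result : list) {
	Person person = (Person) result;
	System.out.println(person.firstName + " " + person.lastName);
}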

When I run this query on a replicated cache with 1M entries, I get an average execution time of about 543 ms. As all data resides in memory, this is not really surprising.

How does the query perform on a distributed cache? In a distributed cache the entries are spread over the cluster nodes, using a hash value to determine on which node to store each entry. A distributed cache is configured with the XML element of the same name:

<distributed-cache name="dist" owners="1" segments="2"/>

In this simple example we divide the hash key space into two segments, meaning that each key is stored in one of the two segments. Each hash segment is mapped to a list of nodes called owners. Here we use only one owner, as we are working with two nodes; hence each node manages one segment of the key space without any replicas.
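
Apart from the new cache definition, the only change needed in the Java code is to fetch the cache under its new name:

// obtain the distributed cache instead of the replicated one;
// the rest of the example stays the same
Cache<Object, Object> cache = cacheManager.getCache("dist");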

Now that the data is evenly distributed over the available nodes, the query takes less time to execute: 338 ms on average. The performance gain stems from the fact that both nodes now participate in the query and contribute their part of the result set.

Conclusion: With a few lines of code it is possible to set up an embedded in-memory key/value data store in your Java SE application that can even be clustered across different nodes. The internal DSL makes it easy to query even nested data structures in respectable time.


Apache Calcite: Setting up your own in-memory database with SQL interface

Some time ago we saw how to use Apache Drill to query data that resides in CSV and Parquet files. Apache Drill's SQL interface is provided by another Apache project called "Calcite". This project provides a SQL parser, a JDBC driver and a query optimizer that can be connected to different data stores via custom adapters.

In this article we investigate how to use the ReflectiveSchema factory to create an in-memory database with a SQL interface.

The schemas that the SQL parser should operate on can be specified either programmatically or by means of a JSON file. Such a JSON file can look like the following one:

{
  version: '1.0',
  defaultSchema: 'Persons',
  schemas: [
    {
      name: 'Persons',
      type: "custom",
      factory: "org.apache.calcite.adapter.java.ReflectiveSchema$Factory",
      operand: {
        class: "com.wordpress.martinsdeveloperworld.calcite.Schema",
        staticMethod: "getInstance"
      }
    }
  ]
}

The only schema we have specified in the file above is called "Persons" and is at the same time our default schema. The factory defined with the corresponding field has to implement a method that returns an instance of the Calcite class Schema. Here we choose the ReflectiveSchema that ships with Calcite and exposes the public fields of a Java object as tables. The class that creates this Java object is given in the operand's class field and has to provide a factory method that returns the object (here: getInstance).

The Schema class mentioned above looks in our example like this:

public class Schema {
	private static final Logger LOGGER = Logger.getLogger(Schema.class.getName());
	public Person[] persons;
	public Address[] addresses;

	public static Schema getInstance() {
		LOGGER.log(Level.INFO, "Creating schema...");
		DataFactory dataFactory = new DataFactory(0);
		int numberOfPersons = 10000000;
		Schema schema = new Schema();
		schema.persons = new Person[numberOfPersons];
		schema.addresses = new Address[numberOfPersons];
		for (int i = 0; i < numberOfPersons; i++) {
			Person person = dataFactory.getNextPerson(i);
			schema.persons[i] = person;
			schema.addresses[i] = dataFactory.getNextAddress(person);
		}
		LOGGER.log(Level.INFO, "Created schema.");
		return schema;
	}
}

The two public fields persons and addresses become the tables of our SQL schema. We initialize these two arrays with ten million persons and addresses, each person having exactly one address. The artificially generated id of the person is used as a foreign key in the addresses table:

public class Person {
	public long id;
	public String firstName;
	public String lastName;
}
public class Address {
	public long personId;
	public String city;
}

The DataFactory creates a new person and randomly assigns a first and a last name to each person as well as a city to each address. These values are taken from collections of the 200 most popular first and last names in the US and the 100 biggest cities.
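
The DataFactory itself is not shown in the post; a hypothetical sketch, with the name and city lists abbreviated, could look like this:

import java.util.Random;

public class DataFactory {
	// abbreviated here; the original draws from 200 names and 100 cities
	private static final String[] FIRST_NAMES = {"James", "Mary" /* ... */};
	private static final String[] LAST_NAMES = {"Smith", "Johnson" /* ... */};
	private static final String[] CITIES = {"New York", "Chicago" /* ... */};
	private final Random random;

	public DataFactory(long seed) {
		// seeded so that repeated runs produce the same data
		this.random = new Random(seed);
	}

	public String getNextLastName() {
		return LAST_NAMES[random.nextInt(LAST_NAMES.length)];
	}

	public Person getNextPerson(long id) {
		Person person = new Person();
		person.id = id;
		person.firstName = FIRST_NAMES[random.nextInt(FIRST_NAMES.length)];
		person.lastName = getNextLastName();
		return person;
	}

	public Address getNextAddress(Person person) {
		Address address = new Address();
		address.personId = person.id;
		address.city = CITIES[random.nextInt(CITIES.length)];
		return address;
	}
}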

Now that we have created the schema and populated the tables with ten million rows, we can start querying them. The code that loads the JDBC driver and establishes a connection to the data source looks like this:

Class.forName("org.apache.calcite.jdbc.Driver");
Properties info = new Properties();
info.setProperty("lex", "JAVA");
try (Connection connection = 
	DriverManager.getConnection("jdbc:calcite:model=target/classes/model.json", info)) {
	...
}

The JSON file, which is referred to as model in the JDBC URL, is the one shown at the beginning of this article. First we want to know how many people have the last name 'Smith':

String query = "select count(*) from persons p where p.lastName = 'Smith'";
try (Statement statement = connection.createStatement();
	ResultSet resultSet = statement.executeQuery(query)) {
	int count = 0;
	while (resultSet.next()) {
		count = resultSet.getInt(1);
	}
	LOGGER.log(Level.INFO, "Result has " + count + " rows.");
} catch (Exception e) {
	LOGGER.log(Level.SEVERE, "Query failed: " + e.getMessage(), e);
}

When we modify the code above such that the query is executed in a loop with randomly chosen last names from the collection, we can measure its average execution time. On my machine this yields about 105.3 ms over 100 iterations. Not bad!
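
The measurement loop is not part of the original listing; it could look roughly like the following, reusing the getNextLastName() helper sketched for the DataFactory above (the enclosing method is assumed to declare SQLException):

// run the query 100 times with random last names and average the wall-clock time
long totalMillis = 0;
int iterations = 100;
for (int i = 0; i < iterations; i++) {
	String query = "select count(*) from persons p where p.lastName = '"
			+ dataFactory.getNextLastName() + "'";
	long start = System.currentTimeMillis();
	try (Statement statement = connection.createStatement();
		ResultSet resultSet = statement.executeQuery(query)) {
		while (resultSet.next()) {
			resultSet.getInt(1);
		}
	}
	totalMillis += System.currentTimeMillis() - start;
}
LOGGER.log(Level.INFO, "Average execution time: " + (totalMillis / (double) iterations) + " ms");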

But we also want to know how many of these people live in Chicago:

String query = "select count(*) from persons p " +
	" inner join addresses a on a.personId = p.id " +
	" where a.city = 'Chicago' and p.lastName = 'Smith'";
...

Executed with different, randomly chosen last names and cities, this query runs in about 341.9 ms on average. For a join over two tables with ten million rows each, this is also not that bad.

PS: The code is available on my GitHub account.