Using JINQ with JPA and H2

A few days ago I read an interesting interview with Ming-Yee Iu about JINQ. JINQ is, as the name suggests, an attempt to provide something similar to LINQ for Java. The basic idea is to close the semantic gap between object-oriented code and the relational data model it queries. Queries against the relational model should integrate into the code so smoothly that they feel natural.

The research behind LINQ concluded that the algorithms transforming code into relational database queries work best on functional code. As Java 8 ships with the streams API, the author uses it to implement the ideas of his PhD thesis in Java.

To get our hands dirty, we start with a simple project that uses Hibernate over JPA together with an H2 database and JINQ:

<dependencies>
	<dependency>
		<groupId>javax</groupId>
		<artifactId>javaee-api</artifactId>
		<version>${jee.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>com.h2database</groupId>
		<artifactId>h2</artifactId>
		<version>${h2.version}</version>
	</dependency>
	<dependency>
		<groupId>org.hibernate</groupId>
		<artifactId>hibernate-entitymanager</artifactId>
		<version>${hibernate.version}</version>
	</dependency>
	<dependency>
		<groupId>org.jinq</groupId>
		<artifactId>jinq-jpa</artifactId>
		<version>1.8.10</version>
	</dependency>
</dependencies>

In order to use JINQ streams we have to create a provider that gets the EntityManagerFactory as argument:

EntityManagerFactory factory = Persistence.createEntityManagerFactory("PersistenceUnit");
JinqJPAStreamProvider streams = new JinqJPAStreamProvider(factory);
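
The entityManager used in the following snippets is an ordinary JPA EntityManager; it can, for example, be created from the same factory:

EntityManager entityManager = factory.createEntityManager();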

Having inserted some persons into our database, we can easily query them:

List<String> firstNames = streams.streamAll(entityManager, Person.class)
		.map(Person::getFirstName)
		.collect(toList());
firstNames.forEach(System.out::println);

The method streamAll() of the previously created JinqJPAStreamProvider gives us access to all persons within the database. In this simple example we only want to output the first name of each person; hence we map the stream to the first names and collect the results into a List. This list is then printed using forEach() and a method reference to println().

Taking a look at the generated SQL code, we see that all columns are selected:

select
	person0_.id as id1_4_,
	person0_.FIRST_NAME as FIRST_NA2_4_,
	person0_.ID_CARD_ID as ID_CARD_4_4_,
	person0_.LAST_NAME as LAST_NAM3_4_
from
	T_PERSON person0_ 
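
For reference, the Person entity behind these queries could look roughly like the following sketch; the table and column names are inferred from the generated SQL, so the real mapping may differ slightly:

import javax.persistence.*;

@Entity
@Table(name = "T_PERSON")
public class Person {

	@Id
	@GeneratedValue
	private Long id;

	@Column(name = "FIRST_NAME")
	private String firstName;

	@Column(name = "LAST_NAME")
	private String lastName;

	// The ID_CARD_ID column in the SQL above suggests a relation to an IdCard entity,
	// which is omitted here to keep the sketch short.

	public String getFirstName() {
		return firstName;
	}

	public String getLastName() {
		return lastName;
	}
}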

Of course we can refine the statement using the select() method:

List<String> firstNames = streams.streamAll(entityManager, Person.class)
		.select(Person::getFirstName)
		.where(p -> p.equals("Homer"))
		.collect(toList());
firstNames.forEach(System.out::println);

Additionally, we have added a predicate (where firstName = 'Homer'):

    select
        person0_.FIRST_NAME as FIRST_NA2_4_
    from
        T_PERSON person0_ 
    where
        person0_.FIRST_NAME='Homer'

Leaving this simple example, we now want to create a query that selects all geeks with first name “Christian” that work in a time and material project:

List<String> geeks = streams.streamAll(entityManager, Project.class)
		.where(p -> p.getProjectType() == Project.ProjectType.TIME_AND_MATERIAL)
		.joinList(Project::getGeeks)
		.where(g -> g.getTwo().getFirstName().equals("Christian"))
		.map(p -> p.getTwo().getFirstName())
		.collect(toList());
geeks.forEach(System.out::println);

As can be seen from the code above, we use the first where() clause to select all time and material projects. The joinList() invocation joins the geek table, while the subsequent where() clause restricts the result to geeks with the first name “Christian”. Et voilà, this is the generated SQL query:

select
	geek2_.FIRST_NAME as col_0_0_ 
from
	T_PROJECT project0_ 
inner join
	T_GEEK_PROJECT geeks1_ 
		on project0_.id=geeks1_.PROJECT_ID 
inner join
	T_GEEK geek2_ 
		on geeks1_.GEEK_ID=geek2_.id 
where
	project0_.projectType='TIME_AND_MATERIAL' 
	and geek2_.FIRST_NAME='Christian' limit ?
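
For reference, the two entities behind this query could be mapped roughly as follows (each class in its own file); the names are inferred from the query and the generated SQL, so this sketch is not necessarily the complete mapping:

import java.util.ArrayList;
import java.util.List;
import javax.persistence.*;

@Entity
@Table(name = "T_PROJECT")
public class Project {

	public enum ProjectType { TIME_AND_MATERIAL } // further values omitted

	@Id
	@GeneratedValue
	private Long id;

	@Enumerated(EnumType.STRING)
	private ProjectType projectType;

	@ManyToMany
	@JoinTable(name = "T_GEEK_PROJECT",
		joinColumns = @JoinColumn(name = "PROJECT_ID"),
		inverseJoinColumns = @JoinColumn(name = "GEEK_ID"))
	private List<Geek> geeks = new ArrayList<>();

	public ProjectType getProjectType() {
		return projectType;
	}

	public List<Geek> getGeeks() {
		return geeks;
	}
}

@Entity
@Table(name = "T_GEEK")
public class Geek {

	@Id
	@GeneratedValue
	private Long id;

	@Column(name = "FIRST_NAME")
	private String firstName;

	public String getFirstName() {
		return firstName;
	}
}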

Conclusion: Having worked with JPA’s criteria API some time ago, I must say that the first steps with JINQ are more intuitive and easier to write down. JINQ really helps to close the gap between the object-oriented and the relational world by using Java 8 streams.

Verify HTML documents in junit tests with jsoup

Assume that you are developing an application that creates some kind of fancy HTML report for its users. When it comes down to writing your unit tests, you have two choices:

  • You test the generated report against a complete report prepared beforehand.
  • You parse the HTML document and test parts of it separately.

The first choice seems to be simple at first glance, because you have manually validated that the prepared report is correct. Writing this kind of test is also easy, as it boils down to the following pattern:

String preparedReport = loadReportFromSomeWhere();
assertThat(generatedReport, is(preparedReport));

But what happens when you change a small part of the report-generating code? You will probably have to change some or even all of the prepared reports. Hence, in these cases, the second choice is the better one, as you only have to adjust the test cases that are actually affected (and that you would have to change anyhow).

Here is where jsoup comes in handy. jsoup is a Java library for parsing HTML documents, and in contrast to other options for parsing XML-like structures it supports CSS selectors similar to those used in JavaScript libraries like jQuery. This way you don’t have to write tons of code to verify exactly the part of the report that your current unit test is concerned with.

To demonstrate how jsoup can be used, we assume that our application has a simple HtmlReport class that can be used to create a valid HTML document using the builder pattern (https://en.wikipedia.org/wiki/Builder_pattern):

String html = HtmlReport.create()
	.addHeader1("title", "Testing HTML documents with jsoup")
	.addSection("intro", "This section explains what the text is all about.")
	.addHeader2("jsoup", "jsoup in a nutshell")
	.addSection("pjsopu", "This section explains jsoup in detail.")
	.addList("jsoup_adv", Arrays.asList("find data using CSS selectors", "manipulate HTML elements"))
	.build();

To keep it simple, the report just consists of a header element (h1) followed by a section (p), and a second part with an h2 header that contains an HTML list (ul). The first argument to each method is the id of the HTML element. This way we can later address exactly the element we want and, beyond that, support the formatting of all elements (the CSS designer will love us).
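
The HtmlReport class itself is not part of any library; a minimal sketch of how such a builder could be implemented (greatly simplified, without any HTML escaping) might look like this:

import java.util.List;

public class HtmlReport {

	private final StringBuilder body = new StringBuilder();

	public static HtmlReport create() {
		return new HtmlReport();
	}

	public HtmlReport addHeader1(String id, String text) {
		body.append("<h1 id=\"").append(id).append("\">").append(text).append("</h1>");
		return this;
	}

	public HtmlReport addHeader2(String id, String text) {
		body.append("<h2 id=\"").append(id).append("\">").append(text).append("</h2>");
		return this;
	}

	public HtmlReport addSection(String id, String text) {
		body.append("<p id=\"").append(id).append("\">").append(text).append("</p>");
		return this;
	}

	public HtmlReport addList(String id, List<String> items) {
		body.append("<ul id=\"").append(id).append("\">");
		for (String item : items) {
			body.append("<li>").append(item).append("</li>");
		}
		body.append("</ul>");
		return this;
	}

	public String build() {
		return "<html><head><title>Report</title></head><body>" + body + "</body></html>";
	}
}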

The first thing we want to test is that the document contains an h1 element with the id “title”:

<h1 id="title">Testing HTML documents with jsoup</h1>

Using jsoup, this verification becomes a two-liner:

Document doc = Jsoup.parse(html);
assertThat(doc.select("h1#title").text(), is("Testing HTML documents with jsoup"));

In the first line we let jsoup parse the document; then we use the provided method select() to query for the element using the selector h1#title, i.e. we are asking for an h1 element with the id title. In the same way we can verify that we have a paragraph with the correct content:

assertThat(doc.select("p#intro").text(), is("This section explains what the text is all about."));

A little bit more tricky is to verify that the list with the id jsoup_adv is written in the correct order. For that we have to use the pseudo-selector :eq(n), which allows us to query for a sibling at a specific index position:

assertThat(doc.select("ul#jsoup_adv > li:eq(0)").text(), is("find data using CSS selectors"));
assertThat(doc.select("ul#jsoup_adv > li:eq(1)").text(), is("manipulate HTML elements"));

The selector ul#jsoup_adv > li:eq(0) asks for the first (:eq(0)) li element that is a direct child of an ul element with the id jsoup_adv.

Beyond that, one can even use regular expressions to find, for example, all h2 elements whose text ends with the string “nutshell”:

assertThat(doc.select("h2:matches(.*nutshell$)").size(), is(1));

Conclusion: Using jsoup for parsing HTML documents in junit tests makes the verification of HTML documents much easier and more robust. If you are used to and like CSS selectors as they are used in jQuery, then jsoup is worth a look.

Using infinispan as embedded and clustered in-memory store for your Java SE application

infinispan is a distributed in-memory key/value data store with the option to query the inserted data using an internal DSL. In the last article we saw how to query in-memory data structures using Apache Calcite, so in this article we explore how to do the same with infinispan.

The Maven dependencies we need for our small example project are listed below:

<properties>
	<infinispan.version>7.2.5.Final</infinispan.version>
</properties>

<dependencies>
	<dependency>
		<groupId>org.infinispan</groupId>
		<artifactId>infinispan-embedded</artifactId>
		<version>${infinispan.version}</version>
	</dependency>
	<dependency>
		<groupId>org.infinispan</groupId>
		<artifactId>infinispan-embedded-query</artifactId>
		<version>${infinispan.version}</version>
	</dependency>
</dependencies>

The embedded distribution of infinispan lets us integrate the in-memory data store into our standard Java SE application. Please note that you also need the embedded version of the query module, as there is also a standard query module named infinispan-query available. Using the standard module will lead to class loading issues at runtime.

The cache is defined within the configuration file called infinispan.xml, whose name is passed into the constructor of the DefaultCacheManager:

private Cache<Object, Object> createCache() throws IOException {
	System.setProperty("nodeName", nodeName);
	EmbeddedCacheManager cacheManager = new DefaultCacheManager("infinispan.xml");
	Cache<Object, Object> cache = cacheManager.getCache("repl");
	LOGGER.info(String.format("Started cache %s on node %s with members: %s", "dist", nodeName, cache.getAdvancedCache().getRpcManager().getMembers()));
	return cache;
}

In the example code above we choose to use a cache named repl. As the name indicates, this is a replicated cache in which all values stored into one instance are replicated to all other available instances. Whether this replication happens synchronously or asynchronously can be configured, as shown in the following snippet from infinispan.xml:

<cache-container default-cache="default">
	<transport stack="udp" node-name="${nodeName}" />
	<replicated-cache name="repl" mode="SYNC" />
</cache-container>

infinispan uses the well-known jgroups library for the communication between the cluster nodes. The node name and the protocol stack (here: UDP) are configured using the XML element transport.

Starting the first node (here called A) produces the following output:

Received new cluster view for channel ISPN: [A-28854|0] (1) [A-28854]
Started cache dist on node A with members: [A-28854]

We can see that member A has joined the cluster. Now we start node B:

Received new cluster view for channel ISPN: [A-28854|1] (2) [A-28854, B-24196]
Started cache dist on node B with members: [A-28854, B-24196]

Clearly the cluster now has two members: A and B. We also see that a rebalancing process starts once the second node comes up:

Starting cluster-wide rebalance for cache repl, topology CacheTopology{id=1, rebalanceId=1, currentCH=ReplicatedConsistentHash{ns = 60, owners = (1)[A-28854: 60]}, pendingCH=ReplicatedConsistentHash{ns = 60, owners = (2)[A-28854: 30, B-24196: 30]}, unionCH=null, actualMembers=[A-28854, B-24196]}
Finished cluster-wide rebalance for cache repl, topology id = 1

Now that both nodes are up and running, we can add some data to the cache:

for (int i = 0; i < 500000; i++) {
	Person person = new Person(dataFactory.getNextFirstName(), dataFactory.getNextLastName());
	person.addAddress(new Address(dataFactory.getNextCity()));
	cache.put(nodeName + "-" + i, person);
}

The two classes Person and Address are simple POJOs with the fields firstName and lastName for Person and city for Address. When both nodes put 500,000 persons each into the cache, we end up with a total of 1M entries.
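
For completeness, the two value classes could look like the following sketch (each in its own file); they implement Serializable here so that the entries can be marshalled to the other node, and depending on the query/indexing configuration the real classes may need additional annotations:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class Person implements Serializable {

	private String firstName;
	private String lastName;
	private List<Address> addresses = new ArrayList<>();

	public Person(String firstName, String lastName) {
		this.firstName = firstName;
		this.lastName = lastName;
	}

	public void addAddress(Address address) {
		addresses.add(address);
	}

	public String getFirstName() {
		return firstName;
	}

	public String getLastName() {
		return lastName;
	}

	public List<Address> getAddresses() {
		return addresses;
	}
}

public class Address implements Serializable {

	private String city;

	public Address(String city) {
		this.city = city;
	}

	public String getCity() {
		return city;
	}
}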

Now that the cache is filled with a respectable amount of data, it would be interesting to see how it performs. This can be done by obtaining a QueryFactory and by using its fluent API:

QueryFactory queryFactory = Search.getQueryFactory(cache);
Query query = queryFactory.from(Person.class).
		having("firstName").eq(dataFactory.getNextFirstName()).
		and().
		having("lastName").eq(dataFactory.getNextLastName()).
		and().
		having("addresses.city").eq(dataFactory.getNextCity()).
		toBuilder().build();
List<Object> list = query.list();

All we have to do is to specify the base class of our query (here Person) and add the predicates using having() and eq(). Different predicates can be joined by using a conjunction like and(). We can even use nested attributes like in the example above with addresses.city.

When I run this query on a replicated cache with 1M entries, I get an average execution time of about 543 ms. But as all data resides in memory, this is not really surprising.

How does the query perform on a distributed cache? Within a distributed cache all entries are spread over the cluster nodes using a hash value to determine on which node to store an entry. A distributed cache can be configured with the XML element of the same name:

<distributed-cache name="dist" owners="1" segments="2"/>

In this simple example we divide the hash key space into two segments, meaning that each key is stored in one of the two segments. Each hash segment is mapped to a list of nodes called owners. Here we use only one owner, as we are working with two nodes. Hence each node manages one segment of the key space without any replicas.

Now that the data is evenly distributed over the available nodes, the query takes less time to execute: 338 ms on average. The performance gain results, of course, from the fact that now both nodes participate in the query and contribute their part of the result set.

Conclusion: With a few lines of code it is possible to set up an embedded in-memory key/value data store in your Java SE application that can even be clustered over different nodes. The internal DSL makes it easy to query even nested data structures in respectable time.

Apache Calcite: Setting up your own in-memory database with SQL interface

Some time ago we saw how to use Apache Drill to query data that resides in CSV and parquet files. Apache Drill’s SQL interface is provided by another Apache project called “Calcite”. This project provides a SQL parser, a JDBC driver and a query optimizer that can be connected to different data stores via custom adapters.

In this article we are investigating how to use the ReflectiveSchema factory to create an in-memory database with SQL interface.

The schemas that the SQL parser should operate on can be specified either programmatically or by means of a JSON file. Such a JSON file can look like the following one:

{
  version: '1.0',
  defaultSchema: 'Persons',
  schemas: [
    {
      name: 'Persons',
      type: "custom",
      factory: "org.apache.calcite.adapter.java.ReflectiveSchema$Factory",
      operand: {
        class: "com.wordpress.martinsdeveloperworld.calcite.Schema",
        staticMethod: "getInstance"
      }
    }
  ]
}

The only schema we have specified in the file above is called “Persons” and is at the same time our default schema. The factory defined by the corresponding field has to implement a method that returns an instance of the Calcite class Schema. Here we choose the ReflectiveSchema that ships with Calcite and exposes the public fields of a Java object as tables. The class that provides this Java object is given through the operand field class and has to offer a factory method that returns the object (here: getInstance).

The Schema class mentioned above looks in our example like this:

public class Schema {
	private static final Logger LOGGER = Logger.getLogger(Schema.class.getName());
	public Person[] persons;
	public Address[] addresses;

	public static Schema getInstance() {
		LOGGER.log(Level.INFO, "Creating schema...");
		DataFactory dataFactory = new DataFactory(0);
		int numberOfPersons = 10000000;
		Schema schema = new Schema();
		schema.persons = new Person[numberOfPersons];
		schema.addresses = new Address[numberOfPersons];
		for (int i = 0; i < numberOfPersons; i++) {
			Person person = dataFactory.getNextPerson(i);
			schema.persons[i] = person;
			schema.addresses[i] = dataFactory.getNextAddress(person);
		}
		LOGGER.log(Level.INFO, "Created schema.");
		return schema;
	}
}

The two public fields persons and addresses will become the tables of our SQL schema. We initialize these two arrays with ten million persons and addresses, one person having exactly one address. The artificially generated id of the person is used as foreign key in the addresses table:

public class Person {
	public long id;
	public String firstName;
	public String lastName;
}
public class Address {
	public long personId;
	public String city;
}

The DataFactory creates a new person and randomly assigns a first and last name for each person as well as a city for each address. These values are taken from a collection of the most popular 200 first and last names in the US and the 100 biggest cities.
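
The DataFactory is just a small helper class; a minimal sketch of how it could be implemented (with a much smaller name collection than described above, purely for illustration) looks like this:

import java.util.Random;

public class DataFactory {

	// Sample values only; the real implementation uses the larger collections mentioned above
	private static final String[] FIRST_NAMES = {"JAMES", "MARY", "DAVID", "LINDA"};
	private static final String[] LAST_NAMES = {"SMITH", "JOHNSON", "MILLER", "DAVIS"};
	private static final String[] CITIES = {"New York", "Chicago", "Houston", "Phoenix"};

	private final Random random;

	public DataFactory(long seed) {
		this.random = new Random(seed);
	}

	public String getNextFirstName() {
		return FIRST_NAMES[random.nextInt(FIRST_NAMES.length)];
	}

	public String getNextLastName() {
		return LAST_NAMES[random.nextInt(LAST_NAMES.length)];
	}

	public String getNextCity() {
		return CITIES[random.nextInt(CITIES.length)];
	}

	public Person getNextPerson(long id) {
		Person person = new Person();
		person.id = id;
		person.firstName = getNextFirstName();
		person.lastName = getNextLastName();
		return person;
	}

	public Address getNextAddress(Person person) {
		Address address = new Address();
		address.personId = person.id;
		address.city = getNextCity();
		return address;
	}
}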

Now that we have created the schema and populated the tables with ten million rows, we can start to query them. The code to load the JDBC driver and to establish a connection to the data source looks like this one:

Class.forName("org.apache.calcite.jdbc.Driver");
Properties info = new Properties();
info.setProperty("lex", "JAVA");
try (Connection connection = 
	DriverManager.getConnection("jdbc:calcite:model=target/classes/model.json", info)) {
	...
}

The JSON file, which is referred to as model within the JDBC URL, is the one shown at the beginning of this article. First we want to know how many people have the last name ‘Smith’:

String query = "select count(*) from persons p where p.lastName = 'Smith'";
try (Statement statement = connection.createStatement();
	ResultSet resultSet = statement.executeQuery(query)) {
	int count = 0;
	while (resultSet.next()) {
		count = resultSet.getInt(1);
	}
	LOGGER.log(Level.INFO, "Result has " + count + " rows.");
} catch (Exception e) {
	LOGGER.log(Level.SEVERE, "Query failed: " + e.getMessage(), e);
}

When we modify the code above such that the query is executed in a loop with randomly chosen last names from the collection, we can measure its average execution time. On my machine this yields about 105.3 ms over 100 iterations. Not bad!
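
Such a measurement loop is quickly written; the following sketch shows one way to do it, given the connection from above (the last names used here are just placeholders for the collection mentioned before):

private static double averageQueryTimeMs(Connection connection, int iterations) throws SQLException {
	String[] lastNames = {"SMITH", "JOHNSON", "MILLER", "DAVIS"}; // placeholder values
	Random random = new Random();
	long start = System.nanoTime();
	for (int i = 0; i < iterations; i++) {
		String lastName = lastNames[random.nextInt(lastNames.length)];
		String query = "select count(*) from persons p where p.lastName = '" + lastName + "'";
		try (Statement statement = connection.createStatement();
			ResultSet resultSet = statement.executeQuery(query)) {
			while (resultSet.next()) {
				resultSet.getInt(1); // consume the result
			}
		}
	}
	return (System.nanoTime() - start) / 1_000_000.0 / iterations;
}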

But we also want to know how many of these people live in Chicago:

String query = "select count(*) from persons p " +
	" inner join addresses a on a.personId = p.id " +
	" where a.city = 'Chicago' and p.lastName = 'Smith'";
...

Executed with different, randomly chosen last names and cities, this query runs on average in about 341.9 ms. For a join query on two tables with ten million rows each, this is also not bad.

PS: The code is available on my github account.

Evaluating performance measurements with Apache’s commons-math3

Do you remember the term “Student’s t-test” from your statistics lessons? And do you apply its idea when you do performance measurements in your day-to-day work?

William Sealy Gosset was a chemist working at the Guinness brewery in Dublin, where he had been recruited because he was one of the best graduates at Oxford. The brewery’s idea was to use scientific knowledge to optimize its industrial processes. During his work at Guinness, Gosset developed a way to test hypotheses like “The means of these two populations are equal.”. But because publishing scientific results gathered at work was not allowed at Guinness, he published his work under the pseudonym “Student”. That is why we all know this kind of hypothesis testing as “Student’s t-test”.

When we measure the performance of two different algorithms on the same hardware, we cannot simply compare the resulting mean values in order to conclude whether one of them is faster. According to “Student’s t-test”, we have to formulate a “null hypothesis”, which in this example could be: “There is no effective difference between the sample means of the two observations”. The next step is to compute the so-called t value. For this computation we assume that both series of samples are independent, i.e. the observations in the first series are in no way related to the observations in the second series, and that the values follow a normal distribution. As we do not know whether both series have the same variance, we must use the so-called “heteroscedastic t-test” with the following formula:

t = (x - y) / sqrt( Sx^2 / n1 + Sy^2 / n2 )

x: mean of the first series
y: mean of the second series
Sx: standard deviation of the first series
Sy: standard deviation of the second series
n1: number of samples in the first series
n2: number of samples in the second series
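
Translated directly into code, the formula could be computed like this, for example with the Mean and StandardDeviation classes of the commons-math3 library introduced below (StandardDeviation computes the sample standard deviation by default):

private static double tValue(double[] x, double[] y) {
	Mean mean = new Mean();
	StandardDeviation standardDeviation = new StandardDeviation(); // sample standard deviation
	double sx = standardDeviation.evaluate(x);
	double sy = standardDeviation.evaluate(y);
	return (mean.evaluate(x) - mean.evaluate(y))
			/ Math.sqrt(sx * sx / x.length + sy * sy / y.length);
}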

Let’s assume we have measured the following data:

X Y
154.3 230.4
191.0 202.8
163.4 202.8
168.6 216.8
187.0 192.9
200.4 194.4
162.5 211.7

To compute the t value we can utilize Apache’s “commons-math3” library:

<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-math3</artifactId>
	<version>3.5</version>
</dependency>

As the commons-math3 library already ships with a TTest class, we can easily implement the t-test:

double[] observations1 = readObservations(args[0]);
double[] observations2 = readObservations(args[1]);
Mean mean = new Mean();
double mean1 = mean.evaluate(observations1);
double mean2 = mean.evaluate(observations2);
TTest tTest = new TTest();
double significanceLevel1 = tTest.tTest(observations1, observations2);
double significanceLevel2 = tTest.homoscedasticTTest(observations1, observations2);
System.out.println(String.format("Mean1: %.10f", mean1));
System.out.println(String.format("Mean2: %.10f", mean2));
System.out.println(String.format("heteroscedastic t-Test: %.10f", 
	(1 - significanceLevel1) * 100));
System.out.println(String.format("homoscedastic t-Test:   %.10f", 
	(1 - significanceLevel2) * 100));

The example code above also computes the so-called “homoscedastic” t-test, which assumes that the two samples are drawn from subpopulations with equal variances. The two methods from the commons library compute the smallest “significance level” at which one can reject the null hypothesis that the two means are equal. The “confidence level”, which is easier to understand, can be computed by subtracting the “significance level” from 1. As the result is a probability, we can multiply it by 100 in order to express it as a percentage:

Mean1: 175,3142857143
Mean2: 207,4000000000
heteroscedastic t-Test: 99,7562734547
homoscedastic t-Test:   99,7838832707

This means that we can reject the statement that the mean values of both sample series are equal with a probability of 99.8%. Or, the other way round: the probability that both series have the same mean value is only 0.2%. Hence the two measurements are very likely different. But the result is not always as clear as in this case. Let’s take a look at these two series:

X Y
154.3 155.3
191.0 163.7
163.4 200.1
168.6 177.5
187.0 188.3
200.4 198.7
162.5 201.7

The output here is:

Mean1: 175,3142857143
Mean2: 183,6142857143
heteroscedastic t-Test: 59,4632442225
homoscedastic t-Test:   59,4717945546

At first glance the second series of sample values seems to perform much slower. But the probability with which we can reject the null hypothesis that both means are equal is only 59.5%. In other words: the probability that both series have the same mean value is still about 40.5%.

Converting a CSV file to parquet and querying it with Apache Drill

The Apache Drill project provides SQL-like access to different kinds of data stores. The supported data stores span relational as well as NoSQL databases and the file system. Hence you can query data from HBase, MongoDB, HDFS and the local file system without having to convert the data beforehand. Even joins between the different formats are possible. Internally, Apache Drill prepares the potentially nested data (from, for example, JSON files) in a columnar representation as described in Google’s Dremel paper. This columnar data structure allows queries that select only a subset of the available columns to perform much faster, as only the selected columns have to be read. In contrast to traditional relational databases, the whole row of data does not have to be loaded from disc.

The data structure described in Google’s Dremel paper is also available as a file format called parquet, which allows you to store and retrieve data from a columnar storage. If you plan to execute multiple queries on a big data set, it can be reasonable to convert the CSV file to the parquet format and query it using Apache Drill. In this article we therefore explore how to convert a CSV file into a parquet file using Apache’s parquet library:

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>parquet-column</artifactId>
	<version>1.6.0</version>
</dependency>
<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>parquet-hadoop</artifactId>
	<version>1.6.0</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-common</artifactId>
	<version>2.4.0</version>
</dependency>

In order to tell Apache Parquet the structure of the CSV file, we have to create an instance of MessageType and pass in a message definition written in Google’s Protocol Buffer (https://developers.google.com/protocol-buffers/). As our CSV file contains first name, last name, date of birth and place of birth for a large number of persons, the Protocol Buffer message looks like this:

message csv {
    required binary firstName = 1; 
    required binary lastName = 2; 
    required binary dateOfBirth = 3; 
    required binary placeOfBirth = 4;
}

All fields are required and of type binary. This message definition is then stored as the first line of our CSV file, such that it can be read directly from it.

The code in the main() method of our sample application looks like this:

String firstLine = Files.readFirstLine(options.getCsvPath().toFile(), 
	Charset.defaultCharset());
MessageType messageType = MessageTypeParser.parseMessageType(firstLine);
WriteSupport<List<String>> writeSupport = new CsvWriteSupport(messageType);
String line;
try (CsvParquetWriter writer = new CsvParquetWriter(path, writeSupport);
	BufferedReader br = new BufferedReader(new FileReader(options.getCsvPath().toFile()))) {
	while ((line = br.readLine()) != null) {
		String[] fields = line.split(Pattern.quote(SEPARATOR));
		writer.write(Arrays.asList(fields));
	}
}

The first two lines create an instance of MessageType using the first line of the CSV file. This instance of MessageType is then passed into the constructor of our CsvWriteSupport class:

public class CsvWriteSupport extends WriteSupport<List<String>> {
	private final MessageType messageType;
	private RecordConsumer recordConsumer;

	public CsvWriteSupport(MessageType messageType) {
		this.messageType = messageType;
	}

	@Override
	public WriteSupport.WriteContext init(Configuration configuration) {
		return new WriteSupport.WriteContext(messageType, new HashMap<String, String>());
	}

	@Override
	public void prepareForWrite(RecordConsumer recordConsumer) {
		this.recordConsumer = recordConsumer;
	}

	@Override
	public void write(List<String> record) {
		recordConsumer.startMessage();
		List<ColumnDescriptor> columns = messageType.getColumns();
		for (int i = 0; i < columns.size(); i++) {
			String value = record.get(i);
			if (value.length() > 0) {
				recordConsumer.startField(columns.get(i).getPath()[0], i);
				switch (columns.get(i).getType()) {
					...
					case BINARY:
						recordConsumer.addBinary(Binary.fromByteArray(value.getBytes(Charset.defaultCharset())));
						break;
					default:
						throw new ParquetEncodingException("Unsupported column type: " + columns.get(i).getType());
				}
				recordConsumer.endField(columns.get(i).getPath()[0], i);
			}
		}
		recordConsumer.endMessage();
	}
}

While the implementation of the two methods init() and prepareForWrite() is simple, the core logic resides in write(). Here the CsvWriteSupport tells parquet to start a new message and then adds the individual fields. The switch statement has been shortened to focus on the type binary: the string read from the CSV file is converted into a byte array using the default charset of the platform. Note that write() gets called within the while loop that iterates over the lines of the CSV file.
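
The CsvParquetWriter used in the main() method is not part of the parquet library but a thin wrapper around ParquetWriter; it could look roughly like this (the compression codec and buffer sizes are just assumed defaults):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.Path;

import parquet.hadoop.ParquetWriter;
import parquet.hadoop.api.WriteSupport;
import parquet.hadoop.metadata.CompressionCodecName;

public class CsvParquetWriter extends ParquetWriter<List<String>> {

	public CsvParquetWriter(Path path, WriteSupport<List<String>> writeSupport) throws IOException {
		// no compression, default block and page size
		super(path, writeSupport, CompressionCodecName.UNCOMPRESSED, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
	}
}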

After having compiled the application into a jar file that contains all dependencies, we can start it with the following command and pass the existing CSV file and the name of the output file on the command line:

java -Djava.library.path=/usr/lib/hadoop/lib/native/Linux-amd64-64:/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native/Linux-amd64-64:/usr/lib/hadoop/lib/native -classpath "$HADOOP_JAR_DIRS:csv-to-parquet-0.0.1-SNAPSHOT-jar-with-dependencies.jar" com.wordpress.mdw.Main <csv> <parquet>

The first observation is that the parquet file (here test.parquet) is much smaller than the corresponding CSV file:

417M test.csv
35M  test.parquet

The compression ratio of about 92% stems from the fact that parquet supports very efficient compression and encoding schemes. As the data is stored in a columnar fashion, the compression algorithms can exploit the fact that one column contains similar data. Even gzip compression of the CSV file only reaches a compaction ratio of about 83%.

As a final step we want to query the number of records/lines in both files with Apache Drill and compare the execution times:

0: jdbc:drill:> select count(*) from dfs.`/tmp/test_drill_tab.csv`;
+-----------+
|  EXPR$0   |
+-----------+
| 10000001  |
+-----------+
1 row selected (5.771 seconds)
0: jdbc:drill:> select count(*) from dfs.`/tmp/test.parquet`;
+-----------+
|  EXPR$0   |
+-----------+
| 10000001  |
+-----------+
1 row selected (0.257 seconds)

From the explanations above it is clear that the first query has to read the complete file, whereas the second query can concentrate on one column. Beyond that, the parquet implementation also stores the number of values in each page header (a column is divided into multiple chunks/pages). The same is true when we ask Drill to count the entries where the first name is ‘DAVID’:

0: jdbc:drill:> select count(firstName) from dfs.`/tmp/test.parquet` where firstName = 'DAVID';
+---------+
| EXPR$0  |
+---------+
| 999190  |
+---------+
1 row selected (2.272 seconds)
0: jdbc:drill:> select count(columns[0]) from dfs.`/tmp/test_drill_tab.csv` where columns[0] = 'DAVID';
+---------+
| EXPR$0  |
+---------+
+---------+
No rows selected (6.418 seconds)

The answer for the parquet file comes after about 2 seconds, while the query running on the CSV file takes about 4 seconds longer. It gets even worse when querying two columns:

0: jdbc:drill:> select count(firstName) from dfs.`/tmp/test.parquet` where firstName = 'DAVID' and lastName = 'MILLER';
+---------+
| EXPR$0  |
+---------+
| 110813  |
+---------+
1 row selected (5.838 seconds)
0: jdbc:drill:> select count(columns[0]) from dfs.`/tmp/test_drill_tab.csv` where columns[0] = 'DAVID' and columns[1] = 'MILLER';
+---------+
| EXPR$0  |
+---------+
+---------+
No rows selected (29.57 seconds)

Now the parquet query only takes about 19.7% of the CSV query’s time. Finally, please note that we do not have any kind of indexes as in a traditional RDBMS. The idea of the Dremel paper is to always perform a “full” scan of the complete column. But querying 10 million records that are stored in a compressed format within 5 seconds is still not bad.

Conclusion: Storing data in the parquet file format not only saves disc space (compression ratio of about 92%) but also reduces query times by a factor of three to five.

PS: The source code is available at github.

Updating code at runtime (spring-loaded demystified)

When the development cycle from compilation over deployment to testing takes too long, one wishes to be able to replace the running code just in time, without having to restart the application server and wait until deployment has finished. Commercial solutions like JRebel or open source frameworks like Grails help in such situations.

Replacing code at runtime is not supported by the JVM out of the box in the way you can, for example, dynamically load classes with Class.forName(). Basically you have the following options:

  • HotSwap: A technology introduced with Java 1.4 that allows you to redefine classes within a debugger session. This approach is very limited, as it only allows you to change the body of a method but not to add new methods or classes.
  • OSGi: This technology allows you to define bundles. At runtime a bundle can be replaced by a newer version of this bundle.
  • Throwaway classloaders: By wrapping a separate classloader around all classes of your module, you can throw the classloader away and replace it once a new version of your module is available.
  • Instrumenting classes with a Java Agent: A Java Agent can instrument classes before they are defined. This way it can inject code into loaded classes that connects each class with one version of its class file. Once a new version is available, the new code gets executed (a minimal agent skeleton is sketched below).
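
To illustrate the last approach, here is a minimal, hypothetical agent skeleton based on the java.lang.instrument API; spring-loaded’s real agent is of course far more elaborate:

import java.lang.instrument.ClassFileTransformer;
import java.lang.instrument.Instrumentation;
import java.security.ProtectionDomain;

public class ReloadAgent {

	// Invoked by the JVM before main() when started with -javaagent:agent.jar;
	// the jar's manifest must declare this class as Premain-Class.
	public static void premain(String agentArgs, Instrumentation instrumentation) {
		instrumentation.addTransformer(new ClassFileTransformer() {
			@Override
			public byte[] transform(ClassLoader loader, String className, Class<?> classBeingRedefined,
					ProtectionDomain protectionDomain, byte[] classfileBuffer) {
				// A real agent rewrites the bytecode here, e.g. to insert the
				// "has this type changed?" check that we will see later in this article.
				return null; // null tells the JVM to keep the class unchanged
			}
		});
	}
}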

The technology behind Grails is called spring-loaded and uses the “Java Agent” approach to instrument classes that are loaded from the file system instead of from a jar file. But how does this work under the hood?

To understand spring-loaded, we set up a small sample project that allows us to examine the technology in more detail. This project only consists of two classes: the Main class calls the print() method of the ToBeChanged class and sleeps for a while:

public static void main(String[] args) throws InterruptedException {
  while (true) {
    ToBeChanged toBeChanged = new ToBeChanged();
    toBeChanged.print();
    Thread.sleep(500);
  }
}

The print() method just prints out a version string, so that we can see when it has changed. Additionally we print out the stack trace in order to see how it changes over time:

public void print() {
  System.out.println("V1");
  StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
  for (StackTraceElement element : stackTrace) {
    System.out.println("\t" + element.getClassName() + "." 
      + element.getMethodName() + ":" + element.getLineNumber());
  }
}

When starting the application we have to provide the jar file that contains the Java Agent using the option -javaagent. As spring-loaded modifies the bytecode in a way that the verifier does not like, we have to disable bytecode verification by passing the option -noverify to the JVM. Finally we pass the folder that contains our class files with -cp and tell the JVM the class that contains the main() method:

java -javaagent:springloaded-1.2.4.BUILD-SNAPSHOT.jar 
  -noverify 
  -cp target/classes 
  com.martinsdeveloperworld.springloaded.Main

After updating the version in the class ToBeChanged from V1 to V2 and rebuilding the project with mvn package, we see the following output:

...
V1
        java.lang.Thread.getStackTrace:-1
        com.martinsdeveloperworld.springloaded.ToBeChanged.print:7
        com.martinsdeveloperworld.springloaded.Main.main:8
V2
        java.lang.Thread.getStackTrace:-1
        com.martinsdeveloperworld.springloaded.ToBeChanged$$EPBF0gVl.print:7
        com.martinsdeveloperworld.springloaded.ToBeChanged$$DPBF0gVl.print:-1
        com.martinsdeveloperworld.springloaded.ToBeChanged.print:-1
        com.martinsdeveloperworld.springloaded.Main.main:8
...

The stack trace of version V1 looks as expected: from Main.main() the method ToBeChanged.print() gets called. This differs for version V2. Here the method ToBeChanged.print() now calls the method ToBeChanged$$DPBF0gVl.print(). Please also note that the line number reported for ToBeChanged.print() has changed from 7 to -1, indicating that the line is not known.

The new line number -1 is a strong indication that the Java Agent has instrumented the method ToBeChanged.print() in a way that allows it to call the new method instead of executing the old code. To prove this assumption, I have added a few logging statements to the code of spring-loaded as well as a feature that dumps each instrumented class file to the local hard drive. This way we can inspect what the method ToBeChanged.print() looks like after instrumentation:

  0 getstatic #16 <com/martinsdeveloperworld/springloaded/ToBeChanged.r$type>
  3 ldc #72 <0>
  5 invokevirtual #85 <org/springsource/loaded/ReloadableType.changed>
  8 dup
  9 ifeq 42 (+33)
 12 iconst_1
 13 if_icmpeq 26 (+13)
 16 new #87 <java/lang/NoSuchMethodError>
 19 dup
 20 ldc #89 <com.martinsdeveloperworld.springloaded.ToBeChanged.print()V>
 22 invokespecial #92 <java/lang/NoSuchMethodError.<init>>
 25 athrow
 26 getstatic #16 <com/martinsdeveloperworld/springloaded/ToBeChanged.r$type>
 29 invokevirtual #56 <org/springsource/loaded/ReloadableType.fetchLatest>
 32 checkcast #58 <com/martinsdeveloperworld/springloaded/ToBeChanged__I>
 35 aload_0
 36 invokeinterface #94 <com/martinsdeveloperworld/springloaded/ToBeChanged__I.print> count 2
 41 return
 42 pop
 43 getstatic #100 <java/lang/System.out>
 46 ldc #102 <V1>
 48 invokevirtual #107 <java/io/PrintStream.println>
 51 invokestatic #113 <java/lang/Thread.currentThread>
 54 invokevirtual #117 <java/lang/Thread.getStackTrace>
 57 astore_1
...
152 return

The getstatic opcode retrieves the value of the new field r$type and pushes it onto the stack, while the following ldc pushes the constant 0. Then the method ReloadableType.changed() gets called on the object reference that was pushed before. As the name indicates, ReloadableType.changed() checks whether a new version of this type exists. It returns 0 if the method did not change and 1 if it has changed. The following opcode ifeq jumps to line 42 if the returned value was zero, i.e. the method has not changed. From line 42 on we see the original implementation, which I have shortened here a little bit.

If the value is 1, the if_icmpeq instruction jumps to line 26, where the static field r$type is read once again. This reference is used to invoke the method ReloadableType.fetchLatest() on it. The following checkcast instruction verifies that the returned reference is of type ToBeChanged__I. Here we stumble for the first time over this artificial interface that spring-loaded generates for each type. It reflects the methods the original class had when it was instrumented. Two lines later this interface is used to invoke the method print() on the reference that was returned by ReloadableType.fetchLatest().

This is not a reference to the new version of the class, but to a so-called dispatcher. The dispatcher implements the interface ToBeChanged__I and implements the method print() with the following instructions:

0 aload_1
1 invokestatic #21 <com/martinsdeveloperworld/springloaded/ToBeChanged$$EPBF0gVl.print>
4 return

The dynamically generated class ToBeChanged$$EPBF0gVl is the so-called executor and embodies the new version of the type. For each new version a new dispatcher and executor are created; only the interface remains the same. Once a new version is available, the interface method is invoked on the new dispatcher, which in the simplest case forwards to the new version of the code embodied in the executor. The reason why the interface method is not called directly on the executor is that spring-loaded can also handle cases in which methods are added in a new version of the class. As these methods do not exist in the old version, a generic method __execute() is added to the interface and the dispatcher. This dynamic method can then dispatch calls to new methods, as shown in the following instructions taken from the generated dispatcher:

 0 aload_3
 1 ldc #25 <newMethod()V>
 3 invokevirtual #31 <java/lang/String.equals>
 6 ifeq 18 (+12)
 9 aload_2
10 checkcast #33 <com/martinsdeveloperworld/springloaded/ToBeChanged>
13 invokestatic #36 <com/martinsdeveloperworld/springloaded/ToBeChanged$$EPBFaboY.newMethod>
16 aconst_null
17 areturn
18 aload_3
...
68 areturn

In this case I have added a new method called newMethod() to the class ToBeChanged. The beginning of the __execute() method checks whether the invoked descriptor matches the new method. If this is the case, it forwards the invocation to the new executor. In order to make this work, all invocations of the new method have to be rewritten to the __execute() method. This is also done via instrumentation of the original classes and also works for reflection.

Conclusion: spring-loaded demonstrates that it is possible to “replace” a class with a newer version at runtime. To achieve this, a series of Java technologies like the Java Agent and bytecode instrumentation are utilized. By taking a closer look at the implementation, one can learn a lot about the JVM and Java in general.
