Using Apache Zest to enforce application composition

Layers and modules are well-known concepts of application composition. But how do you enforce them? Besides the access modifiers for classes and methods, the language itself currently offers no further means. Java 9 will introduce the “Java Module System”, which adds further means to control the usage of packages from other “modules”. Beyond that, you can of course use the build system to manage dependencies and thereby configure which module can access which other module. Tools like “Structure 101” or “degraph” also help with this.

The problem becomes even more obvious when using dependency injection. In this case the injected object can stem from another module or layer, and the borders between them vanish. Apache Zest therefore provides means of enforcing application composition by letting you define which layers and modules classes belong to. The injection container then has fine-grained control over which module or layer injected objects may stem from.

Let’s take a closer look by implementing a small example application. The following dependencies are added to the project:

<dependencies>
	<dependency>
		<groupId>org.qi4j.core</groupId>
		<artifactId>org.qi4j.core.bootstrap</artifactId>
		<version>${zest.version}</version>
	</dependency>
	<dependency>
		<groupId>org.qi4j.core</groupId>
		<artifactId>org.qi4j.core.runtime</artifactId>
		<version>${zest.version}</version>
		<scope>runtime</scope>
	</dependency>
</dependencies>

Now we can construct the application using an anonymous ApplicationAssembler:

Energy4Java e4j = new Energy4Java();
final Application application = e4j.newApplication(new ApplicationAssembler() {
	@Override
	public ApplicationAssembly assemble(ApplicationAssemblyFactory factory)
		throws AssemblyException {

		ApplicationAssembly assembler = factory.newApplicationAssembly();

		LayerAssembly domainLayer = assembler.layer("domain");
		ModuleAssembly personModule = domainLayer.module("person");
		personModule.entities(PersonEntity.class)
			.visibleIn(Visibility.layer);
		personModule.addServices(PersonFactoryService.class)
			.visibleIn(Visibility.layer);

		LayerAssembly persistenceLayer = assembler.layer("persistence");
		ModuleAssembly fileStoreModule = persistenceLayer.module("storage");
		fileStoreModule.addServices(StorageService.class)
			.visibleIn(Visibility.application);

		domainLayer.uses(persistenceLayer);

		return assembler;
	}
});
application.activate();

First of all we create the “domain” layer. This layer is supposed to contain the module “person”. A module can contain entities and services that are added to it. For these entities and services we can specify their visibility. In our simple example the layer scope is enough.

Next to the “domain” layer we also introduce a “persistence” layer with a StorageService that is exposed to the whole application. This is necessary because we want to use it later on in the PersonFactoryService to store and load persons.
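
The two service interfaces themselves are not listed in this article. A possible sketch, relying on Zest’s @Mixins annotation and the ServiceComposite marker interface, could look like the following; FileStorageMixin is an assumed implementation class and the load counterpart of persist() is omitted:

public interface PersonFactory {
	Person create(String firstName, String lastName) throws IOException;
}

@Mixins(PersonFactoryMixin.class)
public interface PersonFactoryService extends PersonFactory, ServiceComposite {
}

// FileStorageMixin is an assumed implementation class, not shown here
@Mixins(FileStorageMixin.class)
public interface StorageService extends ServiceComposite {
	void persist(Person person) throws IOException;
}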

The fact that the “domain” layer is using the “persistence” layer can be expressed by invoking the uses() method on the “domain” layer instance.

Finally we activate the application, i.e. the lifecycle of the injection container starts.

The Person entity consists of two interfaces:

public interface Person {
	@Immutable
	Property<String> firstName();
	@Immutable
	Property<String> lastName();
}

public interface PersonEntity extends Person, EntityComposite {

}

The interface Person has two methods that each return a Property instance. In contrast to ordinary value object implementations, the indirection through Property allows the framework to verify that the value is not changed, as the @Immutable annotation demands. The actual PersonEntity interface extends both the Person and the EntityComposite interface. The latter marks the interface as an “entity”.

The concrete implementation of the interface PersonEntity is created as a proxy object by the container. This way the container can inject the code to verify access to the properties and to control the lifecycle of the object. The following method from the PersonFactoryMixin shows how to construct such an entity:

public class PersonFactoryMixin implements PersonFactory,
	EntityStore, IdentityGenerator {

	@Structure
	Module module;
	@Service
	StorageService storageService;

	@Override
	public Person create(String firstName, String lastName)
		throws IOException {

		UnitOfWork unitOfWork = module.currentUnitOfWork();
		EntityBuilder<Person> personEntityBuilder =
			unitOfWork.newEntityBuilder(Person.class);

		Person person = personEntityBuilder.instance();
		person.firstName().set(firstName);
		person.lastName().set(lastName);

		storageService.persist(person);

		return personEntityBuilder.newInstance();
	}
}

The annotations @Structure and @Service let the container inject the appropriate instances of the two interfaces Module and StorageService. The former is used in the first line of the method to obtain a reference to the current “unit of work”. The term “unit of work” is used in Apache Zest to describe a “transaction”. This “unit of work” can then be used to retrieve an EntityBuilder.

The EntityBuilder’s method instance() creates a Person proxy that can be initialized. Once all fields are set, an immutable instance of the person can be obtained through the EntityBuilder’s method newInstance().

Meanwhile the person has also been persisted to the “storage” using the injected StorageService. This works because we have defined at the beginning that the “domain” layer can “use” the “persistence” layer. Had this dependency been defined the other way round, we would get an exception at runtime:

org.qi4j.api.common.ConstructionException: [Module person] Non-optional @Service interface
com.wordpress.mdw.zest.services.StorageService was null in
com.wordpress.mdw.zest.services.PersonFactoryMixin.

When we use the create() method of the PersonFactoryService and then try to change the first name of a person, as in the following example code, we get an IllegalStateException (java.lang.IllegalStateException: Property [com.wordpress.mdw.zest.entities.Person:firstName] is immutable.):

Module personModule = application.findModule("domain", "person");
try (UnitOfWork unitOfWork = personModule.newUnitOfWork()) {
	PersonFactoryService personFactory = personModule
		.findService(PersonFactoryService.class).get();
	Person person = personFactory.create("Homer", "Simpson");
	//try to modify @Immutable property
	person.firstName().set("Marge");
	unitOfWork.complete();
}

Conclusion: It was fun to see all the popular approaches like Aspect Oriented Programming, Dependency Injection and Domain Driven Design combined into one framework that even lets you enforce boundaries between layers and modules. As the language itself unfortunately does not (yet) support such features, it is up to frameworks like Apache Zest to retrofit them.

Using JINQ with JPA and H2

A few days ago I read an interesting interview with Ming-Yee Iu about JINQ. JINQ is, as the name already suggests, an attempt to provide something similar to LINQ for Java. The basic idea is to close the semantic gap between object-oriented code and the relational data model it queries. The queries for the relational database should integrate into the code such that they feel more natural.

The research behind LINQ came to the conclusion that the algorithms transforming the code into relational database queries work best with functional code. As Java 8 comes with the streams API, the author uses it to implement the ideas of his PhD thesis in Java.

To get our hands dirty, we start with a simple project that uses Hibernate over JPA together with an H2 database and JINQ:

<dependencies>
	<dependency>
		<groupId>javax</groupId>
		<artifactId>javaee-api</artifactId>
		<version>${jee.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>com.h2database</groupId>
		<artifactId>h2</artifactId>
		<version>${h2.version}</version>
	</dependency>
	<dependency>
		<groupId>org.hibernate</groupId>
		<artifactId>hibernate-entitymanager</artifactId>
		<version>${hibernate.version}</version>
	</dependency>
	<dependency>
		<groupId>org.jinq</groupId>
		<artifactId>jinq-jpa</artifactId>
		<version>1.8.10</version>
	</dependency>
</dependencies>

In order to use JINQ streams we have to create a provider that gets the EntityManagerFactory as argument:

EntityManagerFactory factory = Persistence.createEntityManagerFactory("PersistenceUnit");
JinqJPAStreamProvider streams = new JinqJPAStreamProvider(factory);
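
The Person class used in the following queries is assumed to be an ordinary JPA entity along these lines; the table and column names are taken from the generated SQL shown below (the ID_CARD_ID column belongs to a relation that is not relevant here):

// sketch of the assumed JPA entity
@Entity
@Table(name = "T_PERSON")
public class Person {
	@Id
	@GeneratedValue
	private long id;

	@Column(name = "FIRST_NAME")
	private String firstName;

	@Column(name = "LAST_NAME")
	private String lastName;

	public String getFirstName() {
		return firstName;
	}

	public String getLastName() {
		return lastName;
	}

	// remaining getters and setters omitted
}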

Having inserted some persons into our database, we can easily query them:

List<String> firstNames = streams.streamAll(entityManager, Person.class)
		.map(Person::getFirstName)
		.collect(toList());
firstNames.forEach(System.out::println);

The method streamAll() of the previously created JinqJPAStreamProvider gives us access to all persons within the database. In this simple example we only want to output the first name of each person; hence we map the stream to the first names and collect all results into a List. This list gets printed using the forEach() method and a method reference to println().

Taking a look at the generated SQL code, we see that all columns are selected:

select
	person0_.id as id1_4_,
	person0_.FIRST_NAME as FIRST_NA2_4_,
	person0_.ID_CARD_ID as ID_CARD_4_4_,
	person0_.LAST_NAME as LAST_NAM3_4_
from
	T_PERSON person0_ 

Of course we can refine the statement using the select() method:

List<String> firstNames = streams.streamAll(entityManager, Person.class)
		.select(Person::getFirstName)
		.where(p -> p.equals("Homer"))
		.collect(toList());
firstNames.forEach(System.out::println);

Additionally, we have added a predicate (where firstName = 'Homer'):

    select
        person0_.FIRST_NAME as FIRST_NA2_4_
    from
        T_PERSON person0_ 
    where
        person0_.FIRST_NAME='Homer'

Leaving this simple example, we now want to create a query that selects all geeks with first name “Christian” that work in a time and material project:

List<String> geeks = streams.streamAll(entityManager, Project.class)
		.where(p -> p.getProjectType() == Project.ProjectType.TIME_AND_MATERIAL)
		.joinList(Project::getGeeks)
		.where(g -> g.getTwo().getFirstName().equals("Christian"))
		.map(p -> p.getTwo().getFirstName())
		.collect(toList());
geeks.forEach(System.out::println);

As can be seen from the code above, we use the first where() clause to select all time and material projects. The joinList() invocation joins the geek table, while the subsequent where() clause restricts the result to geeks with the first name “Christian”. Et voilà, this is the generated SQL query:

select
	geek2_.FIRST_NAME as col_0_0_ 
from
	T_PROJECT project0_ 
inner join
	T_GEEK_PROJECT geeks1_ 
		on project0_.id=geeks1_.PROJECT_ID 
inner join
	T_GEEK geek2_ 
		on geeks1_.GEEK_ID=geek2_.id 
where
	project0_.projectType='TIME_AND_MATERIAL' 
	and geek2_.FIRST_NAME='Christian' limit ?
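
For completeness, here is a rough sketch of what the Project and Geek entities behind this query might look like; the table names, the join table and the string-based enum mapping are inferred from the query code and the generated SQL above:

// sketch of the assumed entities; FIXED_PRICE is just an assumed second enum value
@Entity
@Table(name = "T_PROJECT")
public class Project {
	public enum ProjectType { TIME_AND_MATERIAL, FIXED_PRICE }

	@Id
	@GeneratedValue
	private long id;

	@Enumerated(EnumType.STRING)
	private ProjectType projectType;

	@ManyToMany
	@JoinTable(name = "T_GEEK_PROJECT",
		joinColumns = @JoinColumn(name = "PROJECT_ID"),
		inverseJoinColumns = @JoinColumn(name = "GEEK_ID"))
	private List<Geek> geeks = new ArrayList<>();

	public ProjectType getProjectType() {
		return projectType;
	}

	public List<Geek> getGeeks() {
		return geeks;
	}
}

@Entity
@Table(name = "T_GEEK")
public class Geek {
	@Id
	@GeneratedValue
	private long id;

	@Column(name = "FIRST_NAME")
	private String firstName;

	public String getFirstName() {
		return firstName;
	}
}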

Conclusion: Having worked with JPA’s criteria API some time ago, I must say that the first steps with JINQ are more intuitive and were easier to write down. JINQ really helps to close the gap between the object-oriented and the relational world by using Java 8 streams.

Verify HTML documents in junit tests with jsoup

Assume that you are developing an application that creates some kind of fancy HTML report for its users. When it comes down to writing your unit tests, you have two choices:

  • You test the generated report against a complete report prepared beforehand.
  • You parse the HTML document and test parts of it separately.

The first choice seems to be simple at first glance, because you have manually validated that the prepared report is correct. Writing such kind of tests is also easy as it boils down to the following pattern:

String preparedReport = loadReportFromSomeWhere();
assertThat(generatedReport, is(preparedReport));

But what happens when you change a small part of the report-generating code? You will probably have to change some or even all of the prepared reports. Hence, in these cases, the second choice is the better one, as you only have to adjust the test cases that are affected (and that you would have to change anyhow).

This is where jsoup comes in handy. jsoup is a Java library developed for parsing HTML documents, but in contrast to other options for parsing XML-like structures it supports CSS selectors like those used in JavaScript libraries such as jQuery. This way you don’t have to write tons of code in order to verify exactly the part of the report that your current unit test is concerned with.

To demonstrate how jsoup can be used, we assume that our application has a simple HtmlReport class that can be used to create a valid HTML document using the builder pattern (https://en.wikipedia.org/wiki/Builder_pattern):

String html = HtmlReport.create()
	.addHeader1("title", "Testing HTML documents with jsoup")
	.addSection("intro", "This section explains what the text is all about.")
	.addHeader2("jsoup", "jsoup in a nutshell")
	.addSection("pjsopu", "This section explains jsoup in detail.")
	.addList("jsoup_adv", Arrays.asList("find data using CSS selectors", "manipulate HTML elements"))
	.build();

To keep it simple, the report just consists of a header element (h1) followed by a section (p), a second header (h2) with another section and an HTML list (ul). The first argument to each method is the id of the HTML element. This way we can use it later on to address exactly the element we want and, beyond that, support the formatting of all elements (the CSS designer will love us).
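
For the builder calls above, the generated document is expected to contain markup roughly along these lines (the surrounding html and body tags are omitted):

<!-- sketch of the expected markup -->
<h1 id="title">Testing HTML documents with jsoup</h1>
<p id="intro">This section explains what the text is all about.</p>
<h2 id="jsoup">jsoup in a nutshell</h2>
<p id="pjsopu">This section explains jsoup in detail.</p>
<ul id="jsoup_adv">
	<li>find data using CSS selectors</li>
	<li>manipulate HTML elements</li>
</ul>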

The first thing we want to test is that the document contains an h1 element with id “title”:

<h1 id="title">Testing HTML documents with jsoup</h1>

Using jsoup this verification becomes a two liner:

Document doc = Jsoup.parse(html);
assertThat(doc.select("h1#title").text(), is("Testing HTML documents with jsoup"));

After letting jsoup parse the document in the first line, we use the provided select() method to query for the element with the selector h1#title, i.e. we ask for an h1 element with the id title. In the same way we can assure that we have a paragraph with the correct content:

assertThat(doc.select("p#intro").text(), is("This section explains what the text is all about."));

A little bit more tricky is to verify that the list with id jsoup_adv is written in the correct order. For that we have to use the pseudo selector :eq(n) that allows us to query for a specific index position among siblings:

assertThat(doc.select("ul#jsoup_adv > li:eq(0)").text(), is("find data using CSS selectors"));
assertThat(doc.select("ul#jsoup_adv > li:eq(1)").text(), is("manipulate HTML elements"));

The selector ul#jsoup_adv > li:eq(0) asks for the first (:eq(0)) li element that is a direct child of a ul element with id jsoup_adv.

Beyond that, one can even use regular expressions to find, for example, all h2 elements whose text ends with the string “nutshell”:

assertThat(doc.select("h2:matches(.*nutshell$)").size(), is(1));

Conclusion: Using jsoup for parsing HTML documents in junit tests makes the verification of HTML documents much easier and more robust. If you are used to and like CSS selectors as they are used by jQuery, then jsoup is worth a look.

Using infinispan as embedded and clustered in-memory store for your Java SE application

infinispan is a distributed in-memory key/value data store with the option to query the inserted data using an internal DSL. In the last article we saw how to query in-memory data structures using Apache Calcite, so in this article we are exploring how to do the same with infinispan.

The Maven dependencies we need for our small example project are the following:

<properties>
	<infinispan.version>7.2.5.Final</infinispan.version>
</properties>

<dependencies>
	<dependency>
		<groupId>org.infinispan</groupId>
		<artifactId>infinispan-embedded</artifactId>
		<version>${infinispan.version}</version>
	</dependency>
	<dependency>
		<groupId>org.infinispan</groupId>
		<artifactId>infinispan-embedded-query</artifactId>
		<version>${infinispan.version}</version>
	</dependency>
</dependencies>

The embedded distribution of infinispan lets us integrate the in-memory data store into our standard Java SE application. Please note that you also need the embedded version of the query module, as there is also a standard query module named infinispan-query available. Using the standard module will lead to class loading issues at runtime.

The cache is defined within the configuration file called infinispan.xml. Its name is passed into the constructor of the DefaultCacheManager:

private Cache<Object, Object> createCache() throws IOException {
	System.setProperty("nodeName", nodeName);
	EmbeddedCacheManager cacheManager = new DefaultCacheManager("infinispan.xml");
	Cache<Object, Object> cache = cacheManager.getCache("repl");
	LOGGER.info(String.format("Started cache %s on node %s with members: %s", "dist", nodeName, cache.getAdvancedCache().getRpcManager().getMembers()));
	return cache;
}

In the example code above we choose to use a cache named repl. As the name indicates, this is a replicated cache where all values stored in one instance are replicated to all other available instances. Whether this synchronization happens synchronously or asynchronously can be configured, as shown in the following snippet from infinispan.xml:

<cache-container default-cache="default">
	<transport stack="udp" node-name="${nodeName}" />
	<replicated-cache name="repl" mode="SYNC" />
</cache-container>

infinispan uses the well-known jgroups library as communication protocol between the cluster nodes. The node name and the protocol stack (here: UDP) are configured using the XML element transport.

Starting the first node (here called A) produces the following output:

Received new cluster view for channel ISPN: [A-28854|0] (1) [A-28854]
Started cache dist on node A with members: [A-28854]

We can see that member A has joined the cluster. Now we start node B:

Received new cluster view for channel ISPN: [A-28854|1] (2) [A-28854, B-24196]
Started cache dist on node B with members: [A-28854, B-24196]

Clearly the cluster now has two members: A and B. We also see that a rebalancing process starts once the second node comes up:

Starting cluster-wide rebalance for cache repl, topology CacheTopology{id=1, rebalanceId=1, currentCH=ReplicatedConsistentHash{ns = 60, owners = (1)[A-28854: 60]}, pendingCH=ReplicatedConsistentHash{ns = 60, owners = (2)[A-28854: 30, B-24196: 30]}, unionCH=null, actualMembers=[A-28854, B-24196]}
Finished cluster-wide rebalance for cache repl, topology id = 1

Now that both nodes are up and running, we can add some data to the cache:

for (int i = 0; i < 500000; i++) {
	Person person = new Person(dataFactory.getNextFirstName(), dataFactory.getNextLastName());
	person.addAddress(new Address(dataFactory.getNextCity()));
	cache.put(nodeName + "-" + i, person);
}

The two classes Person and Address are simple POJOs with the fields firstName and lastName for Person and city for Address. Once both nodes have put 500,000 persons each into the cache, we have a total of one million entries.
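
A sketch of the two classes, matching the constructors used above and the addresses.city path queried below, could look like this (they should be Serializable, or ship a custom externalizer, so that infinispan can replicate them between the nodes):

// sketch of the value classes used in this example
public class Person implements Serializable {
	private String firstName;
	private String lastName;
	private List<Address> addresses = new ArrayList<>();

	public Person(String firstName, String lastName) {
		this.firstName = firstName;
		this.lastName = lastName;
	}

	public void addAddress(Address address) {
		addresses.add(address);
	}

	// getters omitted
}

public class Address implements Serializable {
	private String city;

	public Address(String city) {
		this.city = city;
	}

	// getter omitted
}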

Now that the cache is filled with a respectable amount of data, it would be interesting to see how queries against it perform. This can be done by obtaining a QueryFactory and using its fluent API:

QueryFactory queryFactory = Search.getQueryFactory(cache);
Query query = queryFactory.from(Person.class).
		having("firstName").eq(dataFactory.getNextFirstName()).
		and().
		having("lastName").eq(dataFactory.getNextLastName()).
		and().
		having("addresses.city").eq(dataFactory.getNextCity()).
		toBuilder().build();
List<Object> list = query.list();

All we have to do is to specify the base class of our query (here Person) and add the predicates using having() and eq(). Different predicates can be joined by using a conjunction like and(). We can even use nested attributes like in the example above with addresses.city.

When I run this query on a replicated cache with 1M entries, I get an average execution time of about 543 ms. But as all data resides in memory, this is not really surprising.

How does the query perform on a distributed cache? Within a distributed cache all entries are spread over the cluster nodes using a hash value to determine on which node to store an entry. A distributed cache can be configured with the XML element of the same name:

<distributed-cache name="dist" owners="1" segments="2"/>

In this simple example we are dividing the hash key space into two segments, meaning that each key is stored in one of the two segments. Each hash segment is mapped to a list of nodes called owners. Here we are using only one owner, as we are working with two nodes. Hence each node manages one segment of the key space without any replicas.

Now that the data is evenly distributed over the available nodes, the query takes less time to execute: 338 ms on average. The gain in performance results of course from the fact that now both nodes participate in the query and contribute their part of the result set.

Conclusion: With a few lines of code it is possible to set up an embedded in-memory key/value data store in your Java SE application that can even be clustered across different nodes. The internal DSL makes it easy to query even nested data structures in respectable time.

Apache Calcite: Setting up your own in-memory database with SQL interface

Some time ago we saw how to use Apache Drill to query data that resides in CSV and parquet files. Apache Drill’s SQL interface is provided by another Apache project called “Calcite”. This project provides a SQL parser, a JDBC driver and a query optimizer that can be connected to different data stores via custom adapters.

In this article we are investigating how to use the ReflectiveSchema factory to create an in-memory database with SQL interface.

The schemas that the SQL parser should operate on can be specified either programmatically or by means of a JSON file. Such a JSON file can look like the following one:

{
  version: '1.0',
  defaultSchema: 'Persons',
  schemas: [
    {
      name: 'Persons',
      type: "custom",
      factory: "org.apache.calcite.adapter.java.ReflectiveSchema$Factory",
      operand: {
        class: "com.wordpress.martinsdeveloperworld.calcite.Schema",
        staticMethod: "getInstance"
      }
    }
  ]
}

The only schema we have specified in the file above is called “Persons” and is at the same time our default schema. The factory defined with the corresponding field name has to implement a method that returns an instance of the Calcite class Schema. Here we choose the ReflectiveSchema that ships with Calcite and that exposes the public fields of a Java object as tables. The class that provides this Java object is given through the operand’s field class and has to offer a factory method that returns the object (here: getInstance).

The Schema class mentioned above looks in our example like this:

public class Schema {
	private static final Logger LOGGER = Logger.getLogger(Schema.class.getName());
	public Person[] persons;
	public Address[] addresses;

	public static Schema getInstance() {
		LOGGER.log(Level.INFO, "Creating schema...");
		DataFactory dataFactory = new DataFactory(0);
		int numberOfPersons = 10000000;
		Schema schema = new Schema();
		schema.persons = new Person[numberOfPersons];
		schema.addresses = new Address[numberOfPersons];
		for (int i = 0; i < numberOfPersons; i++) {
			Person person = dataFactory.getNextPerson(i);
			schema.persons[i] = person;
			schema.addresses[i] = dataFactory.getNextAddress(person);
		}
		LOGGER.log(Level.INFO, "Created schema.");
		return schema;
	}
}

The two public fields persons and addresses will become the tables of our SQL schema. We initialize these two arrays with ten million persons and addresses, one person having exactly one address. The artificially generated id of the person is used as foreign key in the addresses table:

public class Person {
	public long id;
	public String firstName;
	public String lastName;
}
public class Address {
	public long personId;
	public String city;
}

The DataFactory creates a new person and randomly assigns a first and last name for each person as well as a city for each address. These values are taken from a collection of the most popular 200 first and last names in the US and the 100 biggest cities.
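
The DataFactory itself is not listed in this article. Based on how it is used here, a sketch could look like this (the name arrays are of course much longer in the real implementation):

// sketch of the assumed DataFactory
public class DataFactory {
	private static final String[] FIRST_NAMES = {"JAMES", "MARY", "DAVID" /* ... */};
	private static final String[] LAST_NAMES = {"SMITH", "JOHNSON", "MILLER" /* ... */};
	private static final String[] CITIES = {"New York", "Chicago", "Houston" /* ... */};

	private final Random random;

	public DataFactory(long seed) {
		this.random = new Random(seed);
	}

	public Person getNextPerson(long id) {
		Person person = new Person();
		person.id = id;
		person.firstName = FIRST_NAMES[random.nextInt(FIRST_NAMES.length)];
		person.lastName = LAST_NAMES[random.nextInt(LAST_NAMES.length)];
		return person;
	}

	public Address getNextAddress(Person person) {
		Address address = new Address();
		address.personId = person.id;
		address.city = CITIES[random.nextInt(CITIES.length)];
		return address;
	}
}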

Now that we have created the schema and populated the tables with ten million rows, we can start to query them. The code to load the JDBC driver and to establish a connection to the data source looks like this one:

Class.forName("org.apache.calcite.jdbc.Driver");
Properties info = new Properties();
info.setProperty("lex", "JAVA");
try (Connection connection = 
	DriverManager.getConnection("jdbc:calcite:model=target/classes/model.json", info)) {
	...
}

The JSON file, which is referred to as model within the JDBC URL, is the one shown at the beginning of this article. First we want to know how many people have the last name ‘Smith’:

String query = "select count(*) from persons p where p.lastName = 'Smith'";
try (Statement statement = connection.createStatement();
	ResultSet resultSet = statement.executeQuery(query)) {
	int count = 0;
	while (resultSet.next()) {
		count = resultSet.getInt(1);
	}
	LOGGER.log(Level.INFO, "Result has " + count + " rows.");
} catch (Exception e) {
	LOGGER.log(Level.SEVERE, "Query failed: " + e.getMessage(), e);
}

When we modify the code above such that the query gets executed in a loop with randomly chosen last names from the collection, we can measure its average execution time. On my machine this yields about 105.3 ms over 100 iterations. Not bad!
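
Such a measurement loop could look roughly like this; lastNames stands for the collection of last names mentioned above, and the loop runs inside the try block that holds the connection:

// sketch: measure the average execution time over 100 randomly chosen last names
List<String> lastNames = Arrays.asList("SMITH", "JOHNSON", "MILLER" /* ... */);
Random random = new Random();
int iterations = 100;
long start = System.currentTimeMillis();
for (int i = 0; i < iterations; i++) {
	String lastName = lastNames.get(random.nextInt(lastNames.size()));
	String query = "select count(*) from persons p where p.lastName = '" + lastName + "'";
	try (Statement statement = connection.createStatement();
		ResultSet resultSet = statement.executeQuery(query)) {
		while (resultSet.next()) {
			resultSet.getInt(1);
		}
	} catch (Exception e) {
		LOGGER.log(Level.SEVERE, "Query failed: " + e.getMessage(), e);
	}
}
LOGGER.log(Level.INFO, "Average execution time: "
	+ (System.currentTimeMillis() - start) / iterations + " ms");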

But we also want to know how many of these people live in Chicago:

String query = "select count(*) from persons p " +
	" inner join addresses a on a.personId = p.id " +
	" where a.city = 'Chicago' and p.lastName = 'Smith'";
...

Executed with different, randomly chosen last names and cities, this query runs on average in about 341.9 ms. For a join query on two tables with ten million rows each, this is also not bad at all.

PS: The code is available on my github account.

Evaluating performance measurements with Apache’s commons-math3

Do you remember the term “Student’s t-test” from your statistics lessons? And do you apply it when you do performance measurements in your day-to-day work?

William Sealy Gosset was a chemist working at the Guinness brewery in Dublin, where he had been recruited because he was one of the best graduates at Oxford. The brewery’s idea was to use scientific knowledge to optimize its industrial processes. During his work at Guinness, William Sealy Gosset developed a way to test hypotheses like “The means of these two populations are equal.”. But because publishing scientific results gathered at work was not allowed at Guinness, he published his work under the pseudonym “Student”. That’s why we all know this kind of hypothesis testing as “Student’s t-test”.

When we measure the performance of two different algorithms on the same hardware, we cannot just compare the resulting mean values in order to conclude whether one of them is faster. According to “Student’s t-test”, we have to formulate a “null hypothesis” that could, in this example, sound like “There is no effective difference between the sample means of the two observations”. The next step is to compute the so-called “t value”. For this computation we assume that both series of samples are independent, i.e. the observations in the first series are in no way related to the observations in the second series, and that the values follow a normal distribution. As we do not know whether both series have the same variance, we must use the so-called “heteroscedastic t-test” with the following formula:

t = (x - y) / sqrt( Sx^2 / n1 + Sy^2 / n2 )

x: mean of the first series
y: mean of the second series
Sx: standard deviation of the first series
Sy: standard deviation of the second series
n1: number of samples in the first series
n2: number of samples in the second series
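
Written out in plain Java, the computation of the t value is only a few lines on top of the mean and the (unbiased) sample variance; the following sketch implements exactly the formula above:

// sketch: direct implementation of the heteroscedastic t formula
static double heteroscedasticT(double[] x, double[] y) {
	double meanX = mean(x);
	double meanY = mean(y);
	double varX = sampleVariance(x, meanX); // Sx^2
	double varY = sampleVariance(y, meanY); // Sy^2
	return (meanX - meanY) / Math.sqrt(varX / x.length + varY / y.length);
}

static double mean(double[] values) {
	double sum = 0.0;
	for (double value : values) {
		sum += value;
	}
	return sum / values.length;
}

static double sampleVariance(double[] values, double mean) {
	double sum = 0.0;
	for (double value : values) {
		sum += (value - mean) * (value - mean);
	}
	return sum / (values.length - 1);
}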

Let’s assume we have measured the following data:

X Y
154.3 230.4
191.0 202.8
163.4 202.8
168.6 216.8
187.0 192.9
200.4 194.4
162.5 211.7

To compute the t value we can utilize Apache’s “commons-math3” library:

<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-math3</artifactId>
	<version>3.5</version>
</dependency>

As the commons-math3 library already ships with a TTest class, we can easily implement the t-test:

double[] observations1 = readObservations(args[0]);
double[] observations2 = readObservations(args[1]);
Mean mean = new Mean();
double mean1 = mean.evaluate(observations1);
double mean2 = mean.evaluate(observations2);
TTest tTest = new TTest();
double significanceLevel1 = tTest.tTest(observations1, observations2);
double significanceLevel2 = tTest.homoscedasticTTest(observations1, observations2);
System.out.println(String.format("Mean1: %.10f", mean1));
System.out.println(String.format("Mean2: %.10f", mean2));
System.out.println(String.format("heteroscedastic t-Test: %.10f", 
	(1 - significanceLevel1) * 100));
System.out.println(String.format("homoscedastic t-Test:   %.10f", 
	(1 - significanceLevel2) * 100));

The example code above also computes the so-called “homoscedastic” t-test, which assumes that the two samples are drawn from subpopulations with equal variances. The two methods from the commons library compute the smallest “significance level” at which one can reject the null hypothesis that the two means are equal. The “confidence level”, which is easier to understand, can be computed by subtracting the “significance level” from 1. As the result is a probability, we can multiply it by 100 in order to get a statement in percentage:

Mean1: 175,3142857143
Mean2: 207,4000000000
heteroscedastic t-Test: 99,7562734547
homoscedastic t-Test:   99,7838832707

This means that we can reject the statement that the mean values of both sample series are equal with a probability of 99.8%. Or, the other way round, the probability that both series have the same mean value is only 0.2%. Hence the two measurements are very likely to be different. But the result is not always as clear as in this case. Let’s take a look at these two series:

X Y
154.3 155.3
191.0 163.7
163.4 200.1
168.6 177.5
187.0 188.3
200.4 198.7
162.5 201.7

The output here is:

Mean1: 175,3142857143
Mean2: 183,6142857143
heteroscedastic t-Test: 59,4632442225
homoscedastic t-Test:   59,4717945546

At first glance the second series of sample values seems to perform much slower. But the probability that we can reject the null hypothesis that both means are equal is only 59.5%. In other words: the probability that both series have the same mean value is still about 40.5%.

Converting a CSV file to parquet and querying it with Apache Drill

The Apache Drill project provides SQL-like access to different kinds of data stores. The supported data stores span relational as well as NoSQL databases and the file system. Hence you can query data from HBase, MongoDB, HDFS and the local file system without the need to convert the data beforehand. Even joins between the different formats are possible. Internally, Apache Drill prepares the potentially nested data (from, for example, JSON files) in a columnar representation as described in Google’s Dremel paper. This columnar data structure allows queries that only select a subset of the available columns to perform much faster, as only the selected columns have to be read. In contrast to traditional relational databases, the whole row of data does not have to be loaded from disc.

The data structure described in Google’s Dremel paper is also available as a file format called parquet, which allows you to store and retrieve data from a columnar storage. If you plan to execute multiple queries on a big data set, it can be reasonable to convert the CSV file to the parquet format and query it using Apache Drill. In this article we therefore explore how to convert a CSV file into a parquet file using Apache’s parquet library:

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>parquet-column</artifactId>
	<version>1.6.0</version>
</dependency>
<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>parquet-hadoop</artifactId>
	<version>1.6.0</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-common</artifactId>
	<version>2.4.0</version>
</dependency>

In order to tell Apache Parquet the structure of the CSV file, we have to create an instance of MessageType and pass in a message definition written in Google’s Protocol Buffer (https://developers.google.com/protocol-buffers/). As our CSV file contains first name, last name, date of birth and place of birth for a large number of persons, the Protocol Buffer message looks like this:

message csv {
    required binary firstName = 1; 
    required binary lastName = 2; 
    required binary dateOfBirth = 3; 
    required binary placeOfBirth = 4;
}

All fields are required and of type binary. This message definition is then stored as the first line of our CSV file, such that it can be read directly from it.

The code in the main() method of our sample application looks like this:

String firstLine = Files.readFirstLine(options.getCsvPath().toFile(), 
	Charset.defaultCharset());
MessageType messageType = MessageTypeParser.parseMessageType(firstLine);
WriteSupport<List<String>> writeSupport = new CsvWriteSupport(messageType);
String line;
try (CsvParquetWriter writer = new CsvParquetWriter(path, writeSupport);
	BufferedReader br = new BufferedReader(new FileReader(options.getCsvPath().toFile()))) {
	while ((line = br.readLine()) != null) {
		String[] fields = line.split(Pattern.quote(SEPARATOR));
		writer.write(Arrays.asList(fields));
	}
}

The first two lines create an instance of MessageType using the first line of the CSV file. This instance of MessageType is then passed into the constructor of our CsvWriteSupport class:

public class CsvWriteSupport extends WriteSupport<List<String>> {
	private final MessageType messageType;
	private RecordConsumer recordConsumer;

	public CsvWriteSupport(MessageType messageType) {
		this.messageType = messageType;
	}

	@Override
	public WriteSupport.WriteContext init(Configuration configuration) {
		return new WriteSupport.WriteContext(messageType, new HashMap<String, String>());
	}

	@Override
	public void prepareForWrite(RecordConsumer recordConsumer) {
		this.recordConsumer = recordConsumer;
	}

	@Override
	public void write(List<String> record) {
		recordConsumer.startMessage();
		List<ColumnDescriptor> columns = messageType.getColumns();
		for (int i = 0; i < columns.size(); i++) {
			String value = record.get(i);
			if (value.length() > 0) {
				recordConsumer.startField(columns.get(i).getPath()[0], i);
				switch (columns.get(i).getType()) {
					...
					case BINARY:
						recordConsumer.addBinary(Binary.fromByteArray(value.getBytes(Charset.defaultCharset())));
						break;
					default:
						throw new ParquetEncodingException("Unsupported column type: " + columns.get(i).getType());
				}
				recordConsumer.endField(columns.get(i).getPath()[0], i);
			}
		}
		recordConsumer.endMessage();
	}
}

While the implementation of the two methods init() and prepareForWrite() is simple, the core logic resides in write(). Here the CsvWriteSupport tells parquet to start a new message and then to add different fields. The switch statement has been shortened to focus on the type binary. Here the string read from the CSV file gets converted into a byte array with respect to the default charset of the platform. Note that write() gets called within the while loop that iterates over the lines in the CSV file.
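
The CsvParquetWriter used in the main() method is not shown above. A minimal sketch could simply pass the output path and the WriteSupport on to the ParquetWriter base class from parquet-hadoop, relying on its default compression and block size settings:

// minimal sketch of the CsvParquetWriter used in main()
public class CsvParquetWriter extends ParquetWriter<List<String>> {

	public CsvParquetWriter(Path file, WriteSupport<List<String>> writeSupport) throws IOException {
		super(file, writeSupport);
	}
}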

After having compiled the application into a jar file that contains all dependencies, we can start it with the following command and pass the existing CSV file and the name of the output file on the command line:

java -Djava.library.path=/usr/lib/hadoop/lib/native/Linux-amd64-64:/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native/Linux-amd64-64:/usr/lib/hadoop/lib/native -classpath "$HADOOP_JAR_DIRS:csv-to-parquet-0.0.1-SNAPSHOT-jar-with-dependencies.jar" com.wordpress.mdw.Main <csv> <parquet>

The first observation is that the parquet file (here test.parquet) is much smaller than the corresponding CSV file:

417M test.csv
35M  test.parquet

The compression ratio of about 92% stems from the fact that parquet supports very efficient compression and encoding schemes. As the data is stored in a columnar fashion, compression algorithms can exploit the fact that one column contains similar data. Even gzip compression of the CSV file only reaches a compression ratio of about 83%.

As a final step we want to query the number of records/lines in both files with Apache Drill and compare the execution times:

0: jdbc:drill:> select count(*) from dfs.`/tmp/test_drill_tab.csv`;
+-----------+
|  EXPR$0   |
+-----------+
| 10000001  |
+-----------+
1 row selected (5.771 seconds)
0: jdbc:drill:> select count(*) from dfs.`/tmp/test.parquet`;
+-----------+
|  EXPR$0   |
+-----------+
| 10000001  |
+-----------+
1 row selected (0.257 seconds)

From the explanations above it is clear that the first query has to read the complete file, whereas the second query can concentrate on one column. Beyond that, the parquet implementation also stores the number of values in each page header (a column is divided into multiple chunks/pages). The same is true when we ask Drill to count the entries where the first name is ‘DAVID’:

0: jdbc:drill:> select count(firstName) from dfs.`/tmp/test.parquet` where firstName = 'DAVID';
+---------+
| EXPR$0  |
+---------+
| 999190  |
+---------+
1 row selected (2.272 seconds)
0: jdbc:drill:> select count(columns[0]) from dfs.`/tmp/test_drill_tab.csv` where columns[0] = 'DAVID';
+---------+
| EXPR$0  |
+---------+
+---------+
No rows selected (6.418 seconds)

The answer for the parquet file comes after about 2 seconds, while the query running on the CSV file takes about 4 seconds longer. It gets even worse when querying two columns:

0: jdbc:drill:> select count(firstName) from dfs.`/tmp/test.parquet` where firstName = 'DAVID' and lastName = 'MILLER';
+---------+
| EXPR$0  |
+---------+
| 110813  |
+---------+
1 row selected (5.838 seconds)
0: jdbc:drill:> select count(columns[0]) from dfs.`/tmp/test_drill_tab.csv` where columns[0] = 'DAVID' and columns[1] = 'MILLER';
+---------+
| EXPR$0  |
+---------+
+---------+
No rows selected (29.57 seconds)

Now the parquet query only takes about 19.7% of the CSV query’s time. Finally, please note that we do not have any kind of indexes like in a traditional RDBMS. The idea of the Dremel paper is to always perform a “full” scan of the complete column. But querying 10 million records that are stored in a compressed format within 5 seconds is still not bad.

Conclusion: Storing data in the parquet file format not only saves disc space (compression ratio of about 92%) but also reduces query times by a factor of three to five.

PS: The source code is available at github.
