
Apache Calcite: Setting up your own in-memory database with SQL interface

Some time ago we saw how to use Apache Drill to query data that resides in CSV and parquet files. Apache Drill’s SQL interface is provided by another Apache project called “Calcite”. This project provides a SQL parser, a JDBC driver and a query optimizer that can be connected to different data stores via custom adapters.

In this article we investigate how to use the ReflectiveSchema factory to create an in-memory database with a SQL interface.

The schemas that the SQL parser should operate on can be specified either programmatically or by means of a JSON file. Such a JSON file can look like the following one:

{
  version: '1.0',
  defaultSchema: 'Persons',
  schemas: [
    {
      name: 'Persons',
      type: "custom",
      factory: "org.apache.calcite.adapter.java.ReflectiveSchema$Factory",
      operand: {
        class: "com.wordpress.martinsdeveloperworld.calcite.Schema",
        staticMethod: "getInstance"
      }
    }
  ]
}

The only schema we have specified in the file above is called “Persons” and is at the same time our default schema. The factory defined with the corresponding field name has to implement a method that returns an instance of the Calcite class Schema. Here we choose the ReflectiveSchema that ships with Calcite and exposes the public fields of a Java object as tables. The class that provides this Java object is given through the operand’s class field and has to offer a static factory method that returns the object (here: getInstance).
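The schema could alternatively be registered programmatically instead of through the JSON model. A minimal sketch of this variant, assuming the Schema class shown below and using Calcite’s CalciteConnection, SchemaPlus and ReflectiveSchema classes, could look like this:

Class.forName("org.apache.calcite.jdbc.Driver");
Properties info = new Properties();
info.setProperty("lex", "JAVA");
try (Connection connection = DriverManager.getConnection("jdbc:calcite:", info)) {
	// Unwrap the Calcite-specific connection to get access to the root schema.
	CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
	SchemaPlus rootSchema = calciteConnection.getRootSchema();
	// Expose the public fields of our Schema object as the tables of the "Persons" schema.
	rootSchema.add("Persons", new ReflectiveSchema(Schema.getInstance()));
	// Queries can now reference Persons.persons and Persons.addresses.
}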

The Schema class mentioned above looks in our example like this:

public class Schema {
	private static final Logger LOGGER = Logger.getLogger(Schema.class.getName());
	public Person[] persons;
	public Address[] addresses;

	public static Schema getInstance() {
		LOGGER.log(Level.INFO, "Creating schema...");
		DataFactory dataFactory = new DataFactory(0);
		int numberOfPersons = 10000000;
		Schema schema = new Schema();
		schema.persons = new Person[numberOfPersons];
		schema.addresses = new Address[numberOfPersons];
		for (int i = 0; i < numberOfPersons; i++) {
			Person person = dataFactory.getNextPerson(i);
			schema.persons[i] = person;
			schema.addresses[i] = dataFactory.getNextAddress(person);
		}
		LOGGER.log(Level.INFO, "Created schema.");
		return schema;
	}
}

The two public fields persons and addresses become the tables of our SQL schema. We initialize both arrays with ten million persons and addresses, each person having exactly one address. The artificially generated id of the person is used as a foreign key in the addresses table:

public class Person {
	public long id;
	public String firstName;
	public String lastName;
}
public class Address {
	public long personId;
	public String city;
}

The DataFactory creates a new person and randomly assigns a first and last name to each person as well as a city to each address. These values are taken from collections of the 200 most popular first and last names in the US and the 100 largest US cities.
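The DataFactory itself is not listed in this article. A minimal sketch that matches the calls made in getInstance() above, with the name and city collections abbreviated and the constructor argument assumed to be a random seed, could look like this:

public class DataFactory {
	// Abbreviated collections; the real factory uses the 200 most popular names and the 100 largest cities.
	private static final String[] FIRST_NAMES = {"JAMES", "MARY", "DAVID"};
	private static final String[] LAST_NAMES = {"SMITH", "JOHNSON", "MILLER"};
	private static final String[] CITIES = {"New York", "Chicago", "Houston"};
	private final Random random;

	public DataFactory(long seed) {
		this.random = new Random(seed);
	}

	public Person getNextPerson(long id) {
		Person person = new Person();
		person.id = id;
		person.firstName = FIRST_NAMES[random.nextInt(FIRST_NAMES.length)];
		person.lastName = LAST_NAMES[random.nextInt(LAST_NAMES.length)];
		return person;
	}

	public Address getNextAddress(Person person) {
		Address address = new Address();
		address.personId = person.id;
		address.city = CITIES[random.nextInt(CITIES.length)];
		return address;
	}
}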

Now that we have created the schema and populated the tables with ten million rows, we can start to query them. The code to load the JDBC driver and to establish a connection to the data source looks like this:

Class.forName("org.apache.calcite.jdbc.Driver");
Properties info = new Properties();
info.setProperty("lex", "JAVA");
try (Connection connection = 
	DriverManager.getConnection("jdbc:calcite:model=target/classes/model.json", info)) {
	...
}

The JSON file that is referred to as model in the JDBC URL is the one shown at the beginning of this article. First we want to know how many people have the last name ‘Smith’:

String query = "select count(*) from persons p where p.lastName = 'Smith'";
try (Statement statement = connection.createStatement();
	ResultSet resultSet = statement.executeQuery(query)) {
	int count = 0;
	while (resultSet.next()) {
		count = resultSet.getInt(1);
	}
	LOGGER.log(Level.INFO, "Result has " + count + " rows.");
} catch (Exception e) {
	LOGGER.log(Level.SEVERE, "Query failed: " + e.getMessage(), e);
}

When we modify the code above such that the query is executed in a loop with randomly chosen last names from the collection, we can measure its average execution time. On my machine this yields about 105.3 ms over 100 iterations. Not bad!
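Such a measurement can be sketched as follows; the lastNames array and the random instance are assumptions that stand in for the name collection and a java.util.Random object:

// Run the query with randomly chosen last names and average the elapsed time.
int iterations = 100;
long start = System.currentTimeMillis();
for (int i = 0; i < iterations; i++) {
	String lastName = lastNames[random.nextInt(lastNames.length)];
	String query = "select count(*) from persons p where p.lastName = '" + lastName + "'";
	try (Statement statement = connection.createStatement();
		ResultSet resultSet = statement.executeQuery(query)) {
		while (resultSet.next()) {
			resultSet.getInt(1);
		}
	}
}
long averageMillis = (System.currentTimeMillis() - start) / iterations;
LOGGER.log(Level.INFO, "Average execution time: " + averageMillis + " ms");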

But we also want to know how many of these people live in Chicago:

String query = "select count(*) from persons p " +
	" inner join addresses a on a.personId = p.id " +
	" where a.city = 'Chicago' and p.lastName = 'Smith'";
...

Executed with different, randomly chosen last names and cities, this query takes on average about 341.9 ms. For a join over two tables with ten million rows each, this is also not that bad.

PS: The code is available on my GitHub account.


Converting a CSV file to parquet and querying it with Apache Drill

The Apache Drill project provides SQL-like access to different kinds of data stores. The supported data stores span relational as well as NoSQL databases and the file system. Hence you can query data from HBase, MongoDB, HDFS and the local file system without having to convert the data beforehand. Even joins between the different formats are possible. Internally, Apache Drill prepares the potentially nested data (from, for example, JSON files) in a columnar representation as described in Google’s Dremel paper. This columnar data structure allows queries that select only a subset of the available columns to perform much faster, as only the selected columns have to be read. In contrast to traditional relational databases, the whole row does not have to be loaded from disk.

The data structure described in Google’s Dremel paper is also available as a file format called parquet, which allows you to store and retrieve data from a columnar storage. If you plan to execute multiple queries on a big data set, it can be reasonable to convert the CSV file to the parquet format and query it using Apache Drill. In this article we therefore explore how to convert a CSV file into a parquet file using Apache’s parquet library:

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>parquet-column</artifactId>
	<version>1.6.0</version>
</dependency>
<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>parquet-hadoop</artifactId>
	<version>1.6.0</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-common</artifactId>
	<version>2.4.0</version>
</dependency>

In order to tell Apache Parquet the structure of the CSV file, we have to create an instance of MessageType and pass in a message definition written in Google’s Protocol Buffer (https://developers.google.com/protocol-buffers/). As our CSV file contains first name, last name, date of birth and place of birth for a large number of persons, the Protocol Buffer message looks like this:

message csv {
    required binary firstName = 1; 
    required binary lastName = 2; 
    required binary dateOfBirth = 3; 
    required binary placeOfBirth = 4;
}

All fields are required and of type binary. This message definition is then stored as the first line of our CSV file, such that it can be read directly from it.
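Under this convention the CSV file starts with the message definition on a single line, followed by the data rows. The following fragment is purely an illustration; the separator character ('|') and the sample values are assumptions, not taken from the original data set:

message csv { required binary firstName = 1; required binary lastName = 2; required binary dateOfBirth = 3; required binary placeOfBirth = 4; }
DAVID|MILLER|1970-01-01|CHICAGO
MARY|SMITH|1982-05-23|NEW YORK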

The code in the main() method of our sample application looks like this:

String firstLine = Files.readFirstLine(options.getCsvPath().toFile(), 
	Charset.defaultCharset());
MessageType messageType = MessageTypeParser.parseMessageType(firstLine);
WriteSupport<List<String>> writeSupport = new CsvWriteSupport(messageType);
String line;
try (CsvParquetWriter writer = new CsvParquetWriter(path, writeSupport);
	BufferedReader br = new BufferedReader(new FileReader(options.getCsvPath().toFile()))) {
	while ((line = br.readLine()) != null) {
		String[] fields = line.split(Pattern.quote(SEPARATOR));
		writer.write(Arrays.asList(fields));
	}
}

The first two lines create an instance of MessageType using the first line of the CSV file. This instance of MessageType is then passed into the constructor of our CsvWriteSupport class:

public class CsvWriteSupport extends WriteSupport<List<String>> {
	private final MessageType messageType;
	private RecordConsumer recordConsumer;

	public CsvWriteSupport(MessageType messageType) {
		this.messageType = messageType;
	}

	@Override
	public WriteSupport.WriteContext init(Configuration configuration) {
		return new WriteSupport.WriteContext(messageType, new HashMap<String, String>());
	}

	@Override
	public void prepareForWrite(RecordConsumer recordConsumer) {
		this.recordConsumer = recordConsumer;
	}

	@Override
	public void write(List<String> record) {
		recordConsumer.startMessage();
		List<ColumnDescriptor> columns = messageType.getColumns();
		for (int i = 0; i < columns.size(); i++) {
			String value = record.get(i);
			if (value.length() > 0) {
				recordConsumer.startField(columns.get(i).getPath()[0], i);
				switch (columns.get(i).getType()) {
					...
					case BINARY:
						recordConsumer.addBinary(Binary.fromByteArray(value.getBytes(Charset.defaultCharset())));
						break;
					default:
						throw new ParquetEncodingException("Unsupported column type: " + columns.get(i).getType());
				}
				recordConsumer.endField(columns.get(i).getPath()[0], i);
			}
		}
		recordConsumer.endMessage();
	}
}

While the implementation of the two methods init() and prepareForWrite() is simple, the core logic resides in write(). Here the CsvWriteSupport tells parquet to start a new message and then adds the individual fields. The switch statement has been shortened to focus on the binary type: the string read from the CSV file is converted into a byte array using the platform’s default charset. Note that write() is called within the while loop that iterates over the lines of the CSV file.
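The CsvParquetWriter referenced in the main() method is not listed above; it is essentially a thin subclass of ParquetWriter that plugs in our write support. A minimal sketch, in which the compression codec and the use of the default block and page sizes are assumptions rather than details from the original code, could look like this:

public class CsvParquetWriter extends ParquetWriter<List<String>> {

	// file is an org.apache.hadoop.fs.Path; WriteSupport comes from parquet.hadoop.api.
	public CsvParquetWriter(Path file, WriteSupport<List<String>> writeSupport) throws IOException {
		// SNAPPY and the default block/page sizes are illustrative choices.
		super(file, writeSupport, CompressionCodecName.SNAPPY, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
	}
}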

After having compiled the application into a jar file that contains all dependencies, we can start it with the following command and pass the existing CSV file and the name of the output file on the command line:

java -Djava.library.path=/usr/lib/hadoop/lib/native/Linux-amd64-64:/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native/Linux-amd64-64:/usr/lib/hadoop/lib/native -classpath "$HADOOP_JAR_DIRS:csv-to-parquet-0.0.1-SNAPSHOT-jar-with-dependencies.jar" com.wordpress.mdw.Main <csv> <parquet>

The first observation is that the parquet file (here test.parquet) is much smaller than the corresponding CSV file:

417M test.csv
35M  test.parquet

The compression ratio of about 92% (from 417 MB down to 35 MB) stems from the fact that parquet supports very efficient compression and encoding schemes. As the data is stored in a columnar fashion, the compression algorithms can exploit the fact that a column contains similar data. Even gzip compression of the CSV file reaches only a compression ratio of about 83%.

As a final step we want to query the number of records/lines in both files with Apache Drill and compare the execution times:

0: jdbc:drill:> select count(*) from dfs.`/tmp/test_drill_tab.csv`;
+-----------+
|  EXPR$0   |
+-----------+
| 10000001  |
+-----------+
1 row selected (5.771 seconds)
0: jdbc:drill:> select count(*) from dfs.`/tmp/test.parquet`;
+-----------+
|  EXPR$0   |
+-----------+
| 10000001  |
+-----------+
1 row selected (0.257 seconds)

From the explanations above it is clear that the first query has to read the complete file, whereas the second query can concentrate on a single column. Beyond that, the parquet implementation also stores the number of values in each page header (a column is divided into multiple chunks/pages). The same is true when we ask Drill to count the entries whose first name is ‘DAVID’:

0: jdbc:drill:> select count(firstName) from dfs.`/tmp/test.parquet` where firstName = 'DAVID';
+---------+
| EXPR$0  |
+---------+
| 999190  |
+---------+
1 row selected (2.272 seconds)
0: jdbc:drill:> select count(columns[0]) from dfs.`/tmp/test_drill_tab.csv` where columns[0] = 'DAVID';
+---------+
| EXPR$0  |
+---------+
+---------+
No rows selected (6.418 seconds)

The answer for the parquet file comes after about 2 seconds; the query running on the CSV file takes about 4 seconds longer. It gets even worse when querying two columns:

0: jdbc:drill:> select count(firstName) from dfs.`/tmp/test.parquet` where firstName = 'DAVID' and lastName = 'MILLER';
+---------+
| EXPR$0  |
+---------+
| 110813  |
+---------+
1 row selected (5.838 seconds)
0: jdbc:drill:> select count(columns[0]) from dfs.`/tmp/test_drill_tab.csv` where columns[0] = 'DAVID' and columns[1] = 'MILLER';
+---------+
| EXPR$0  |
+---------+
+---------+
No rows selected (29.57 seconds)

Now the parquet query takes only about 19.7% of the CSV query’s time. Finally, please note that we do not have any kind of index as in a traditional RDBMS; the idea of the Dremel paper is to always perform a “full” scan of the complete column. But querying 10 million records that are stored in a compressed format within a few seconds is still not bad.

Conclusion: Storing data in the parquet file format not only saves disk space (a compression ratio of about 92%) but also reduces query times by a factor of three to five.

PS: The source code is available on GitHub.

Using @NamedEntityGraph to load JPA entities more selectively in N+1 scenarios

The N+1 problem is a common issue when working with ORM solutions. It occurs when you set the fetch type of a @OneToMany relation to lazy in order to load the child entities only when the Set/List is accessed. Let’s assume we have a Customer entity with two relations: a set of orders and a set of addresses for each customer.

@OneToMany(mappedBy = "customer", cascade = CascadeType.ALL, fetch = FetchType.LAZY)
private Set<OrderEntity> orders;

@OneToMany(mappedBy = "customer", cascade = CascadeType.ALL, fetch = FetchType.LAZY)
private Set<AddressEntity> addresses;
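The owning side of the orders relation is not shown in the original listing. A minimal sketch of OrderEntity, with the field names inferred from the generated SQL statements below, could look like this:

@Entity
public class OrderEntity {
	@Id
	@GeneratedValue
	private Long id;

	private String campaignId;

	private Date timestamp;

	// Owning side of the relation, mapped to the CUSTOMER_ID column that appears in the SQL below.
	@ManyToOne
	@JoinColumn(name = "CUSTOMER_ID")
	private CustomerEntity customer;
}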

To load all customers, we can issue the following JPQL statement and afterwards load all orders for each customer:

List<CustomerEntity> resultList = entityManager.createQuery("SELECT c FROM CustomerEntity AS c", CustomerEntity.class).getResultList();
for(CustomerEntity customerEntity : resultList) {
    Set<OrderEntity> orders = customerEntity.getOrders();
    for(OrderEntity orderEntity : orders) {
	...
    }
}

Hibernate 4.3.5 (as shipped with WildFly 8.1.0.CR2) generates the following series of SQL statements for only two(!) customers in the database:

Hibernate: 
     select
         customeren0_.id as id1_1_,
         customeren0_.name as name2_1_,
         customeren0_.numberOfPurchases as numberOf3_1_ 
     from
         CustomerEntity customeren0_
Hibernate: 
     select
         orders0_.CUSTOMER_ID as CUSTOMER4_1_0_,
         orders0_.id as id1_2_0_,
         orders0_.id as id1_2_1_,
         orders0_.campaignId as campaign2_2_1_,
         orders0_.CUSTOMER_ID as CUSTOMER4_2_1_,
         orders0_.timestamp as timestam3_2_1_ 
     from
         OrderEntity orders0_ 
     where
         orders0_.CUSTOMER_ID=?
Hibernate: 
     select
         orders0_.CUSTOMER_ID as CUSTOMER4_1_0_,
         orders0_.id as id1_2_0_,
         orders0_.id as id1_2_1_,
         orders0_.campaignId as campaign2_2_1_,
         orders0_.CUSTOMER_ID as CUSTOMER4_2_1_,
         orders0_.timestamp as timestam3_2_1_ 
     from
         OrderEntity orders0_ 
     where
         orders0_.CUSTOMER_ID=?

As we can see, the first query selects all customers from the table CustomerEntity. The following two selects then fetch the orders for each customer loaded by the first query. With 100 customers instead of two, we would get 101 queries: one initial query to load all customers and then, for each of the 100 customers, an additional query for its orders. That is why this problem is called N+1.

A common idiom to solve this problem is to force the ORM to generate an inner join query. In JPQL this can be done with the JOIN FETCH clause, as demonstrated in the following code snippet:

entityManager.createQuery("SELECT c FROM CustomerEntity AS c JOIN FETCH c.orders AS o", CustomerEntity.class).getResultList();

As expected, the ORM now generates an inner join with the OrderEntity table and therefore needs only one SQL statement to load all the data:

select
    customeren0_.id as id1_0_0_,
    orders1_.id as id1_1_1_,
    customeren0_.name as name2_0_0_,
    orders1_.campaignId as campaign2_1_1_,
    orders1_.CUSTOMER_ID as CUSTOMER4_1_1_,
    orders1_.timestamp as timestam3_1_1_,
    orders1_.CUSTOMER_ID as CUSTOMER4_0_0__,
    orders1_.id as id1_1_0__
from
    CustomerEntity customeren0_
inner join
    OrderEntity orders1_
        on customeren0_.id=orders1_.CUSTOMER_ID

In situations where you know that you will have to load all orders for each customer, the JOIN FETCH clause reduces the number of SQL statements from N+1 to 1. This of course comes with the drawback that the customer data is now transferred again and again for every order of that customer (due to the additional customer columns in each result row).

With version 2.1 the JPA specification introduces so-called NamedEntityGraphs. This annotation lets you describe the graph a JPQL query should load in more detail than a JOIN FETCH clause can, and is therewith another solution to the N+1 problem. The following example demonstrates a NamedEntityGraph for our customer entity that is supposed to load only the name of the customer and its orders. The orders are described in more detail in the subgraph ordersGraph: here we only want to load the fields id and campaignId of the order.

@NamedEntityGraph(
        name = "CustomersWithOrderId",
        attributeNodes = {
                @NamedAttributeNode(value = "name"),
                @NamedAttributeNode(value = "orders", subgraph = "ordersGraph")
        },
        subgraphs = {
                @NamedSubgraph(
                        name = "ordersGraph",
                        attributeNodes = {
                                @NamedAttributeNode(value = "id"),
                                @NamedAttributeNode(value = "campaignId")
                        }
                )
        }
)

The NamedEntityGraph is passed as a hint to the JPQL query after it has been loaded via the EntityManager using its name:

EntityGraph entityGraph = entityManager.getEntityGraph("CustomersWithOrderId");
entityManager.createQuery("SELECT c FROM CustomerEntity AS c", CustomerEntity.class).setHint("javax.persistence.fetchgraph", entityGraph).getResultList();

Hibernate supports the @NamedEntityGraph annotation since version 4.3.0.CR1 and creates the following SQL statement for the JPQL query shown above:

Hibernate: 
    select
        customeren0_.id as id1_1_0_,
        orders1_.id as id1_2_1_,
        customeren0_.name as name2_1_0_,
        customeren0_.numberOfPurchases as numberOf3_1_0_,
        orders1_.campaignId as campaign2_2_1_,
        orders1_.CUSTOMER_ID as CUSTOMER4_2_1_,
        orders1_.timestamp as timestam3_2_1_,
        orders1_.CUSTOMER_ID as CUSTOMER4_1_0__,
        orders1_.id as id1_2_0__ 
    from
        CustomerEntity customeren0_ 
    left outer join
        OrderEntity orders1_ 
            on customeren0_.id=orders1_.CUSTOMER_ID

We see that Hibernate does not issue N+1 queries; instead, the @NamedEntityGraph annotation has forced Hibernate to load the orders via a left outer join. This is of course a subtle difference to the JOIN FETCH clause, for which Hibernate created an inner join: the left outer join also loads customers for which no order exists, whereas the JOIN FETCH query only returns customers that have at least one order.

It is also interesting that Hibernate loads more than the specified attributes for the tables CustomerEntity and OrderEntity. As this conflicts with the specification of @NamedEntityGraph (section 3.7.4), I have created a JIRA issue for it.

Conclusion: With JPA 2.1 we have two solutions for the N+1 problem: we can either use the JOIN FETCH clause to eagerly fetch a @OneToMany relation, which results in an inner join, or we can use the @NamedEntityGraph feature, which lets us specify which @OneToMany relation to load via a left outer join.