Converting a CSV file to parquet and querying it with Apache Drill

The Apache Drill project provides SQL-like access to different kinds of data stores. The supported data stores span relational as well as NoSQL databases and the file system. Hence you can query data from HBase, MongoDB, HDFS and the local file system without having to convert the data beforehand. Even joins between the different formats are possible. Internally, Apache Drill prepares the potentially nested data (from, for example, JSON files) in a columnar representation as described in Google’s Dremel paper. This columnar data structure allows queries that only select a subset of the available columns to perform much faster, as only the selected columns have to be read. In contrast to traditional relational databases, the whole row does not have to be loaded from disk.

The data structure described in Google’s Dremel paper is also available as a file format called Parquet, which lets you store and retrieve data in a columnar fashion. If you plan to execute multiple queries on a big data set, it can be reasonable to convert the CSV file to the Parquet format and query it using Apache Drill. In this article we therefore explore how to convert a CSV file into a Parquet file using Apache’s Parquet library:

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>parquet-column</artifactId>
	<version>1.6.0</version>
</dependency>
<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>parquet-hadoop</artifactId>
	<version>1.6.0</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-common</artifactId>
	<version>2.4.0</version>
</dependency>

In order to tell Apache Parquet the structure of the CSV file, we have to create an instance of MessageType and pass in a message definition written in Parquet’s schema language, whose syntax closely resembles Google’s Protocol Buffers (https://developers.google.com/protocol-buffers/). As our CSV file contains first name, last name, date of birth and place of birth for a large number of persons, the message definition looks like this:

message csv {
    required binary firstName = 1; 
    required binary lastName = 2; 
    required binary dateOfBirth = 3; 
    required binary placeOfBirth = 4;
}

All fields are required and of type binary. This message definition is stored as the first line of our CSV file, so that it can be read directly from the file when the conversion starts.
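To illustrate, the beginning of such a CSV file could look like the following. The separator and the sample records are made up for illustration; in the code below the separator is kept in the SEPARATOR constant:

message csv { required binary firstName = 1; required binary lastName = 2; required binary dateOfBirth = 3; required binary placeOfBirth = 4; }
DAVID|MILLER|1953-01-12|CHICAGO
ANNA|SMITH|1982-07-30|BOSTON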

The code in the main() method of our sample application looks like this:

// read the message definition from the first line of the CSV file and parse it into a MessageType
String firstLine = Files.readFirstLine(options.getCsvPath().toFile(), 
	Charset.defaultCharset());
MessageType messageType = MessageTypeParser.parseMessageType(firstLine);
WriteSupport<List<String>> writeSupport = new CsvWriteSupport(messageType);
String line;
// stream through the CSV file and write every line as one record into the Parquet file
try (CsvParquetWriter writer = new CsvParquetWriter(path, writeSupport);
	BufferedReader br = new BufferedReader(new FileReader(options.getCsvPath().toFile()))) {
	while ((line = br.readLine()) != null) {
		String[] fields = line.split(Pattern.quote(SEPARATOR));
		writer.write(Arrays.asList(fields));
	}
}

The first two lines create an instance of MessageType using the first line of the CSV file. This instance of MessageType is then passed into the constructor of our CsvWriteSupport class:

public class CsvWriteSupport extends WriteSupport<List<String>> {
	private final MessageType messageType;
	private RecordConsumer recordConsumer;

	public CsvWriteSupport(MessageType messageType) {
		this.messageType = messageType;
	}

	@Override
	public WriteSupport.WriteContext init(Configuration configuration) {
		return new WriteSupport.WriteContext(messageType, new HashMap<String, String>());
	}

	@Override
	public void prepareForWrite(RecordConsumer recordConsumer) {
		this.recordConsumer = recordConsumer;
	}

	@Override
	public void write(List<String> record) {
		// every CSV line becomes one Parquet record (message)
		recordConsumer.startMessage();
		List<ColumnDescriptor> columns = messageType.getColumns();
		for (int i = 0; i < columns.size(); i++) {
			String value = record.get(i);
			// empty values are not written at all
			if (value.length() > 0) {
				recordConsumer.startField(columns.get(i).getPath()[0], i);
				// choose the add method that matches the column's primitive type
				switch (columns.get(i).getType()) {
					...
					case BINARY:
						recordConsumer.addBinary(Binary.fromByteArray(value.getBytes(Charset.defaultCharset())));
						break;
					default:
						throw new ParquetEncodingException("Unsupported column type: " + columns.get(i).getType());
				}
				recordConsumer.endField(columns.get(i).getPath()[0], i);
			}
		}
		recordConsumer.endMessage();
	}
}

While the implementation of the two methods init() and prepareForWrite() is simple, the core logic resides in write(). Here the CsvWriteSupport tells Parquet to start a new message and then adds the individual fields. The switch statement has been shortened to focus on the type binary: the string read from the CSV file is converted into a byte array using the platform's default charset. Note that write() is called from within the while loop that iterates over the lines of the CSV file.
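The CsvParquetWriter used in main() is not shown above and contains no logic of its own. A minimal sketch could look like the following, assuming it merely subclasses Parquet's ParquetWriter and delegates to the constructor that takes an output path and a WriteSupport (thereby using the default block size, page size and compression settings):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.Path;

import parquet.hadoop.ParquetWriter;
import parquet.hadoop.api.WriteSupport;

public class CsvParquetWriter extends ParquetWriter<List<String>> {

	public CsvParquetWriter(Path file, WriteSupport<List<String>> writeSupport) throws IOException {
		// rely on parquet's default block size, page size and compression codec
		super(file, writeSupport);
	}
}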

After compiling the application into a jar file that contains all dependencies, we can start it with the following command, passing the existing CSV file and the name of the output file on the command line:

java -Djava.library.path=/usr/lib/hadoop/lib/native/Linux-amd64-64:/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native/Linux-amd64-64:/usr/lib/hadoop/lib/native -classpath "$HADOOP_JAR_DIRS:csv-to-parquet-0.0.1-SNAPSHOT-jar-with-dependencies.jar" com.wordpress.mdw.Main <csv> <parquet>

The first observation is that the Parquet file (here test.parquet) is much smaller than the corresponding CSV file:

417M test.csv
35M  test.parquet

The size reduction of about 92% (1 − 35 MB / 417 MB ≈ 0.92) stems from the fact that Parquet supports very efficient compression and encoding schemes. As the data is stored in a columnar fashion, the compression algorithms can exploit the fact that one column contains many similar values. For comparison, gzip compression of the CSV file only reaches a ratio of about 83%.

As a final step we want to query the number of records/lines in both files with Apache Drill and compare the execution times:

0: jdbc:drill:> select count(*) from dfs.`/tmp/test_drill_tab.csv`;
+-----------+
|  EXPR$0   |
+-----------+
| 10000001  |
+-----------+
1 row selected (5.771 seconds)
0: jdbc:drill:> select count(*) from dfs.`/tmp/test.parquet`;
+-----------+
|  EXPR$0   |
+-----------+
| 10000001  |
+-----------+
1 row selected (0.257 seconds)

From the explanations above it is clear that the first query has to read the complete file whereas the second query can concentrate on a single column. Beyond that, the Parquet implementation also stores the number of values in each page header (a column is divided into multiple chunks/pages). The picture is similar when we ask Drill to count the entries whose first name is ‘DAVID’:

0: jdbc:drill:> select count(firstName) from dfs.`/tmp/test.parquet` where firstName = 'DAVID';
+---------+
| EXPR$0  |
+---------+
| 999190  |
+---------+
1 row selected (2.272 seconds)
0: jdbc:drill:> select count(columns[0]) from dfs.`/tmp/test_drill_tab.csv` where columns[0] = 'DAVID';
+---------+
| EXPR$0  |
+---------+
+---------+
No rows selected (6.418 seconds)

The answer for the Parquet file comes back after about 2 seconds, while the query running on the CSV file takes about 4 seconds longer. The gap widens further when querying two columns:

0: jdbc:drill:> select count(firstName) from dfs.`/tmp/test.parquet` where firstName = 'DAVID' and lastName = 'MILLER';
+---------+
| EXPR$0  |
+---------+
| 110813  |
+---------+
1 row selected (5.838 seconds)
0: jdbc:drill:> select count(columns[0]) from dfs.`/tmp/test_drill_tab.csv` where columns[0] = 'DAVID' and columns[1] = 'MILLER';
+---------+
| EXPR$0  |
+---------+
+---------+
No rows selected (29.57 seconds)

Now the Parquet query takes only about 19.7% of the CSV query’s time (5.838 s vs. 29.57 s). Finally, please note that there are no indexes of the kind a traditional RDBMS would use; the idea of the Dremel paper is to always perform a “full” scan of the complete column. But answering a query over 10 million records stored in a compressed format in under six seconds is still not bad.
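Incidentally, these queries do not have to be issued from the sqlline shell; Drill also ships a JDBC driver, so the comparison can be scripted from Java as well. The following is a minimal sketch, assuming a local Drill installation, the embedded connection URL jdbc:drill:zk=local and the org.apache.drill.exec:drill-jdbc-all artifact (matching your Drill version) on the classpath; the file paths are the ones used above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DrillQueryTimer {

	public static void main(String[] args) throws Exception {
		// jdbc:drill:zk=local runs Drill in embedded mode inside this JVM
		try (Connection connection = DriverManager.getConnection("jdbc:drill:zk=local");
				Statement statement = connection.createStatement()) {
			time(statement, "select count(*) from dfs.`/tmp/test.parquet`");
			time(statement, "select count(*) from dfs.`/tmp/test_drill_tab.csv`");
		}
	}

	private static void time(Statement statement, String sql) throws Exception {
		long start = System.currentTimeMillis();
		try (ResultSet resultSet = statement.executeQuery(sql)) {
			while (resultSet.next()) {
				System.out.println(sql + " -> " + resultSet.getLong(1));
			}
		}
		System.out.println("took " + (System.currentTimeMillis() - start) + " ms");
	}
}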

Conclusion: Storing data in the Parquet file format not only saves disk space (a size reduction of about 92%) but also reduces query times by a factor of three to five.

PS: The source code is available on GitHub.
