By dan.mcclary on Aug 01, 2012
There's a lot to learn from log data, but to get the most value from it, that data needs to be easy to collect and analyze. Otherwise, time that could be used to learn from data is spent writing parsers and transport components. In this entry we'll simplify log collection and transport using JSON serialization and parts of the Hadoop ecosystem.
Logging everything in JSON is a great idea. As serialization formats go it's engineer-friendly: you and your favorite programming language can both read it. Moreover, having all of your log data structured as universal data structures makes getting started with analytics much simpler. To illustrate how much simpler, we'll take JSON logs written to a flat file, stream them into HDFS, and expose them via Hive for exploration and aggregation.
We're going to use three components to put our system together:
- A flat file that's collecting JSON data. Assume each entry is a single JSON object on its own line.
- Flume: the distributed log-collection service that's part of the Hadoop ecosystem
- Hive and a SerDe for handling JSON data
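The sample entries referred to in the first bullet are not reproduced here. As a stand-in, here is a minimal Python sketch of an application appending one JSON object per line to a flat file. The field names fieldA, fieldB and fieldC are borrowed from the Hive table defined later in this entry; the helper names and the file path are hypothetical.

```python
import json


def format_entry(field_a, field_b, field_c):
    """Serialize one log entry as a single-line JSON object."""
    record = {"fieldA": field_a, "fieldB": field_b, "fieldC": field_c}
    return json.dumps(record)


def append_entry(path, field_a, field_b, field_c):
    """Append one entry to the flat file, one JSON object per line."""
    with open(path, "a") as log_file:
        log_file.write(format_entry(field_a, field_b, field_c) + "\n")


if __name__ == "__main__":
    # Hypothetical path; point this at whatever file the tail source follows.
    append_entry("json.log", 1, "hello", 2.5)
```

Keeping each entry on a single line matters because a tail-based source treats every new line as one message.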
The "Tail Table"
We'll begin by setting up the final destination for our log data. This requires that we create a directory in HDFS to hold the log data and define a Hive table over it. Making the directory is easy:
hadoop fs -mkdir /user/oracle/tail_table
Similarly, defining the external table is straightforward in the Hive command line:
CREATE EXTERNAL TABLE IF NOT EXISTS tail_table(fieldA int, fieldB string, fieldC float)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.JsonSerde'
LOCATION '/user/oracle/tail_table';
This gives us a table which will read and respect the types of the values in our JSON records. If a field isn't present for a given record, a NULL value is returned for that column. Fields not included in the CREATE statement are ignored but still exist in the JSON. This allows the schema of the JSON to remain flexible while minimally impacting the Hive table.
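To make those rules concrete, here is a rough Python sketch of the projection behavior just described. It is not the SerDe's implementation, only an illustration of the semantics; the function name project_record and the COLUMNS tuple are hypothetical, with the column names taken from the CREATE statement.

```python
import json

# Column names from the CREATE statement above.
COLUMNS = ("fieldA", "fieldB", "fieldC")


def project_record(json_line, columns=COLUMNS):
    """Return one row: a value per declared column, None (NULL) if absent."""
    record = json.loads(json_line)
    # dict.get yields None for missing keys; undeclared keys are never read.
    return tuple(record.get(name) for name in columns)


if __name__ == "__main__":
    print(project_record('{"fieldA": 7, "fieldB": "ok", "extra": true}'))
```

In this example the record has no fieldC, so the third column comes back as None, and the undeclared "extra" key is simply skipped while remaining in the underlying JSON.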
Streaming Data with Flume OG
CDH 3u4 ships with two very different versions of Flume. The default is Flume 0.9.4, or Flume OG. It's great at streaming data into HDFS, but Flume OG has some requirements.
- You must run Zookeeper to coordinate Flume nodes
- You must run a Flume master to control Flume nodes
- Set the source as a tail of the JSON log file
- Set the sink as:
collectorSink("hdfs://namenode/user/oracle/tail_table", "logdata", 30)
This configuration will tail the log file and write a new message into HDFS with each new line. The collectorSink will commit data to our Hive table every 30 seconds. The resulting configuration looks like this:
Streaming Data with Flume NG
The other version of Flume which ships with CDH3 is Flume NG. Flume NG is significantly different from its predecessor. Our tail source from the previous section is gone, but so too are many of the restrictions.
- Zookeeper is no longer a requirement
- The master-slave architecture has been replaced by independent Flume agents
- We can now use Avro RPCs to transfer data in multi-hop flows
For this type of multi-hop log transfer, we need a flume-ng-agent running on each application server and one on the Hadoop cluster. The application servers will have a flume.conf file which includes something like this:
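# Name the components on this agent
app-agent.sources = tail
app-agent.channels = memoryChannel
app-agent.sinks = avro-forward-sink
# Exec source: follow the JSON log file
app-agent.sources.tail.type = exec
app-agent.sources.tail.command = tail -f /path/to/json.log
app-agent.sources.tail.channels = memoryChannel
# In-memory channel buffering events between source and sink
app-agent.channels.memoryChannel.type = memory
# Avro sink: forward events to the collecting agent
app-agent.sinks.avro-forward-sink.type = avro
app-agent.sinks.avro-forward-sink.hostname = 10.1.1.100
app-agent.sinks.avro-forward-sink.port = 10000
app-agent.sinks.avro-forward-sink.channel = memoryChannel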
This sets up a source that runs "tail" and sinks that data via Avro RPC to 10.1.1.100 on port 10000.
The collecting Flume agent on the Hadoop cluster will need a flume.conf with an avro source and an HDFS sink.
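hdfs-agent.sources = avro-collect
hdfs-agent.sinks = hdfs-write
hdfs-agent.channels = memoryChannel
# Avro source: receive events forwarded by the application servers
hdfs-agent.sources.avro-collect.type = avro
hdfs-agent.sources.avro-collect.bind = 10.1.1.100
hdfs-agent.sources.avro-collect.port = 10000
hdfs-agent.sources.avro-collect.channels = memoryChannel
hdfs-agent.channels.memoryChannel.type = memory
# HDFS sink: write plain text (not the default SequenceFile), rolling every 30 seconds
hdfs-agent.sinks.hdfs-write.type = hdfs
hdfs-agent.sinks.hdfs-write.hdfs.path = hdfs://namenode/user/oracle/tail_table
hdfs-agent.sinks.hdfs-write.hdfs.fileType = DataStream
hdfs-agent.sinks.hdfs-write.hdfs.rollInterval = 30
hdfs-agent.sinks.hdfs-write.channel = memoryChannel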
On this side we've defined a source that reads Avro messages from port 10000 on 10.1.1.100 and writes the results into HDFS, rolling the file every 30 seconds. It's just like our setup in Flume OG, but now multi-hop forwarding is a snap.
The resulting configuration looks like this:
No matter which version you deploy, the combination of Flume, Hive and JSON makes it straightforward to set up an end-to-end pipeline for consuming and analyzing serialized log data. With deployments this simple, you can spend more time focusing on your applications and analytics.