Written on February 26, 2016
Author: Lewis Gavin

Apache Flume

In this post I will be discussing some simple use cases and important features of Apache Flume.

Apache Flume

Flume is a distributed service dedicated to aggregating and transporting large amounts of event data (typically log data) from many different sources into a central storage location. For the purposes of this blog, the central storage location will be a Hadoop cluster.

Flume Architecture

So what does this mean? Essentially, Flume can efficiently and continuously stream data from a user-defined source to a user-defined target. This allows ongoing ingestion from a source that is always producing data, which is why log capture is the main use case; mining social media data is another good fit.

What is an Agent?

Data flowing through Flume is packaged as events, and every event passes through an Agent. An Agent is a JVM process containing a Source, a Channel and a Sink, which together obtain an event and deliver it to the target data store.

Agents are configured using a configuration file in Java properties format. This file is passed to Flume as a parameter when the agent is started. An agent is started as follows:

flume-ng agent -n <AGENT_NAME> -c conf -f <PATH_TO_CONF_FILE>
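For example, assuming the agent defined below is named TwitterAgent and its configuration is saved in a file called twitter.conf inside the conf directory (the file name and path are purely illustrative), the command would look like this:

flume-ng agent -n TwitterAgent -c conf -f conf/twitter.conf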

What is a Channel?

A Channel is used to stage events on an agent between the Source and the Sink. For this example I will use the memory channel, which buffers events in an in-memory queue; its only mandatory property is its type. I will create a Flume configuration file to demonstrate how to set up a channel, followed by a source and a sink.

TwitterAgent.sources = Twitter  
TwitterAgent.channels = TwitterChannel  
TwitterAgent.sinks = HDFS  

TwitterAgent.channels.TwitterChannel.type = memory  

Here I have defined a Channel named TwitterChannel, a Source named Twitter and a Sink named HDFS. I will demonstrate how to configure the Source and Sink in the next few sections.
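The memory channel also has optional properties with sensible defaults, such as capacity (the maximum number of events held in the queue) and transactionCapacity (the maximum number of events taken from the channel per transaction). As a rough sketch, with purely illustrative values, they could be tuned like so:

TwitterAgent.channels.TwitterChannel.capacity = 10000
TwitterAgent.channels.TwitterChannel.transactionCapacity = 1000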

What is a Source?

A Source refers to a data source that Flume can connect to. There are many different source types available; for this example I will be connecting to Twitter.

Each source has its own set of configurable attributes. Some of these attributes are mandatory; others are optional, meaning their default values are used if they aren't specified in the config file.

To connect to a Twitter source, we first need to obtain a set of keys by registering as a Twitter app developer. This can be done by going to https://apps.twitter.com and creating a new app.

The mandatory properties for the Twitter source are: channels, type, consumerKey, consumerSecret, accessToken and accessTokenSecret. I set them like so:

TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = XXXXXXXXXXXXXXXX
TwitterAgent.sources.Twitter.consumerSecret = XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
TwitterAgent.sources.Twitter.accessToken = XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
TwitterAgent.sources.Twitter.accessTokenSecret = XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
TwitterAgent.sources.Twitter.maxBatchSize = 1000
TwitterAgent.sources.Twitter.maxBatchDurationMillis = 100000
TwitterAgent.sources.Twitter.keywords = Hadoop, bigdata, spark, hive, Hbase, flume
TwitterAgent.sources.Twitter.channels = TwitterChannel

The consumerKey, consumerSecret, accessToken and accessTokenSecret should be filled in using the values given to you by Twitter. There are also some optional settings here: maxBatchSize means that up to 1000 tweets will be collected before being processed as a batch, and maxBatchDurationMillis sets the maximum number of milliseconds to wait before a batch is considered complete.

What is a Sink?

Now that I have connected to a data source, I need somewhere to write the data it collects. A Sink specifies where the data obtained from the source should be output. This can simply be logged to the screen, written to the local filesystem, written to HDFS, or sent to many other alternatives.

I will be writing the Twitter data to HDFS. The mandatory properties that need to be set to write to HDFS are: channel, type and hdfs.path. I set up my Sink as follows:

TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.channel = TwitterChannel
TwitterAgent.sinks.HDFS.hdfs.path = /flume/events/twitter
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.filePrefix = twitter-test

The Twitter data will be written as text files, each containing up to 1000 records (as specified by the source), to an HDFS directory named /flume/events/twitter. Each file name will be prefixed with twitter-test. When the Flume agent is running, we would expect to see files in HDFS like so:

$ hdfs dfs -ls /flume/events/twitter
Found 2 items
-rw-rw-rw-   1 gavlaaaaaaaa supergroup     384147 2016-03-01 13:36 /flume/events/twitter/twitter-test.1456868142401
-rw-rw-rw-   1 gavlaaaaaaaa supergroup     386884 2016-03-01 13:36 /flume/events/twitter/twitter-test.1456868142402
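The HDFS sink also has optional roll settings (hdfs.rollInterval, hdfs.rollSize and hdfs.rollCount) that control when a file is closed and a new one started. As a sketch, with purely illustrative values, rolling could be tied to event count alone by disabling the time-based and size-based triggers:

TwitterAgent.sinks.HDFS.hdfs.rollCount = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 0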

Interceptors

Flume interceptors are useful when you want to apply a set of rules to modify or drop data as it is ingested. This ranges from adding header information to regex filtering and regex replacing. Interceptors are configured as a Source property, and one or more interceptors can be applied to a source using a space-separated list. The order of the list defines the order in which they are applied.

TwitterAgent.sinks = HDFS
TwitterAgent.sources = Twitter
TwitterAgent.channels = TwitterChannel

TwitterAgent.sources.Twitter.interceptors = twitIntercept1 twitIntercept2
TwitterAgent.sources.Twitter.interceptors.twitIntercept1.type = search_replace
TwitterAgent.sources.Twitter.interceptors.twitIntercept1.searchPattern = RT
TwitterAgent.sources.Twitter.interceptors.twitIntercept1.replaceString = Retweet

TwitterAgent.sources.Twitter.interceptors.twitIntercept2.type = timestamp

The above configuration looks for the string ‘RT’ in any incoming tweets and replaces it with ‘Retweet’, and also inserts a timestamp (in milliseconds) into the event headers, recording the time at which the event was processed.
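As another example, the regex_filter interceptor mentioned earlier can drop events entirely rather than modifying them. A minimal sketch, using a purely illustrative interceptor name and pattern, that would discard any event whose body matches the regex:

TwitterAgent.sources.Twitter.interceptors = twitFilter
TwitterAgent.sources.Twitter.interceptors.twitFilter.type = regex_filter
TwitterAgent.sources.Twitter.interceptors.twitFilter.regex = .*spam.*
TwitterAgent.sources.Twitter.interceptors.twitFilter.excludeEvents = true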