Decoding Apache Flume and Its Capabilities to Automate Data Transfer

Utkarsh Goel
Walmart Global Tech Blog
8 min readApr 26, 2022

--

Photo Credit: manfredsteger

Moving data from varied sources into a single repository for consumption can be extremely time-consuming and complex. When my team came across this issue, we turned to the open-source project Apache Flume to automate the data transfer process.

Apache Flume enabled us to automatically move our data from the legacy Informix database, ActiveMQ, IBM MQ, MAAS, Kafka, and other sources by simply editing a configuration file. In this article, we will decode Apache Flume and share some of our learnings to help you better leverage Apache Flume’s capabilities.

Apache Flume Overview

Flume is a distributed, reliable, and available data collection service for collecting, aggregating, and moving a large amount of data from an event source to a destination.

A Flume agent is a JVM process which has three components: Flume Source, Flume Channel, and Flume Sink.

Flume Components
  • In the above diagram, the events generated by Event Source are consumed by Flume Source.
  • Flume Source receives an event and stores it in one or more channels. The channel acts as a store, which keeps the event until it’s consumed by the Flume Sink.
  • Flume Sink removes the event from the channel and stores it in an external destination (e.g., HDFS).

Advantages of Using Kafka Channel

Apache Flume also supports a Kafka channel, which needs to be installed separately and is used to store the events.

The Kafka channel can be used for multiple scenarios:

  1. With Flume source and sink: It provides a reliable and highly available channel for events.
  2. With Flume source and interceptor but no sink: It allows for writing Flume events into a Kafka topic for use by other apps.
  3. With Flume sink, but no source: It is a low-latency, fault-tolerant way to send events from Kafka to Flume sinks, such as HDFS, HBase, or Elastic Search.

Flume uses a transactional approach to guarantee the reliable delivery of events. It uses two independent transactions, which are responsible for event delivery from source to channel and from channel to sink. The transaction has two characteristics: successful commit and failed rollback.

Picture Credit: Flume Event Transaction

For reliability and to ensure zero data loss in case of transaction failures and rollback, Flume needs to be configured as per the source.

  • Kafka Source: By configuring the Consumer group for a Kafka source topic, this ensures that the pointer won’t move ahead in case of transaction failures.
  • JMS Source: If the source type is Queue, dead letter queues can be configured to ensure zero data loss and recoverability. In the case of Topic, the following properties need to be defined in the JMS Source:
  • Custom JDBC Source: In case of a custom source (e.g., JDBC), the respective table that last successfully read last_change_ts can be maintained and updated in case of a successful commit.

Availability

To ensure high availability and low event latency, Apache Flume needs to be up and running, as well as able to recover itself if there’s a process shutdown.

Here are some tips to ensure high availability for Flume:

  • Configure Flume agent startup commands as a child process in supervisord (the process management system that’s responsible for starting child programs at its own invocation, responding to commands from clients, and restarting crashed or exited sub-processes).
  • The above point takes care of any issues if the whole Flume JVM goes down. However, since a Flume JVM has n agents, to ensure all agents are up and running, monitoring can be done at the Flume agent level using JMX.
  • Flume agent logs can be parsed, and alerts can be set up in a log monitoring tool for any exceptions.

Sample Supervisord configuration file for Apache Flume:

[program:flumetask-agent-1]command=/home/flume/apache-flume-1.9.0-bin/scripts/flume-agent.sh agent-1 8080
autostart=true
autorestart=true
startretries=3
stderr_logfile=/home/flume/apache-flume-1.9.0-bin/conf/agent-1/flumetest-supervisor-agent-1.err.log
stdout_logfile=/home/flume/apache-flume-1.9.0-bin/conf/agent-1/flumetest-supervisor-agent-1.out.log
user=flumeuser
logfile_maxbytes=10MB
logfile_backups=0-bash-4.2$

In the above configuration file, the command key points to the script file “flume-agent.sh” which will be run at the time of Flume process exit.

Monitoring

For monitoring, Apache Flume reports JMX metrics. The metrics can be reported in different forms (e.g., JSON), and any other custom monitoring class can be written to poll the platform MBean server and send to a reporting server.

For JSON reporting, the following properties need to be added to the Flume agent startup command:

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

We will now touch base on how the JMX metrics can be used for monitoring purposes.

Liveliness Check:

Get call can be made on http://<%FlumeServerIP%>:<%FlumeMonitoringPort%>/metrics (for HTTP reporting) and if the API returns anything other than 200 OK, then it means the Flume agent configuration and logs need to be looked into.

Using JMX Metrics:

As mentioned above, Flume reports multiple types of JMX metrics for each component (source, channel, and sink). The metrics are defined below:

Source Metrics:

Channel Metrics:

Sink Metrics:

The above metrics are specific to the type of component (source, channel, and sink) and can be used in multiple ways:

1. To identify if there is an issue with a single Flume component and not the whole process, the metrics can be used.

Each component’s metrics are reported in key-value manner. For example, the key is the “SOURCE.<%source-name%>” and the value will be the JSON containing the source metrics.

The alerting app can get the metrics’ API response and check for each component to be present for atomic liveliness.

Example Metrics:

{"CHANNEL.fileChannel":{"EventPutSuccessCount":"468085","Type":"CHANNEL","StopTime":"0","EventPutAttemptCount":"468086","ChannelSize":"233428","StartTime":"1344882233070","EventTakeSuccessCount":"458200","ChannelCapacity":"600000","EventTakeAttemptCount":"458288"},"CHANNEL.memChannel":{"EventPutSuccessCount":"22948908","Type":"CHANNEL","StopTime":"0","EventPutAttemptCount":"22948908","ChannelSize":"5","StartTime":"1344882209413","EventTakeSuccessCount":"22948900","ChannelCapacity":"100","EventTakeAttemptCount":"22948908"}}

2. As we know, the user can define multiple hops between the source and sink in general, so to keep event latency at a minimum, the alert can be set up for lag using the source metric.

(EventReceivedCount — EventAcceptedCount) gives the number of events for which successful commit is not marked by the source. Threshold can be kept on the value, and an alert can be raised if the threshold is breached.

Scalability

Apache Flume is horizontally-scalable, meaning multiple agents can be configured to connect to the same source and a similar data flow can be specified for each agent.

For an individual agent to achieve the maximum throughput, the below tuning can be done.

Batching:

Flume reads the data from the source in batch and for the batch, either successful commit or failed rollback is done.

The parameters batchSize and batchDurationMillis in the source and sink can be tuned to increase the throughput. The tuned value depends on the type of source/sink used and the event latency desired.

<agent_name>.sinks.<sink_name>.batchSize = 10000
<agent_name>.sinks.<sink_name>.batchDurationMillis = 10000

For Flume Kafka Source, multiple Flume agents can be configured with the same consumer group so each will read a unique set of partitions for a topic.

In cases where the bottleneck isn’t at the source and the channel/sink isn’t able to catch up with the ingestion throughput, then the below strategy can be used.

Multiple Sinks-Multiple Channels:

To a source, multiple channels can be attached and a sink can be attached to each channel. Flume provides two types of channel selectors (replicating or multiplexing), and custom channel selectors can be written to evenly distribute the event to channel and balance the load.

Deployment Pipeline

A Flume agent is started using a shell script called flume-ng, which is located in the bin directory of the Flume distribution. Flume agent name, configuration directory, and the configuration file are the mandatory parameters required to start an agent.

$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

To cover the reliability and recoverability aspects and to write custom Flume components to the deployment, the below additional steps are required:

  • Configuring supervisor.d for each Flume agent
  • Configuring parameters for enabling JMX JSON reporting
  • Add required jars to the Flume lib folder during deployment for the custom code to work
  • Starting multiple Flume agents on a single machine

The below scripts take care of the above steps during a Flume deployment:

cd /home/flume
echo "Validating supervisord installation" >> $FLUME_DEPLOY_LOG
supervisor_pid='ps -aef | grep java | grep supervisor | grep -v grep | awk '{print$2}''; //Check if supervisord is installed or notif [ $supervisor_pid -gt 0 ]; then
echo "Supervisord already installed" >> $FLUME _DEPLOY_LOG
echo "Stopping supervisord to configure flume" >> $FLUME _DEPLOY_LOG
sudo systemctl stop supervisord
else
echo "Installing supervisord" >> $FLUME_DEPLOY_LOG
sudo yum -y install supervisor
fi
echo "Killing existing flume agents:" >> $FLUME_DEPLOY_LOG //Step to kill any current agent and do fresh deployment
for pid in `ps -aef | grep flume | awk '{print $2}'`
do
kill -9 $pid
done;
echo "Copying Flume:" >> $FLUME_DEPLOY_LOG
cp /home/flume/current/flume-$FLUME_VERSION/apache-flume-1.9.0-bin.tar /home/flume
echo "Unzip Flume Tar:" >> $FLUME_DEPLOY_LOG
tar xf apache-flume-1.9.0-bin.tar
echo "Copy jars and scripts to flume:" >> $FLUME_DEPLOY_LOG //Step which take care to copy additional jars and any script to the flume folder
cd /home/flume/current/flume-$FLUME_VERSION
rm -f $FLUME_HOME/lib/flume-*.jar //Remove any previous jar
cp *.jar $FLUME_HOME/lib/
cd bin
cp *.sh $FLUME_HOME/scripts
echo "Configuring Supervisord" >> $FLUME_DEPLOY_LOG
sudo rm -rf /etc/supervisord.d/*
echo "Copying supervisor supervisord configs to /etc/supervisord.d" >> $FLUME_DEPLOY_LOGexport port=8079
for agent in $(echo ${FLUME_AGENT_NAME} | sed "s/,/ /g") //Loop to deploy supervisord for each flume agent
do
cd /home/flume/apache-flume-1.9.0-bin/conf
mkdir ${agent}
cd ${agent}
cp /home/flume/apache-flume-1.9.0-bin/conf/log4j.properties .
mkdir logs
export port=$((port+1))
sudo cp /home/flume/current/flume-$FLUME_VERSION/bin/flume-supervisor.ini.template /etc/supervisord.d/flume-supervisor.$agent.ini
sudo sed -i 's/agentName/'"${agent}"'/g' /etc/supervisord.d/flume-supervisor.$agent.ini
sudo sed -i 's/portNum/'"${port}"'/g' /etc/supervisord.d/flume-supervisor.$agent.ini
echo "Copied supervisor ini for agent name : $agent" >> $FLUME_DEPLOY_LOG
done
echo "Starting supervisord" >> $FLUME_DEPLOY_LOG
sudo systemctl enable supervisord
sudo systemctl start supervisord

Note: All the deployment logs are written to the location specified in the variable $FLUME_DEPLOY_LOG.

--

--

Utkarsh Goel
Walmart Global Tech Blog

Experienced Software Professional in the area of backend, java, microservices and distributed systems.