Hadoop
Apache Hadoop is an open-source software framework written in Java for the distributed processing of large data sets using the MapReduce programming model.
Links
- Book: Hadoop: The Definitive Guide, with example code
The idea - MapReduce
You have very large input files. These files can be split, and each chunk can be processed separately. Each map run takes one part of the input and maps it into key-value pairs. In the shuffle step the results of the map runs are grouped by key and assigned to a reduce process. Finally, the reduce step does something with the key-value pairs it got. If there is more than one reduce run for a key, their results are fed again into a further reduce run. The map and reduce steps can be distributed to a lot of different nodes (computers), and a distributed file system helps to send the data to them and get the results back.
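As a small illustration with the word count example from below: the map step turns the line "a rose is a rose" into the pairs (a,1) (rose,1) (is,1) (a,1) (rose,1), the shuffle step groups them into a -> [1,1], rose -> [1,1], is -> [1], and the reduce step sums up each list and emits a -> 2, rose -> 2, is -> 1.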
Setting up Hadoop and getting started
Read here how to set up Hadoop.
The standalone mode
Standalone mode does not really use the distributed parts of Hadoop, but it is easy to set up and easy to debug, and once it works it is easy to switch to the other modes. Just set up a Java Eclipse project, add some jars from the Hadoop installation and copy the Hadoop example from below into a class. Start the main method and it should run right away. The example from below expects two parameters: the input folder and the not yet existing output folder.
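If you want to try the same example outside of Eclipse, a standalone run could look like this (class, jar and folder names are examples; hadoop classpath prints the jars of the installation):
javac -cp $(hadoop classpath) WordCount.java
jar cf wordcount.jar WordCount*.class
hadoop jar wordcount.jar WordCount myInputFolder myOutputFolder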
Pseudo-Distributed Operation
This mode already uses the Hadoop Distributed Filesystem (HDFS), but everything runs on one computer; nothing is distributed yet. Within the Hadoop installation folder you will find an etc folder with the configuration files. Required changes:
Download the Hadoop binary, extract the tar, put the folder somewhere and add its bin and sbin folders to your path (for convenience). Also set JAVA_HOME to your Java installation. Test that it works:
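For example (version number and paths are only examples):
tar xzf hadoop-2.X.0.tar.gz
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PATH=/path/to/hadoop-2.X.0/bin:/path/to/hadoop-2.X.0/sbin:$PATH
hadoop version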
Some minimum configuration files
etc/hadoop/core-site.xml:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
etc/hadoop/hdfs-site.xml:
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
etc/hadoop/mapred-site.xml:
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
etc/hadoop/yarn-site.xml:
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
Ready to initialise the system
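With a standard Hadoop 2.x layout this means formatting the HDFS and starting the NameNode and DataNode daemons:
bin/hdfs namenode -format
sbin/start-dfs.sh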
Check that the web interface for the Hadoop NameNode works (by default at http://localhost:50070/ in Hadoop 2.x)
Start it
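This presumably refers to the YARN daemons (ResourceManager and NodeManager):
sbin/start-yarn.sh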
Check if the ResourceManager works (web interface by default at http://localhost:8088/)
Create folders on the HDFS
bin/hdfs dfs -mkdir /user/yourusername
Now copy the input files of your problem from your local filesystem to the HDFS.
Do an ls on it to check that they are there, see http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/FileSystemShell.html
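For example (folder names are examples):
bin/hdfs dfs -put myInputFiles /user/yourusername/input
bin/hdfs dfs -ls /user/yourusername/input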
Once you have a jar containing a Hadoop program you can also start it on the command line:
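For example (jar, class and folder names are examples):
bin/hadoop jar wordcount.jar WordCount /user/yourusername/input /user/yourusername/output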
If it dies right away, it might not find the JAVA_HOME. Try to set it manually in hadoop-env.sh
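For example in etc/hadoop/hadoop-env.sh (the path is an example):
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64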
When you're done, stop the daemons with:
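Assuming the standard Hadoop 2.x sbin scripts:
sbin/stop-yarn.sh
sbin/stop-dfs.sh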
Hadoop Cluster Setup
- Official Hadoop Cluster Setup documentation
- Hadoop multi node cluster setup
- Another Hadoop multi node cluster setup
In our setup we will have one Hadoop master (we use the virtual machine hadoop1 for it) and several slaves.
Let's start by creating the virtual machines, or go buy some computers :-)
vserver hadoop2 build -m debootstrap --hostname hadoop2.localdomain.tgunkel.de --netdev heimnetz --interface 192.168.178.222 --context 226 -- -d jessie
vserver hadoop3 build -m debootstrap --hostname hadoop3.localdomain.tgunkel.de --netdev heimnetz --interface 192.168.178.223 --context 227 -- -d jessie
Install Java on all of them, e.g. with Ubuntu
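For example on Debian/Ubuntu (the package name depends on the Java version you want):
apt-get install default-jdk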
Go to the master computer
cd /usr/local/hadoop/
wget http://.../apache/hadoop/common/hadoop-2.X.0/hadoop-2.X.0.tar.gz
tar xzf hadoop-2.X.0.tar.gz
export JAVA_HOME="/usr/lib/jvm/java-8-oracle/jre/"
export PATH="/usr/local/hadoop/hadoop-2.7.0/bin:/usr/local/hadoop/hadoop-2.7.0/sbin:$PATH"
adduser --uid 1027 --disabled-password hadoop
chown -R hadoop:hadoop /usr/local/hadoop
su - hadoop
ssh-keygen
In the file core-site.xml provide the name of the master node
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://hadoop1.localdomain.tgunkel.de:9000/</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
</configuration>
Now create a folder for the HDFS filesystem
mkdir -p /usr/local/hadoop/myHDFS_FilesystemFolder/name
mkdir /usr/local/hadoop/myHDFS_FilesystemFolder/name/data
chown -R hadoop:hadoop /usr/local/hadoop/myHDFS_FilesystemFolder
In the hdfs-site.xml provide the folders and the number of nodes the files should be replicated to
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.data.dir</name>
<value>/usr/local/hadoop/myHDFS_FilesystemFolder/name/data</value>
<final>true</final>
</property>
<property>
<name>dfs.name.dir</name>
<value>/usr/local/hadoop/myHDFS_FilesystemFolder/name</value>
<final>true</final>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
</configuration>
Do not use RAID filesystems for the slaves. Instead format each disk you want to use with plain ext3 (recommended) and list them all in the hdfs-site.xml, each with a folder (or the disk's root folder). This is faster and more reliable than RAID (because Hadoop will re-replicate data automatically if a disk fails).
<property>
<name>dfs.datanode.data.dir</name>
<value>/disk1/hdfs/data,/disk2/hdfs/data</value>
</property>
In the file mapred-site.xml also provide the name of the master
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>hadoop1.localdomain.tgunkel.de:9001</value>
</property>
</configuration>
Now go on all slave nodes and do this (use a uid which is free on all nodes):
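The hadoop user presumably has to be created on the slaves as well, the same way as on the master (the uid is just an example, it has to be free on all nodes):
adduser --uid 1027 --disabled-password hadoop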
mkdir /usr/local/hadoop/
mkdir /usr/local/hadoop/myHDFS_FilesystemFolder
mkdir /usr/local/hadoop/myHDFS_FilesystemFolder/name
mkdir /usr/local/hadoop/myHDFS_FilesystemFolder/name/data
chown -R hadoop:hadoop /usr/local/hadoop
su - hadoop
ssh-keygen
The master must be able to log in to itself and to all slaves via ssh and its key. So take the public key from the .ssh folder of the hadoop user on the master
and copy it into the .ssh folder (authorized_keys) of all slaves.
Try to login from the master to all slaves.
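A minimal sketch of these steps, assuming the default key file names and that ssh-copy-id is available:
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
ssh-copy-id hadoop@hadoop2.localdomain.tgunkel.de
ssh-copy-id hadoop@hadoop3.localdomain.tgunkel.de
ssh hadoop2.localdomain.tgunkel.de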
So far the Hadoop installation on the master also fits the slaves, so just copy it over
scp -r /usr/local/hadoop/hadoop-2.7.0/ hadoop3:/usr/local/hadoop/
Now go to the master. The configuration file masters contains only ourself.
FIXME: Does it sometimes also make sense to add the master as a slave as well?
Now go to the master as the hadoop user and format the HDFS filesystem.
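The standard command for this is:
hdfs namenode -format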
The connections between your nodes might differ: some nodes might be in the same rack within a datacenter, others might be in totally different data centers. You should provide a Hadoop topology script and reference it via the net.topology.script.file.name property in core-site.xml.
The script should return values of the form /datacenter/rack; if two nodes get the same value, Hadoop assumes they are in the same datacenter or even the same rack.
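A minimal sketch of such a script (hostnames and rack names are examples); Hadoop passes one or more IP addresses or hostnames as arguments and expects one rack path per argument:
#!/bin/bash
for node in "$@"; do
  case "$node" in
    hadoop1*|hadoop2*) echo "/datacenter1/rack1" ;;
    hadoop3*)          echo "/datacenter1/rack2" ;;
    *)                 echo "/default-rack" ;;
  esac
done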
Start the filesystem
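With the standard script from the sbin folder of the Hadoop installation:
start-dfs.sh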
The first start may ask you to confirm all the ssh key fingerprints.
The status page of the NameNode should now work (by default on port 50070 of the master in Hadoop 2.x).
At first, none of my data nodes could connect to the master
The problem was that reverse DNS for some nodes returned the same value by mistake. Until I figured that out, the following was a workaround to start it anyway.
Temporary fix: add this to the hdfs-site.xml on the master
<property>
<name>dfs.namenode.datanode.registration.ip-hostname-check</name>
<value>false</value>
</property>
You can also connect a node manually
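For example by starting the DataNode daemon directly on the node in question (standard Hadoop 2.x script):
hadoop-daemon.sh start datanode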
Now start Hadoop
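This presumably means starting YARN on the master (script from the sbin folder):
start-yarn.sh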
Watch out: in the default configuration there may be only one reducer, which might be a performance issue for you. So reconfigure that (e.g. via mapreduce.job.reduces or JobConf.setNumReduceTasks())!
Simple Hadoop example
A very typical example for understanding Hadoop is the task of counting how often every distinct word appears in a very (very very) large text file. A traditional approach would be to read through the file word by word, put every word you read into a HashMap-like structure and count. With Hadoop this problem can be distributed across many nodes working in parallel.
In order to count how often every distinct word appears in the input text file, we split the problem into a map part and a reduce part. For the mapping we get a row of text, split it into the words of that line and return each word as a key with the number 1 as its value. During the reduce we take all those keys and sum up the values per key. Results of several reduce runs for the same key (word) may be fed into a new reduce run.
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class WordCount {
/* expects two parameters, the input folder and the output folder (not allowed to exist already)
*
*/
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MyMapClass.class);
conf.setCombinerClass(MyReducer.class);
conf.setReducerClass(MyReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
// Map
public static class MyMapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
// the constant 1 as a Hadoop IntWritable
private final static IntWritable ONE = new IntWritable(1);
private Text word = new Text();
/* @see org.apache.hadoop.mapred.Mapper#map(java.lang.Object, java.lang.Object, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
* key is an offset where the data comes from (not used)
* value is one row of the input text file
* output is of the form word -> 1
*/
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
// here we can also suppress values; as an example we ignore the word "badword"
if(!"badword".equals(word.toString())) {
// here we return as a result a single word from one line as a key and as a value the number 1
// the number 1 will be replaced in the reducer with the number of occurrences of the word
output.collect(word, ONE);
}
}
}
}
// Reduce
public static class MyReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
/*
* @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
*
* Key: a single word
* Value: the number of occurrences for the key word so far. On the first calls to this method we expect there only to be lists of 1s for every occurrence of the word
* later on this method might also be called to reduce previous results again. In this case there is word -> number counted so far
*/
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
System.err.println("New mapper started for "+key);
int sum = 0;
while (values.hasNext()) {
int x=values.next().get();
if(x>1) {
System.err.println("Reduce again for key "+key.toString()+" -> "+x);
}
sum +=x;
}
// output is key -> number
output.collect(key, new IntWritable(sum));
}
}
}
Hadoop with Maven
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hadoopbook</groupId>
<artifactId>hadoop-book-mr-dev</artifactId>
<version>4.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>2.5.1</hadoop.version>
</properties>
<dependencies>
<!-- Hadoop main client artifact -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- Unit test artifacts -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
<version>1.1.0</version>
<classifier>hadoop2</classifier>
<scope>test</scope>
</dependency>
<!-- Hadoop test artifact for running mini clusters -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>hadoop-examples</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.5</version>
<configuration>
<outputDirectory>${basedir}</outputDirectory>
</configuration>
</plugin>
</plugins>
</build>
</project>
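Building and running could then look like this (class and folder names are examples; the jar name comes from the finalName above):
mvn clean package
hadoop jar target/hadoop-examples.jar WordCount input output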
Chaining Hadoop Tasks
Instead of building a few complex MapReduce jobs, try to create more, but simpler, MapReduce jobs. One benefit of this is re-usability.
In order to control those jobs you can start them one by one and wait for the previous one to finish:
JobClient.runJob(conf1); // runJob() blocks until the job has finished
JobClient.runJob(conf2); // so the second job only starts after the first one
Or you use the
org.apache.hadoop.mapred.jobcontrol.JobControl
or Apache Oozie.
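A minimal sketch using JobControl with the old mapred API (conf1 and conf2 are assumed to be the JobConf objects of the two jobs; imports shown for clarity, exception handling omitted):
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
JobControl control = new JobControl("my-job-chain");
Job step1 = new Job(conf1);
Job step2 = new Job(conf2);
// step2 only starts after step1 has finished successfully
step2.addDependingJob(step1);
control.addJob(step1);
control.addJob(step2);
// JobControl implements Runnable, so run it in a background thread and wait
new Thread(control).start();
while (!control.allFinished()) {
Thread.sleep(1000);
}
control.stop();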
Unit Tests for MapReduce
With MRUnit you can write JUnit tests for your Hadoop MapReduce solutions.
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mrunit.MapDriver;
import org.junit.*;
public class MyTestClass
{
@Test
public void myTest() throws IOException, InterruptedException
{
Text testValue = new Text("123456789abcdef:42");
new MapDriver<LongWritable, Text, Text, IntWritable>()
.withMapper(new MyMapperWeJustTest())
.withInput(new LongWritable(0), testValue)
.withOutput(new Text("ExpectedKey"), new IntWritable(expectedValue))
.runTest();
}
}
If there should be no result (e.g. a test for an irrelevant or bad input value), just omit the .withOutput
Tests for the Reducer work the same way
new ReduceDriver<Text, IntWritable, Text, IntWritable>()
.withReducer(new MyReducerWeJustTest())
.withInput(new Text("AKey"), Arrays.asList(new IntWritable(aValue), new IntWritable(anotherValue)))
.withOutput(new Text("AKey"), new IntWritable(expectedValue))
.runTest();
Debugging and logging
Use a counter to count how often something happened:
// define an enum for the things you want to count
enum MyCounter
{
problem_a
}
...
// inside map() or reduce(): log and increment the counter
System.err.println("Problem occurred!");
context.getCounter(MyCounter.problem_a).increment(1);
See also
- mapreduce.reduce.log.level (log level of the reduce tasks)
- the Java options of the child task JVMs (mapred.child.java.opts)
- the option to keep a failed task's files for later analysis
Performance
Mappers should get enough data to run for some time; only a few seconds is probably too short. There should be more than one reducer, and each reducer should run for about 5 minutes.
A combiner may improve performance because it reduces the amount of data exchanged between the mappers and the reducers.
Sometimes you can even use your Reducer class for this without any change.
Map output compression can also help (e.g. via mapreduce.map.output.compress and mapreduce.map.output.compress.codec).
If you implement Writable, also implement RawComparator.
Check the shuffle options.
Enable the HPROF profiler (e.g. via the mapreduce.task.profile property)
Select which IDs of the mappers and reducers are profiled (you don't need to profile them all). Default is 0,1,2
mapreduce.task.profile.maps
mapreduce.task.profile.reduces
mapred.child.java.opts (to pass additional Java options to the task JVMs)
Hadoop Distributed Filesystem (HDFS)
http://wiki.apache.org/hadoop/DiskSetup
The master provides a status page for your HDFS (the NameNode web UI, by default on port 50070 in Hadoop 2.x)
Format the HDFS like you format a normal filesystem
Create a folder
Set a limit on how much space you may use
Copy files from a computer into the HDFS
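Example commands for the steps above (standard HDFS shell and dfsadmin commands; paths and sizes are examples):
bin/hdfs namenode -format
bin/hdfs dfs -mkdir /user/yourusername
bin/hdfs dfsadmin -setSpaceQuota 10g /user/yourusername
bin/hdfs dfs -put myLocalFile.txt /user/yourusername/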
AVRO
AVRO is a language-neutral data serialization system.
Parquet
Parquet is a columnar storage format to efficiently store nested data.
Flume
Flume allows you to use streams as an input for Hadoop instead of fixed data such as files.
Sqoop
Apache Sqoop allows you to use structured data stores (e.g. SQL databases) as an input for Hadoop instead of plain files.
PIG
Pig allows you to specify MapReduce jobs in its Pig Latin language, which is supposed to be easier than writing MapReduce jobs directly.
HIVE
Apache Hive converts SQL-like queries into a series of MapReduce jobs for execution.
Crunch
Apache Crunch is a higher-level API for writing MapReduce pipelines, with richer data transformations and support for multi-stage pipelines.
ZooKeeper
Apache ZooKeeper is Hadoop's distributed coordination service.
Ambari
Ambari provides an intuitive, easy-to-use Hadoop management web UI backed by its RESTful APIs.