Spark
Spark is a cluster-computing framework for large-scale data processing. It is closely integrated with Hadoop but does not use Hadoop's MapReduce; it has its own distributed runtime. It is fast because it keeps data in memory between jobs, whereas MapReduce usually reloads it from disk.
Setup
Download the Spark binary distribution, extract it, and start the interactive interface from the bin folder.
For the Scala interface: spark-shell
For the Python interface: pyspark
Java code has to be compiled (see the Maven setup below).
Spark uses Resilient Distributed Datasets (RDDs), which are processed with two kinds of operations:
Transformations RDD -> RDD
Actions RDD -> Result
Transformations are lazy: they do not do anything until an action is performed on the resulting RDD.
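A minimal, self-contained sketch (class name and data made up) of this laziness: the map transformation is only recorded, the count action triggers the actual computation.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
public class LazyExample
{
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("LazyExample").setMaster("local[2]");
        JavaSparkContext context = new JavaSparkContext(conf);
        JavaRDD<Integer> numbers = context.parallelize(Arrays.asList(1, 2, 3, 4));
        // transformation: only recorded, nothing is computed yet
        JavaRDD<Integer> doubled = numbers.map(x -> x * 2);
        // action: triggers the actual computation of the whole chain
        System.out.println(doubled.count()); // 4
        context.stop();
    }
}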
Spark with Maven
Create a new simple Maven app (also possible from within Eclipse)
With the new Java 1.8 style this is even easier
Add the Spark dependencies to the pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>de.tgunkel.Java.Spark</groupId>
  <artifactId>MyFirstSparkApplication</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>MyFirstSparkApplication</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- Get Spark and HBase -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.0.0</version>
    </dependency>
    <!-- Standard JUnit -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
Build with Maven, which also fetches the dependencies.
Prepare for Eclipse integration
Import the project into Eclipse; the project needs to be Java 1.8 ready.
Add main class with some Spark Java example code.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;

public class WordCount
{
    public static void main(String[] args) {
        String input ="/tmp/thorsten_spark/input/";
        String output="/tmp/thorsten_spark/output";

        SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount"); //.setMaster("local[3]");
        JavaSparkContext context = new JavaSparkContext(conf);

        // read files
        JavaRDD<String> textFile = context.textFile(input);

        // replace punctuation with spaces and collapse multiple spaces
        JavaRDD<String> rowNoPunctations=textFile.map(new Function<String, String>() {
            public String call(String s) {
                return s.replaceAll("[\\.,?!\"]", " ").replaceAll(" +", " ");
            }
        });

        // split each line into words
        JavaRDD<String> words = rowNoPunctations.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
        });

        // map word -> (word, 1)
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
        });

        // count words
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer a, Integer b) { return a + b; }
        });

        // put everything in one partition so we get one file
        JavaPairRDD<String, Integer> result=counts.coalesce(1);
        result.saveAsTextFile(output);
    }
}
Build the jar with Maven.
Run the generated jar with Spark (spark-submit).
See also Spark functions overview
Or just start the program from Eclipse. If it complains about a missing LoggingFactory in Eclipse but not from the Maven command line, refresh everything in Eclipse. FIXME: Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration (this happens when no master is set; when running locally from Eclipse the commented-out .setMaster("local[3]") from the example above is needed, with spark-submit the --master option provides it).
Connect Spark to a YARN cluster
Normally Spark jobs run on a Spark cluster, using HDFS but not the YARN cluster nodes. However, you may force Spark to submit them to YARN. See Spark on YARN.
Be sure that you can submit Hadoop jobs into the YARN cluster before you continue. If that fails or has problems, Spark will also have problems. I had this problem at the end of every submitted Hadoop job (after they succeeded):
[main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
[main] INFO org.apache.hadoop.ipc.Client - Retrying connect to server: 0.0.0.0/0.0.0.0:10020. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
Here the problem was that the history server was not started on the Hadoop server (it can be started with mr-jobhistory-daemon.sh start historyserver):
Once submitting via Hadoop works, set this (typically the HADOOP_CONF_DIR variable) to the conf folder of Hadoop:
Put the Hadoop and Spark bin folders into your PATH. Inside the Hadoop installation folder, change the core-site.xml to point to your HDFS server:
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://hdfs.internal.example.com:9000</value>
</property>
Check that the HDFS server works (e.g. hadoop fs -ls /).
Put a folder with test files onto it (e.g. with hadoop fs -put).
Next, configure your Hadoop cluster in the yarn-site.xml of your local Hadoop installation:
<property>
  <name>yarn.resourcemanager.hostname</name>
  <value>myhadoop.example.com</value>
  <description>The hostname of the Resource Manager.</description>
</property>
Now, on the client computer, set this variable (typically HADOOP_CONF_DIR) to the Hadoop configuration folder.
Now go into the lib folder of your Spark client installation and try one of the examples.
More power options
This will upload the Spark (assembly) jar every time, which might take a while. You can upload it somewhere into your HDFS and point the corresponding setting (in Spark 1.x the spark.yarn.jar property or the SPARK_JAR environment variable) to that place.
From now on, spark-submit will use this file instead of uploading a new one every time.
Now generate the jar of your application and you are ready to submit it.
Aggregation transformations
reduceByKey() foldByKey() aggregateByKey()
IO
saveAsTextFile() saveAsHadoopFile()
Serialization
Kryo is faster than the standard Java implementation.
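A minimal sketch of how this can be switched on via the SparkConf; the registered class MyData is only a placeholder for one of your own types.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
public class KryoExample
{
    // placeholder for one of your own classes
    public static class MyData implements java.io.Serializable { int id; }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("KryoExample")
            // use Kryo instead of the standard Java serialization
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // optional, but makes the serialized data more compact
        conf.registerKryoClasses(new Class<?>[]{ MyData.class });

        JavaSparkContext context = new JavaSparkContext(conf);
        // ... use the context as usual ...
        context.stop();
    }
}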
Broadcast Variables
They are sent to all executors.
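A minimal sketch (names and data made up): a lookup table is broadcast once and then read inside a transformation on every executor.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
public class BroadcastExample
{
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("BroadcastExample").setMaster("local[2]");
        JavaSparkContext context = new JavaSparkContext(conf);

        // small lookup table that every executor needs
        Map<Integer, String> animals = new HashMap<Integer, String>();
        animals.put(1, "Ape");
        animals.put(2, "Lion");

        // sent to every executor once instead of with every task
        final Broadcast<Map<Integer, String>> lookup = context.broadcast(animals);

        JavaRDD<Integer> ids = context.parallelize(Arrays.asList(1, 2, 1));
        JavaRDD<String> names = ids.map(id -> lookup.value().get(id));
        System.out.println(names.collect()); // [Ape, Lion, Ape]

        context.stop();
    }
}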
Spark Java examples
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
Get data
Read files
Generate data by yourself
// build a small pair RDD by hand (context is assumed to be an existing JavaSparkContext,
// List and ArrayList come from java.util)
final List<Tuple2<Integer, String>> list1 = new ArrayList<Tuple2<Integer, String>>();
list1.add(new Tuple2<Integer, String>(1, "Ape"));
list1.add(new Tuple2<Integer, String>(2, "Lion"));
list1.add(new Tuple2<Integer, String>(4, "Tiger"));
final JavaPairRDD<Integer, String> x1 = context.parallelizePairs(list1);
Enforce a distribution of the data; hopefully this is already the case normally.
// str2nr is assumed to be an existing JavaPairRDD<String, Integer>;
// partitionByHash was presumably a HashPartitioner, here created with 5 partitions
final org.apache.spark.HashPartitioner partitionByHash = new org.apache.spark.HashPartitioner(5);
// partitionBy() and repartition() return new RDDs, str2nr itself is not changed
final JavaPairRDD<String, Integer> byHash = str2nr.partitionBy(partitionByHash);
final int x = str2nr.partitions().size(); // may be smaller than 5
final JavaPairRDD<String, Integer> withSixPartitions = str2nr.repartition(6);
Consume an RDD more than once
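A minimal sketch, assuming an existing JavaSparkContext named context: cache() (or persist()) keeps the data in memory so that the second action does not read and recompute everything again.
JavaRDD<String> lines = context.textFile("/tmp/thorsten_spark/input/");
// without cache() the file would be read again for every action below
lines.cache(); // shortcut for persist(StorageLevel.MEMORY_ONLY)
final long numberOfLines = lines.count();                           // first action: reads and caches the data
final long nonEmptyLines = lines.filter(s -> !s.isEmpty()).count(); // second action: served from the cache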
Get rid of data
Filter out all entries where the value is null
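A minimal sketch, assuming an existing JavaPairRDD<String, String> named pairs: filter keeps only the entries for which the function returns true.
// keep only the entries whose value is not null
JavaPairRDD<String, String> withoutNulls = pairs.filter(entry -> entry._2() != null);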
cogroup: expects two key -> value structures and merges them into key -> ( (list of values from the first structure), (list of values from the second structure) ); groupWith is another name for this.
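A minimal sketch, assuming two existing JavaPairRDD<Integer, String> named animals and foods (made-up names):
// key -> ( (all values from animals for that key), (all values from foods for that key) )
JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<String>>> grouped = animals.cogroup(foods);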
Get some data from the data set
First element
The first 10 elements
The first 10 elements when sorted; also provide a comparator
Get the smallest element according to a comparator (here e.g. the shortest value)
Take some random elements; the result may contain duplicate entries
Get some random elements; the results will be distinct
All the keys
All the values
Sort the RDD (a combined sketch of these operations follows below)
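A combined sketch of the operations above, assuming an existing JavaPairRDD<String, String> named str2str; the comparator is made up and has to be Serializable so that Spark can ship it to the executors.
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;

// made-up comparator that compares tuples by the length of their value
public class ByValueLength implements Comparator<Tuple2<String, String>>, Serializable {
    @Override
    public int compare(Tuple2<String, String> a, Tuple2<String, String> b) {
        return Integer.compare(a._2().length(), b._2().length());
    }
}

Tuple2<String, String> first = str2str.first();                                         // first element
List<Tuple2<String, String>> firstTen = str2str.take(10);                               // the first 10 elements
List<Tuple2<String, String>> sortedTen = str2str.takeOrdered(10, new ByValueLength());  // the first 10 when sorted
Tuple2<String, String> shortest = str2str.min(new ByValueLength());                     // smallest element (shortest value)
List<Tuple2<String, String>> sample1 = str2str.takeSample(true, 5);                     // random, duplicates possible
List<Tuple2<String, String>> sample2 = str2str.takeSample(false, 5);                    // random, distinct
JavaRDD<String> keys   = str2str.keys();                                                // all the keys
JavaRDD<String> values = str2str.values();                                              // all the values
JavaPairRDD<String, String> sorted = str2str.sortByKey();                               // sort by key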
Map the data in a data set into something else
Map a String into Key -> Value. Standard for reading a file line by line and getting key -> value pairs out of it for further processing:
// lines is assumed to be an existing JavaRDD<String> read from a file
final JavaPairRDD<String, String> keyValue = lines.mapToPair(s -> {
    final String[] sp=s.split(" ");
    final String key=sp[0];
    final String value=sp[1];
    return new Tuple2<String, String>(key, value);
});
Map from key -> value into another key -> value. Here map from a String -> String into a String -> Integer by replacing the value with its length.
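A minimal sketch, assuming an existing JavaPairRDD<String, String> named str2str: mapValues keeps the key and only transforms the value.
// String -> String becomes String -> Integer, the key stays untouched
JavaPairRDD<String, Integer> valueLengths = str2str.mapValues(value -> value.length());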
One (key, value) mapped into none, one, or several values
One (key, value) mapped into none, one, or several key / value pairs
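A minimal sketch for both cases, assuming an existing JavaPairRDD<String, String> named str2str and the Spark 1.x Java API from the pom above (there the flat-map functions return an Iterable; in Spark 2.x they return an Iterator). Arrays, ArrayList, List and Tuple2 are imported as in the WordCount example.
// one (key, value) -> none, one, or several values; the key is kept for every produced value
JavaPairRDD<String, String> wordsPerKey =
    str2str.flatMapValues(value -> Arrays.asList(value.split(" ")));

// one (key, value) -> none, one, or several completely new key/value pairs
JavaPairRDD<String, Integer> wordLengths =
    str2str.flatMapToPair(entry -> {
        final List<Tuple2<String, Integer>> result = new ArrayList<Tuple2<String, Integer>>();
        for (final String word : entry._2().split(" ")) {
            result.add(new Tuple2<String, Integer>(word, word.length()));
        }
        return result; // Iterable in Spark 1.x, would be result.iterator() in Spark 2.x
    });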
Map a value to a list of pairs of something else, using a class:
rdd.map(new TypeA()).flatMap(x -> x).mapToPair(x -> x); // does work, but is verbose
rdd.map(new TypeA()).flatMapToPair(x -> x);             // does work but not nice
rdd.flatMapToPair(new TypeA());                         // does NOT work but would be nice
rdd.map(new TypeB());                                   // does NOT work but not required
rdd.flatMapToPair(new TypeB());                         // does work and this is cool
public class TypeA implements org.apache.spark.api.java.function.Function<MyData, Iterator<Tuple2<String, Integer>>> {
    @Override
    public Iterator<Tuple2<String, Integer>> call(final MyData myData) {
        ...
    }
}

public class TypeB implements org.apache.spark.api.java.function.PairFlatMapFunction<MyData, String, Integer> {
    @Override
    public Iterator<Tuple2<String, Integer>> call(final MyData myData) {
        ...
    }
}
The class will be created for every partition on the worker. If that is expensive, an alternative would be to use mapPartitions, where the function is only called once per partition instead of once per element, or to use broadcast variables to send the expensive stuff around.
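A minimal sketch of the mapPartitions variant, assuming an existing JavaRDD<String> named lines and the Spark 1.x API from the pom above (the function gets a whole partition as an Iterator and returns an Iterable).
JavaRDD<Integer> lineLengths = lines.mapPartitions(partition -> {
    // expensive setup could go here; it runs once per partition, not once per line
    final List<Integer> result = new ArrayList<Integer>();
    while (partition.hasNext()) {
        result.add(partition.next().length());
    }
    return result; // Iterable in Spark 1.x, would be result.iterator() in Spark 2.x
});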
Reduce the data somehow
Do not expect that enums are usable across different JVMs!
Reduce to one row per key by merging the values of two entries with the same key (reduceByKey). The result must be of the same type as the value.
groupByKey works like reduceByKey but is less efficient: it first moves all the data for one key together before it merges the values, while reduceByKey can already be run against the data before all the data for one key has been collected. So less data has to be transferred when your reduce function makes the data smaller. See http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
Key -> Container with all values for that key
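A minimal sketch, assuming an existing JavaPairRDD<String, Integer> named pairs like in the WordCount example:
// key -> container with all values for that key; prefer reduceByKey if you only need an aggregate
JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey();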
Reduce to one row per key by merging values together; the values can have a different type after the merging (combineByKey):
// pairs is assumed to be an existing JavaPairRDD<String, String> of (key, url)
final JavaPairRDD<String, Integer> urlLengthPerKey = pairs.combineByKey(
    url -> url.length(),        // how to transform one value into the result type
    (l, url) -> l+url.length(), // how to add a new value to an existing result for that key
    (l1, l2) -> l1+l2           // how to merge two sub results for a key together
);
Reduce to one row per key by merging values together; here the result type per key is a list of all its values (again combineByKey):
// pairs as above; ArrayList and Arrays come from java.util
final JavaPairRDD<String, ArrayList<String>> urlsPerKey = pairs.combineByKey(
    url -> new ArrayList<String>(Arrays.asList(url)),         // how to transform one value into the result type
    (list, url) -> { list.add(url); return list; },           // how to add a new value to an existing result for that key
    (list1, list2) -> { list1.addAll(list2); return list1; }  // how to merge two sub results for a key together
);
Reduce to one row per key by merging values together; the values can have a different type after the merging. aggregateByKey uses combineByKey in the background:
// pairs as above: an existing JavaPairRDD<String, String> of (key, url)
final JavaPairRDD<String, Integer> totalUrlLength = pairs.aggregateByKey(
    0,                          // the start value to take
    (l, url) -> l+url.length(), // value so far and new element, get result
    (l1, l2) -> l1+l2           // two results, merge them
);
???
Gets two values for the same key and merges them by applying the function to the values. Expects a neutral element that you can merge with any value without changing it (foldByKey).
???
Get information about the total amount of data
count(): total number of entries
countByValue(): (key, value) -> count
countByKey(): key -> Number (is actually Long, not Object)
Enforce transformation into traditional containers
Enforces the data to be written to a traditional map with key -> value
Enforces the data to be written to a traditional list containing key -> value entries
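A minimal sketch, assuming an existing JavaPairRDD<String, Integer> named counts: both calls pull the whole data set to the driver, so they only make sense for small results.
// traditional map with key -> value (collectAsMap)
java.util.Map<String, Integer> asMap = counts.collectAsMap();
// traditional list of key -> value tuples (collect)
java.util.List<Tuple2<String, Integer>> asList = counts.collect();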
Output
Enforce everything to be in one partition so that when you write it out you get one file (no longer distributed, avoid this); see coalesce(1) in the WordCount example above.
Save current data set into files (distributed)
Use an RDD more than once (cache it so it is not recomputed for every action, see above)
// rdd1 and rdd2 are assumed to be existing (and ideally cached) RDDs of numbers
rdd1.count();
rdd2.reduce((a,b)->a+b);