Spark

Spark is a cluster computing framework for large-scale data processing. It is closely integrated with Hadoop but does not use its MapReduce engine; it has its own distributed runtime. It is fast because it can keep data in memory between jobs, while MapReduce normally reloads it from disk.

Setup

Download the Spark binary distribution, extract it, and start the interactive interface from the bin folder.

For the Scala interface

./spark-shell

For the Python interface

./pyspark

Java code has to be compiled first (see the Maven setup below).

Spark uses Resilient Distributed Datasets (RDD) which are processed:

Transformations RDD -> RDD

Actions RDD -> Result

Transformations are lazy: they do not do anything until an action is performed on the resulting RDD.
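
A minimal sketch of this (Java API, assuming a JavaSparkContext named context as in the WordCount example below; the input path is just a placeholder):

// Transformations only describe the computation, nothing is executed yet
JavaRDD<String> lines  = context.textFile("/tmp/some_input");           // placeholder path
JavaRDD<String> errors = lines.filter(line -> line.contains("ERROR"));  // transformation: RDD -> RDD

// Only the action triggers the actual computation
long numberOfErrors = errors.count();                                   // action: RDD -> result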

Spark with Maven

Create a new simple Maven app (also possible from within Eclipse)

mvn archetype:generate -DgroupId=de.tgunkel.Java.Spark  -DartifactId=MyFirstSparkApplication

With the new Java 1.8 lambda syntax the Spark functions (see the WordCount example below) can be written much more concisely:

final JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a,b) ->   a + b );

Add spark dependencies to the pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>de.tgunkel.Java.Spark</groupId>
  <artifactId>MyFirstSparkApplication</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>MyFirstSparkApplication</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>

    <!-- Get Spark and HBase -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.0.0</version>
    </dependency>

    <!-- Standard JUnit -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

  </dependencies>

</project>

Build with Maven, which also fetches the dependencies

mvn package

Prepare for Eclipse integration

mvn eclipse:eclipse

Import it into Eclipse; the project needs to be Java 1.8 ready.

Add a main class with some Spark Java example code.

package de.tgunkel.Java.Spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import java.util.Arrays;

public class WordCount
{
  public static void main(String[] args) {
    String input ="/tmp/thorsten_spark/input/";
    String output="/tmp/thorsten_spark/output";


    SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount"); //.setMaster("local[3]");
    JavaSparkContext context = new JavaSparkContext(conf);

    // read files
    JavaRDD<String> textFile = context.textFile(input);

    // replace text punctuation
    JavaRDD<String> rowNoPunctations = textFile.map(new Function<String, String>() {
      public String call(String s) {
        return s.replaceAll("[.,?!\"]", " ").replaceAll(" +", " ");
      }
    });


    // split each line into words
    JavaRDD<String> words = rowNoPunctations.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
    });


    // map   word -> (word, 1)
    JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
      public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
    });


    // count words
    JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) { return a + b; }
    });

    // put everything in one partition so we get one file
    JavaPairRDD<String, Integer> result=counts.coalesce(1);


    result.saveAsTextFile(output);


 }
}

Build

mvn package

Run generated jar with Spark

/usr/local/spark-1.3.1-bin-hadoop2.6/bin/spark-submit --class de.tgunkel.Java.Spark.WordCount --master local[2] target/MyFirstSparkApplication-1.0-SNAPSHOT.jar PARAMETERS_FOR_YOUR_PROGRAM

See also Spark functions overview

Or just start the program from Eclipse. If it complains about a missing LoggingFactory in Eclipse but not from the Maven command line, refresh everything in Eclipse. FIXME Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration
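
A possible fix for the master URL exception when running directly from the IDE (not via spark-submit) is to set the master in the code, like the commented-out setMaster in the WordCount example above:

// only for local runs from the IDE; with spark-submit the master is passed on the command line
SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount").setMaster("local[3]");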

Connect Spark to YARN

Normally Spark jobs run on a Spark cluster, using HDFS but not the YARN cluster nodes. However, you can force Spark to submit them to YARN. See Spark on YARN.

Be sure that you can submit Hadoop jobs to the YARN cluster before you continue. If that fails or has problems, Spark will also have problems. I had this problem at the end of every submitted Hadoop job (after they succeeded):

[main] INFO org.apache.hadoop.mapred.ClientServiceDelegate - Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
[main] INFO org.apache.hadoop.ipc.Client - Retrying connect to server: 0.0.0.0/0.0.0.0:10020. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)

Here the problem was that the history server was not started on the Hadoop server:

mr-jobhistory-daemon.sh start historyserver

Once submitting via Hadoop works, set this to the conf folder of Hadoop:

export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop/

Put the Hadoop and Spark bin folders in your PATH. Inside the Hadoop installation folder, change the core-site.xml to point to your HDFS server

<!-- configure this to connect to HDFS cluster -->
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://hdfs.internal.example.com:9000</value>
</property>

Check that the HDFS server works

hdfs dfs -ls

Put a folder with test files on it

hdfs dfs -put test_thorsten/ /

Next, configure your Hadoop cluster in the yarn-site.xml of your local Hadoop installation

<!-- configure this to connect to yarn cluster -->
<property>
 <name>yarn.resourcemanager.hostname</name>
 <value>myhadoop.example.com</value>
 <description>The hostname of the Resource Manager.
 </description>
</property>

Now on the client computer set this variable to the Hadoop configuration folder

export HADOOP_CONF_DIR="/usr/local/hadoop/etc/hadoop/"

Now go into the lib folder of your Spark client installation and try one of the examples

spark-submit --class org.apache.spark.examples.JavaTC --master yarn-cluster spark-examples-1.3.1-hadoop2.6.0.jar

Options to give the job more resources

--driver-memory 2G --num-executors 5 --executor-cores 3

Every submit uploads the Spark assembly jar, which can take some time. You can upload it to your HDFS once and point this variable to that location

export SPARK_JAR=hdfs://hdfs.example.com:9000/lib/spark-assembly-1.3.1-hadoop2.6.0.jar

From now on spark-submit will use this file instead of uploading a new one every time.

Now generate your jar for your application and you are ready to submit it

spark-submit --class foo.bar.WordCount --master yarn-cluster ./target/SparkMaven-0.0.1-SNAPSHOT.jar

Aggregation transformations

reduceByKey() foldByKey() aggregateByKey()

IO

saveAsTextFile() saveAsHadoopFile()

Serialization

Kryo is faster than the standard Java implementation.
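
A minimal sketch of enabling Kryo (assuming Spark 1.x; MyData stands for one of your own classes and is just a placeholder):

SparkConf conf = new SparkConf()
    .setAppName("org.sparkexample.WordCount")
    // switch from the default Java serialization to Kryo
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // registering the classes that get shipped around avoids writing full class names into the stream
    .registerKryoClasses(new Class<?>[] { MyData.class });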

Broadcast Variables

They are sent to all executors once instead of with every task.
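
A minimal sketch (assuming a JavaSparkContext named context and reusing the words RDD from the WordCount example; the stop word list is just an example):

// org.apache.spark.broadcast.Broadcast: the list is shipped to every executor once instead of with every task
final Broadcast<List<String>> stopWords = context.broadcast(Arrays.asList("the", "a", "an"));

// read it on the executors via value()
JavaRDD<String> filteredWords = words.filter(word -> !stopWords.value().contains(word));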

Spark Java examples

https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

Get data

Read files

final JavaRDD<String> textFile = context.textFile(input);

Generate the data yourself

final List<Tuple2<Integer, String>> list1=new ArrayList<Tuple2<Integer, String>>();
list1.add(new Tuple2<Integer, String>(1, "Ape"));
list1.add(new Tuple2<Integer, String>(2, "Lion"));
list1.add(new Tuple2<Integer, String>(4, "Tiger"));
final JavaPairRDD<Integer, String> x1 = context.parallelizePairs(list1);

Enforce a distribution of the data (normally this should already be the case)

// partitionBy and repartition return new RDDs, the original RDD is unchanged
final Partitioner partitionByHash = new HashPartitioner(5);
final JavaPairRDD<String, Integer> partitioned = str2nr.partitionBy(partitionByHash);
final int x = partitioned.partitions().size(); // may be smaller than 5
final JavaPairRDD<String, Integer> repartitioned = partitioned.repartition(6);

Consume an RDD more than once

copy = data.persist(StorageLevel.MEMORY_AND_DISK());

Get rid of data

Filter out all entries where the value is null

final JavaPairRDD<String, String> keyValueWithURL = keyValue2.filter ( x-> x._2!=null );

Expects two key -> value structures and merges them into key -> ( (list of values from the first structure), (list of values from the second structure) ). groupWith is another name for this

final JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<String>>> gg = x1.cogroup(x2);

Get some data from the data set

First element

final Tuple2<String, String> fs=smallExample.first();

The first 10 elements

final List<Tuple2<String, Integer>> kk=str2nr.take(10);

The first 10 elements when sorted; also provide a comparator

final List<Tuple2<String, Integer>> kk2 = str2nr.takeOrdered(10, (a,b) -> Integer.compare(a._2, b._2) );

Get the smallest element according to a comparator (here: the shortest value)

final Tuple2<String, String> mm=smallExample.min( (t1, t2) -> Integer.compare(t1._2.length(), t2._2.length()) );

Take some random elements without replacement; the sampled elements are distinct

List<Tuple2<Integer, String>> ff = x1.takeSample(false, 2);

Take some random elements with replacement; the result may contain duplicate entries

List<Tuple2<Integer, String>> ff = x1.takeSample(true, 2);

All the keys

final JavaRDD<String> ks=smallExample.keys();

All the values

final JavaRDD<String> vs=smallExample.values();

Sort an RDD

rdd.sortBy(person -> person.getAge(), false, 1);

Map the data in an RDD into something else

Map a String into key -> value. This is the standard approach when reading a file line by line and turning it into key -> value pairs for further processing

final JavaPairRDD<String, String> keyValue2 = textFile.mapToPair(s -> {
  final String[] sp    = s.split(" ");
  final String   key   = sp[0];
  final String   value = sp[1];
  return new Tuple2<String, String>(key, value);
});

Map from key -> value into another key -> value. Here: map from a String -> String into a String -> Integer by replacing the value with its length

final JavaPairRDD<String, Integer> str2nr = smallExample.mapToPair(t -> new Tuple2<String, Integer>(t._1, t._2.length() ) );

One (key, value) mapped into none, one, or several values

final JavaRDD<String> splitted=smallExample.flatMap(t -> Arrays.asList(t._2.split("/")) );

One (key, value) mapped into none, one, or several key / value pairs

smallExample.flatMapToPair(t -> List<Tuple2<>> ....);

Map a value to a list of pairs of something else, using a class instead of a lambda

// does work but ugly
rdd.map(new TypeA()).flatMap(x -> x).mapToPair(x -> x);

// does work but not nice
rdd.map(new TypeA()).flatMapToPair(x -> x);

// does NOT work but would be nice
rdd.flatMapToPair(new TypeA());

// does NOT work but not required
rdd.map(new TypeB());

// does work and this is cool
rdd.flatMapToPair(new TypeB());

public class TypeA implements org.apache.spark.api.java.function.Function<MyData, Iterator<Tuple2<String, Integer>>> {

    @Override
    public Iterator<Tuple2<String, Integer>> call(final MyData myData) {
        ...
    }
}

public class TypeB implements org.apache.spark.api.java.function.PairFlatMapFunction<MyData, String, Integer> {

    @Override
    public Iterator<Tuple2<String, Integer>> call(final MyData myData) {
        ...
    }

}

The class will be created for every partition on the worker. If that is expensive, an alternative is to use mapPartitions, where the expensive setup only has to happen once per partition, or to use broadcast variables to send the expensive parts around.
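
A minimal sketch of the mapPartitions variant (assuming the Spark 1.x Java API; MyData, ExpensiveHelper and rdd are hypothetical placeholders):

JavaRDD<Integer> results = rdd.mapPartitions(new FlatMapFunction<Iterator<MyData>, Integer>() {
  public Iterable<Integer> call(Iterator<MyData> partition) {
    final ExpensiveHelper helper = new ExpensiveHelper();  // expensive setup, done once per partition
    final List<Integer> out = new ArrayList<Integer>();
    while (partition.hasNext()) {
      out.add(helper.process(partition.next()));           // process each element with the shared helper
    }
    return out;
  }
});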

Reduce the data somehow

Do not expect that enums are usable across different JVMs!

Reduce to one row per key by merging the values of two entries with the same key. The result must be of the same type as the value

final JavaPairRDD<String, String> uniqkey = smallExample.reduceByKey( (url1, url2) -> url1+" "+url2 );

groupByKey works like reduceByKey but is less efficient: it first moves all the data for a key to one place before merging the values, while reduceByKey can already reduce locally before all the data for a key has been collected. When your reduce function makes the data smaller, less data has to be transferred. See http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

Key -> Container with all values for that key

final JavaPairRDD<String, Iterable<String>> uniqueKey5=smallExample.groupByKey();

Reduce to one row per key by merging values together; the values can have a different type after the merging

final JavaPairRDD<String, Integer> uniqueKey2 = smallExample.combineByKey(
url      -> url.length(),      // how to transform one value into the result type
(l, url) -> l+url.length(),    // how to add a new value to an existing result for that key
(l1, l2) -> l1+l2              // how to merge two sub results for a key together
);

Reduce to one row per key by merging values together; the values can have a different type after the merging. Here the values are collected into a list

final JavaPairRDD<String, List<String> > uniqueKey3 = smallExample.combineByKey(
url            -> new ArrayList<String>(Arrays.asList(url)), // how to transform one value into the result type
(list, url)    -> { list.add(url); return list; },           // how to add a new value to an existing result for that key
(list1, list2) -> { list1.addAll(list2); return list1; }     // how to merge two sub results for a key together
);

Reduce to one row per key by merging values together; the values can have a different type after the merging. aggregateByKey uses combineByKey in the background

final JavaPairRDD<String, Integer > uniqueKey4=smallExample.aggregateByKey(
0,                          // the start value to take
(l,url) -> l+url.length(),  // value so far and new element, get result
(l1,l2) -> l1+l2            // two results, merge them
);

aggregate works like aggregateByKey but over the whole RDD and returns a single result (here: the total length of all values)

Integer tt = smallExample.aggregate(0, (l,  t) -> { final String key=t._1; final String url=t._2; return l+url.length(); }, (l1,l2) -> l1+l2);

Gets two values for the same key and merges them by applying the function to the values. Expects a neutral element that can be merged with any value without changing it

JavaPairRDD<String, Integer> aa = str2nr.foldByKey(0, (a,b) -> a+b );

fold works like foldByKey but over the whole RDD; here it sums up all the values (the key of the result is not meaningful)

final Tuple2<String, Integer> uu = str2nr.fold( new Tuple2<String, Integer>("", 0), (t1, t2) -> new Tuple2<String, Integer>(t1._1, t1._2+t2._2) );

Get information about the total amount of data

Count

final long cnt=smallExample.count();

(key, value) -> count

final Map<Tuple2<String, String>, Long> ff=smallExample.countByValue();

key -> number of entries with that key (the values are actually Long, even though the signature says Object)

final Map<String, Object> res=smallExample.countByKey();

Enforce transformation into traditional containers

Enforces the data to be written to a traditional map with key -> value

final Map<Integer, String> zz = x1.collectAsMap();

Enforces the data to be written to a traditional list containing key -> value entries

final List<Tuple2<Integer, String>> z = x1.collect();

Output

Enforce everything to be in one partition so that when you write it out you get one file (no longer distributed, avoid)

final JavaPairRDD<Integer, Tuple2<String, String>> result=j.coalesce(1);

Save current data set into files (distributed)

result.saveAsTextFile("/tmp/thorsten_spark/output");

Use an RDD more than once

rdd2=rdd1.cache();
rdd1.count();
rdd2.reduce((a,b)->a+b);