Java Multithreading

Ways to start Code in parallel / in the background

Thread class

Extend the class Thread and implement the run method

public class Foo extends Thread
{
 @Override
 public void run()
 {
  // ...
 }
}

Create object(s) from your class and call the method start() - not the method run() you implemented.

final int nrOfThreads=4;
Foo [] f=new Foo[nrOfThreads];
for(int i=0; i<nrOfThreads; i++)
{
 f[i]=new Foo();
 f[i].start();
}

Interface Runnable

If you want to extend another class anyhow you can also implement the Interface Runnable instead

public class Bar implements Runnable
{
 public void run()
 {
 // ...
 }
}

Otherwise works like Thread above

final int nrOfThreads=4;
Thread [] f=new Thread[nrOfThreads];
for(int i=0; i<nrOfThreads; i++)
{
 f[i]=new Thread(new Bar());
 f[i].start();
}

ThreadPoolExecutor

If you want to have x things to do and always want to have y<x threads run in parallel this is one way

// number of CPUs in your computer
int cpus=Runtime.getRuntime().availableProcessors();

// what to run
Runnable r1, r2;

// run Runnables
ExecutorService executorService = Executors.newFixedThreadPool(cpus);
...
executorService.execute(r1);
executorService.execute(r2);
...
executorService.shutdown();
executorService.awaitTermination(15, TimeUnit.SECONDS);

This is very cool, but watch out:

  • If you add threads faster than they complete they will queue up and consume resources
  • How long do you need to wait for all threads to finish when you might have created a huge queue. Check ThreadPoolExecutor getQueue().size()

Oft you also want to get a result back from you thread. Just implement Callable instead of Runable.

public class MyTask implements Callable< Integer >
{
    @Override
    public Integer call() throws Exception
    {
      Integer result;
       ...
        return result;
    }
}

Submit a new Task, you get a more a less empty Future object back and the result is calculated later.

Future<Integer> myResult= executorService.submit( new MyTask() );

Test if the result is done (does not wait if not)

myResult.isDone();

Get the result (wait if not yet there)

myResult.get();

If you have all the threads you will need already you can also submit all of them and wait for all of them to finish.

List< Callable< TYPE > > myTasks;
results=executorService.invokeAll(myTasks);

If you would like to have one Thread doing some work in the background, this will call after 60 seconds your method, once it finished wait for 15 seconds and start it again

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(() -> process(), 60, 15, TimeUnit.SECONDS);

private void process() {
  ...
}

ForkJoinPool

Since Java 7 there is a new Framework for getting tasks exectued. There is not only a queue for tasks but each task has also a queue for subtasks. Once a tasks is done with its queue it can steal from the other tasks. Only if tasks with subtasks are done the main queue is processed. Good for problems

  • where the tasks differ in complexity
  • when the result of a task depends on the results of its subtasks. Tasks waiting for subtasks do not count as running tasks so it is easier to maintain a stable load of running tasks

Example, find numbers that are prime

import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

// create a pool of 4 parallel tasks
final int nrOfParallelTasks=4;
ForkJoinPool pool= new ForkJoinPool(nrOfParallelTasks);

// add some tasks to that pool
ForkJoinTask<Set<Integer>> task_A = new PrimeCalculator(1,   100000);
pool.invoke(task_A);

ForkJoinTask<Set<Integer>> task_B = new PrimeCalculator(100000,  200000);        
pool.invoke(task_B);

// collect the results of our tasks
Set<Integer> allPrimes=new HashSet<Integer>();
allPrimes.addAll(task_A.getRawResult());
allPrimes.addAll(task_B.getRawResult());

And this would be the task (method that checks for prime is not provided)

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.RecursiveTask;

public class PrimeCalculator extends RecursiveTask<Set<Integer> > {
    private final int min;
    private final int max;

    public PrimeCalculator(int pMin, int pMax)
    {
        this.min=pMin;
        this.max=pMax;
    }

    /* (non-Javadoc)
     * @see java.util.concurrent.RecursiveTask#compute()
     */

    @Override
    protected Set<Integer> compute()
    {        
        Set<Integer> result=new HashSet<Integer>();

        // decide if our task should be split in two smaller tasks
        if(notTooBigForOneTask())
        {
            // do the calculation
            System.out.println("Calculate "+this.min+ " to "+this.max);
            int currentNumber=this.min;
            while(currentNumber<=this.max)
            {
                if(isPrime(currentNumber))
                    result.add(currentNumber);

                currentNumber++;
            }            
        }
        else
        {
            // split the task into two subtasks
            int splitPos=splitPosition();

            System.out.println("Split task "+this.min+"-"+this.max+" into two subtasks "+this.min+"-"+splitPos+" and "+splitPos+"-"+this.max);

            PrimeCalculator a=new PrimeCalculator(this.min,   splitPos);
            PrimeCalculator b=new PrimeCalculator(splitPos+1, this.max);
            // add the new tasks to the local queue of this taks
            invokeAll(a, b);
            // collect their results
            result.addAll(a.getRawResult());
            result.addAll(b.getRawResult());
        }
        return result;
    }
}

Links ForkJoinPool

Wait for the end of a thread

try
{
 for(int i=0; i<nrOfThreads; i++) f[i].join();
}
catch (InterruptedException e) {};

Normalerweise läuft ein Programm so lange weiter, bis auch der letzte Thread sich beendet hat. Ruft man auf einem Thread die Methode

setDaemon(true);

auf, wird auf das Ende dieses Threads nicht mehr gewartet.

Threads von außen beenden

Um einen Thread von außen beenden zu können, ist es hilfreich, wenn dieser auf

!isInterrupted()

testet und

InterruptedException

Exceptions fängt und dann selbst

interrrupt()

aufruft. Z.B.

while (!isInterrupted())
 {
   try
   {

...

   }
   catch ( InterruptedException e )
   {
    interrupt();
   }
 }

Jetzt kann man den Thread von außen mit

interrupt()

beenden.

Hat man Runnable implementiert, kann man so testen, ob man unterbrochen wurde

while(!Thread.currentThread().isInterrupted())
{
    try
    {
     ...
    }
    catch (InterruptedException e)
    {
     ...
     Thread.currentThread().interrupt();
    }
}

Synchronized

Methode darf nur von einem Thread gleichzeitig betreten werden

public class Foo
{
 synchronized public static void bar()
 {
  // ...
 }
}

Pro Objekt darf Methode nur von einem Thread gleichzeitig betreten werden

public class Foo
{
 synchronized public void bar()
 {
  // ...
 }
}

Entspricht

public class Foo
{
 public void bar()
 {
  synchronized(this)
  {
   // ...
  }
 }
}

Alternative:

Lock l = ...;
l.lock();
try
{ ...}
finally
{
  l.unlock();
}

ThreadLocal

Variablen auf die nur der entsprechenden Thread Zugriff hat (auf private Attribute hätte alle Objekte der entsprechenden Klasse Zugriff).

ThreadLocal foo = new ThreadLocal<Integer>();
foo.set(42);
Integer i=foo.get();

Damit man nicht für jeden Thread auch einen Start Wert ablegen muss kann man eine statische Helper Methode anlegen

private static final ThreadLocal<SimpleFormatter> perThreadFormatter = new ThreadLocal<SimpleFormatter>() {
        @Override
        protected SimpleFormatter initialValue() {
                return new SimpleFormatter(4, true);
        }
};

oder sowas machen

ThreadLocal<Kryo> kryos = ThreadLocal.withInitial(Kryo::new);
...
final Kryo kryo = kryos.get();

Exceptions in Threads

Angenommen man hat 4 Threads plus den Thread in dem main() läuft:

public static void main(String[] args)
{
 Thread t1,t2,t3,t4;
 ...
 t1.start();
 t2.start();
 t3.start();
 t4.start();

 while(true)
 {
   foo();                      
 }
}

Eine nicht gefangene Exception in einem der Threads bricht nur genau diesen ab, alle anderen laufen ganz normal weiter. Selbst wenn der Thread in dem main läuft abbricht, laufen t1 bis t4 weiter.

Möchte man ein anderes Verhalten erreichen muss man manuell eingreifen. Z.B. kann man einen UncaughtExceptionHandler in der Klasse Thread eintragen:

Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler());

Die Klasse muss die Methode

void uncaughtException(Thread t, Throwable e)

überschreiben, die bei jedem Thread aufgerufen wird, der durch eine unbehandelte Exception vor der Beendigung steht. In dieser Methode kann er dann auch andere Threads zum Beenden auffordern.

public class MyUncaughtExceptionHandler implements UncaughtExceptionHandler
{
        private HashMap<Thread, Set<Thread>> connectedThreads;

...            
        @Override
        public void uncaughtException(Thread t, Throwable e)
        {
                for(Thread connectedThread : connectedThreads.get(t))
                {
                        System.out.println("Thread "+t+" had an uncaught exception, stop also the connected thread "+connectedThread);
                        connectedThread.interrupt();
                }
        }
}

Im Debugger funktioniert das möglicherweise nicht wie erwartet.

Pause and Resume Threads

ReentrantLock

Only one thread can pass lock.lock(), all other threads wait there until lock.unluck() is called. The same thread however can pass lock.lock() multiple times.

ReentrantLock lock = new ReentrantLock();
int count = 0;

void increment() {
    lock.lock();
    try {
        count++;
    } finally {
        lock.unlock();
    }
}

Semaphore

Mit einer Semaphore kann man die Anzahl der Thread für eine Resource beschränken. Mit

semaphore.acquire();

kann eine Thread in den kritischen Bereich eintreten, ist der schon zu voll wird automatisch gewartet. Und mit

semaphore.release();

kann er sie beim Verlassen wieder freigeben und einer der Wartenden kann dafür eintreten.

Die Semaphore wird zentral erzeugt und dann allen interessierten Threads zugewiesen. Beim Erzeugen kann man noch festlegen, ob sie fair ist, d.h. der, der am längsten wartet, darf als erstes rein. Fair sein ist hier etwas langsamer, dafür kann es aber auch nicht passieren, dass ein Thread in der sehr Warteposition sehr lange hängen bleibt.

boolean fair=true;
Semaphore s=new Semaphore(3, fair);

MyTask t1=new MyTask(s);
MyTask t2=new MyTask(s);
...
 
import java.util.concurrent.Semaphore;

public class MyTask implements Runnable
{
    private Semaphore semaphore;

    public MyTask(Semaphore pSemaphore)
    {
       this.semaphore=pSemaphore;
    }

    @Override
    public void run()
    {
        try
        {
            this.semaphore.acquire();

            // do the work

            this.semaphore.release();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }

    }

}

CountDownLatch

Die Klasse java.util.concurrent.CountDownLatch kann benutzt werden, um mehrere Threads zu koordinieren. Ein CountDownLatch hat einen Startwert, kann um 1 heruntergezählt werden und wenn 0 erreicht wird, kann eine Aktion ausgelöst werden.

Das ist hier ist ein normaler Thread, der zwei CountDownLatch Objekte beinhaltet. Über das eine Objekt teilen wir dem Thread mit, wann er starten darf, über das andere teilt er uns mit, wann er fertig ist.

import java.util.concurrent.CountDownLatch;

public class ThreadLatched extends Thread
{
    private final CountDownLatch startLatch;
    private final CountDownLatch stopLatch;

    /**
     * Constructor
     *
     * @param startLatch latch to signal the start
     * @param stopLatch  latch to signal the end
     */

    public ThreadLatched(CountDownLatch startLatch, CountDownLatch stopLatch)
    {
        this.startLatch = startLatch;
        this.stopLatch  = stopLatch;
    }

    /* (non-Javadoc)
     * @see java.lang.Thread#run()
     */

    public void run()
    {
      try
      {
          // wait for the startLatch to reach 0 to start
          startLatch.await();

          ...
      }
      catch (InterruptedException iex)
      {
       ...          
      }
      finally
      {
          // reduce the stopLatch by one to indicate that we are done
          stopLatch.countDown();
      }
    }
}

Und so kann man die CountDownLatch Objekte dann benutzen

final int nrOfThreads=5;

// this starts at 1, is passed to all threads and once it hits 0, everything starts
CountDownLatch startLatch = new CountDownLatch(1);

// this starts at number of threads, is passed to all threads, every thread that finishes reduces it by one, once it reaches 0 all threads are done
CountDownLatch stopLatch  = new CountDownLatch(nrOfThreads);

// init threads        
for(int i=1; i<=nrOfThreads; i++)
{
    Thread t = new ThreadLatched(startLatch, stopLatch);
     t.start();    
}

// this gives the start signal for all threads
startLatch.countDown();

// this waits for all threads to be finished
stopLatch.await();

Phaser

FIXME