Java Multithreading
Ways to start Code in parallel / in the background
Thread class
Extend the class Thread and implement the run method
{
@Override
public void run()
{
// ...
}
}
Create object(s) from your class and call the method start() - not the method run() you implemented.
Foo [] f=new Foo[nrOfThreads];
for(int i=0; i<nrOfThreads; i++)
{
f[i]=new Foo();
f[i].start();
}
Interface Runnable
If you want to extend another class anyhow you can also implement the Interface Runnable instead
{
public void run()
{
// ...
}
}
Otherwise works like Thread above
Thread [] f=new Thread[nrOfThreads];
for(int i=0; i<nrOfThreads; i++)
{
f[i]=new Thread(new Bar());
f[i].start();
}
ThreadPoolExecutor
If you want to have x things to do and always want to have y<x threads run in parallel this is one way
int cpus=Runtime.getRuntime().availableProcessors();
// what to run
Runnable r1, r2;
// run Runnables
ExecutorService executorService = Executors.newFixedThreadPool(cpus);
...
executorService.execute(r1);
executorService.execute(r2);
...
executorService.shutdown();
executorService.awaitTermination(15, TimeUnit.SECONDS);
This is very cool, but watch out:
- If you add threads faster than they complete they will queue up and consume resources
- How long do you need to wait for all threads to finish when you might have created a huge queue. Check ThreadPoolExecutor getQueue().size()
Oft you also want to get a result back from you thread. Just implement Callable instead of Runable.
{
@Override
public Integer call() throws Exception
{
Integer result;
...
return result;
}
}
Submit a new Task, you get a more a less empty Future object back and the result is calculated later.
Test if the result is done (does not wait if not)
Get the result (wait if not yet there)
If you have all the threads you will need already you can also submit all of them and wait for all of them to finish.
results=executorService.invokeAll(myTasks);
If you would like to have one Thread doing some work in the background, this will call after 60 seconds your method, once it finished wait for 15 seconds and start it again
executor.scheduleAtFixedRate(() -> process(), 60, 15, TimeUnit.SECONDS);
private void process() {
...
}
ForkJoinPool
Since Java 7 there is a new Framework for getting tasks exectued. There is not only a queue for tasks but each task has also a queue for subtasks. Once a tasks is done with its queue it can steal from the other tasks. Only if tasks with subtasks are done the main queue is processed. Good for problems
- where the tasks differ in complexity
- when the result of a task depends on the results of its subtasks. Tasks waiting for subtasks do not count as running tasks so it is easier to maintain a stable load of running tasks
Example, find numbers that are prime
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
// create a pool of 4 parallel tasks
final int nrOfParallelTasks=4;
ForkJoinPool pool= new ForkJoinPool(nrOfParallelTasks);
// add some tasks to that pool
ForkJoinTask<Set<Integer>> task_A = new PrimeCalculator(1, 100000);
pool.invoke(task_A);
ForkJoinTask<Set<Integer>> task_B = new PrimeCalculator(100000, 200000);
pool.invoke(task_B);
// collect the results of our tasks
Set<Integer> allPrimes=new HashSet<Integer>();
allPrimes.addAll(task_A.getRawResult());
allPrimes.addAll(task_B.getRawResult());
And this would be the task (method that checks for prime is not provided)
import java.util.Set;
import java.util.concurrent.RecursiveTask;
public class PrimeCalculator extends RecursiveTask<Set<Integer> > {
private final int min;
private final int max;
public PrimeCalculator(int pMin, int pMax)
{
this.min=pMin;
this.max=pMax;
}
/* (non-Javadoc)
* @see java.util.concurrent.RecursiveTask#compute()
*/
@Override
protected Set<Integer> compute()
{
Set<Integer> result=new HashSet<Integer>();
// decide if our task should be split in two smaller tasks
if(notTooBigForOneTask())
{
// do the calculation
System.out.println("Calculate "+this.min+ " to "+this.max);
int currentNumber=this.min;
while(currentNumber<=this.max)
{
if(isPrime(currentNumber))
result.add(currentNumber);
currentNumber++;
}
}
else
{
// split the task into two subtasks
int splitPos=splitPosition();
System.out.println("Split task "+this.min+"-"+this.max+" into two subtasks "+this.min+"-"+splitPos+" and "+splitPos+"-"+this.max);
PrimeCalculator a=new PrimeCalculator(this.min, splitPos);
PrimeCalculator b=new PrimeCalculator(splitPos+1, this.max);
// add the new tasks to the local queue of this taks
invokeAll(a, b);
// collect their results
result.addAll(a.getRawResult());
result.addAll(b.getRawResult());
}
return result;
}
}
Links ForkJoinPool
Wait for the end of a thread
{
for(int i=0; i<nrOfThreads; i++) f[i].join();
}
catch (InterruptedException e) {};
Normalerweise läuft ein Programm so lange weiter, bis auch der letzte Thread sich beendet hat. Ruft man auf einem Thread die Methode
auf, wird auf das Ende dieses Threads nicht mehr gewartet.
Threads von außen beenden
Um einen Thread von außen beenden zu können, ist es hilfreich, wenn dieser auf
testet und
Exceptions fängt und dann selbst
aufruft. Z.B.
{
try
{
...
}
catch ( InterruptedException e )
{
interrupt();
}
}
Jetzt kann man den Thread von außen mit
beenden.
Hat man Runnable implementiert, kann man so testen, ob man unterbrochen wurde
{
try
{
...
}
catch (InterruptedException e)
{
...
Thread.currentThread().interrupt();
}
}
Synchronized
Methode darf nur von einem Thread gleichzeitig betreten werden
{
synchronized public static void bar()
{
// ...
}
}
Pro Objekt darf Methode nur von einem Thread gleichzeitig betreten werden
{
synchronized public void bar()
{
// ...
}
}
Entspricht
{
public void bar()
{
synchronized(this)
{
// ...
}
}
}
Alternative:
l.lock();
try
{ ...}
finally
{
l.unlock();
}
ThreadLocal
Variablen auf die nur der entsprechenden Thread Zugriff hat (auf private Attribute hätte alle Objekte der entsprechenden Klasse Zugriff).
foo.set(42);
Integer i=foo.get();
Damit man nicht für jeden Thread auch einen Start Wert ablegen muss kann man eine statische Helper Methode anlegen
@Override
protected SimpleFormatter initialValue() {
return new SimpleFormatter(4, true);
}
};
oder sowas machen
...
final Kryo kryo = kryos.get();
Exceptions in Threads
Angenommen man hat 4 Threads plus den Thread in dem main() läuft:
{
Thread t1,t2,t3,t4;
...
t1.start();
t2.start();
t3.start();
t4.start();
while(true)
{
foo();
}
}
Eine nicht gefangene Exception in einem der Threads bricht nur genau diesen ab, alle anderen laufen ganz normal weiter. Selbst wenn der Thread in dem main läuft abbricht, laufen t1 bis t4 weiter.
Möchte man ein anderes Verhalten erreichen muss man manuell eingreifen. Z.B. kann man einen UncaughtExceptionHandler in der Klasse Thread eintragen:
Die Klasse muss die Methode
überschreiben, die bei jedem Thread aufgerufen wird, der durch eine unbehandelte Exception vor der Beendigung steht. In dieser Methode kann er dann auch andere Threads zum Beenden auffordern.
{
private HashMap<Thread, Set<Thread>> connectedThreads;
...
@Override
public void uncaughtException(Thread t, Throwable e)
{
for(Thread connectedThread : connectedThreads.get(t))
{
System.out.println("Thread "+t+" had an uncaught exception, stop also the connected thread "+connectedThread);
connectedThread.interrupt();
}
}
}
Im Debugger funktioniert das möglicherweise nicht wie erwartet.
Pause and Resume Threads
ReentrantLock
Only one thread can pass lock.lock(), all other threads wait there until lock.unluck() is called. The same thread however can pass lock.lock() multiple times.
int count = 0;
void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
Semaphore
Mit einer Semaphore kann man die Anzahl der Thread für eine Resource beschränken. Mit
kann eine Thread in den kritischen Bereich eintreten, ist der schon zu voll wird automatisch gewartet. Und mit
kann er sie beim Verlassen wieder freigeben und einer der Wartenden kann dafür eintreten.
Die Semaphore wird zentral erzeugt und dann allen interessierten Threads zugewiesen. Beim Erzeugen kann man noch festlegen, ob sie fair ist, d.h. der, der am längsten wartet, darf als erstes rein. Fair sein ist hier etwas langsamer, dafür kann es aber auch nicht passieren, dass ein Thread in der sehr Warteposition sehr lange hängen bleibt.
Semaphore s=new Semaphore(3, fair);
MyTask t1=new MyTask(s);
MyTask t2=new MyTask(s);
...
public class MyTask implements Runnable
{
private Semaphore semaphore;
public MyTask(Semaphore pSemaphore)
{
this.semaphore=pSemaphore;
}
@Override
public void run()
{
try
{
this.semaphore.acquire();
// do the work
this.semaphore.release();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
CountDownLatch
Die Klasse java.util.concurrent.CountDownLatch kann benutzt werden, um mehrere Threads zu koordinieren. Ein CountDownLatch hat einen Startwert, kann um 1 heruntergezählt werden und wenn 0 erreicht wird, kann eine Aktion ausgelöst werden.
Das ist hier ist ein normaler Thread, der zwei CountDownLatch Objekte beinhaltet. Über das eine Objekt teilen wir dem Thread mit, wann er starten darf, über das andere teilt er uns mit, wann er fertig ist.
public class ThreadLatched extends Thread
{
private final CountDownLatch startLatch;
private final CountDownLatch stopLatch;
/**
* Constructor
*
* @param startLatch latch to signal the start
* @param stopLatch latch to signal the end
*/
public ThreadLatched(CountDownLatch startLatch, CountDownLatch stopLatch)
{
this.startLatch = startLatch;
this.stopLatch = stopLatch;
}
/* (non-Javadoc)
* @see java.lang.Thread#run()
*/
public void run()
{
try
{
// wait for the startLatch to reach 0 to start
startLatch.await();
...
}
catch (InterruptedException iex)
{
...
}
finally
{
// reduce the stopLatch by one to indicate that we are done
stopLatch.countDown();
}
}
}
Und so kann man die CountDownLatch Objekte dann benutzen
// this starts at 1, is passed to all threads and once it hits 0, everything starts
CountDownLatch startLatch = new CountDownLatch(1);
// this starts at number of threads, is passed to all threads, every thread that finishes reduces it by one, once it reaches 0 all threads are done
CountDownLatch stopLatch = new CountDownLatch(nrOfThreads);
// init threads
for(int i=1; i<=nrOfThreads; i++)
{
Thread t = new ThreadLatched(startLatch, stopLatch);
t.start();
}
// this gives the start signal for all threads
startLatch.countDown();
// this waits for all threads to be finished
stopLatch.await();
Phaser
FIXME