Java

Java concurrent programming

blackbearwow 2024. 9. 27. 10:49

1. Thread, Runnable

Thread, Runnable은 java 1.4 버전 이하에서 사용되던 기술이다.

 

java에서 쓰레드를 만드는 방법은 두가지이다.

Thread를 상속하거나 Runnable를 구현하는 것이다.

 

Thread를 상속하고 run()메소드를 재정의한다. 상속한 클래스 인스턴스를 만든다. 인스턴스의 start()로 쓰레드를 만들고 내부적으로 run()메소드를 실행시킨다.

class Main {
    public static void main(String[] args) {
        Abc thread = new Abc();
        thread.start();
        System.err.println("this code is outside of the thread");
    }
}

class Abc extends Thread {
    public void run() {
        System.err.println("this code is running in a thread");
        beep();
    }
    public void beep() {
        System.err.println("beep");
    }
}

 

Runnable를 구현하고 run()메소드를 재정의한다. 구현한 클래스 인스턴스를 만든다. Thread 클래스 생성자에 인스턴스를 넘겨 만든 Thread 인스턴스의 start()로 쓰레드를 만들고 내부적으로 run()메소드를 실행시킨다.

class Second {
    public static void main(String[] args) {
        Abc obj = new Abc();
        Thread thread = new Thread(obj);
        thread.start();
        System.err.println("this code is outside of the thread");
    }
}

class Abc extends Aaa implements Runnable {
    public void run() {
        System.err.println("this code is running in a thread");
        beep();
    }
}

class Aaa {
    public void beep() {
        System.err.println("beep");
    }
}

 

Thread와 Runnable의 다른점은, 다중상속이다. java는 다중상속을 지원하지 않기 때문에 Thread를 직접 상속하면 다른 클래스를 상속하지 못한다는 점이다. Runnable은 인터페이스이기 때문에 다른 클래스를 상속할 수 있다. 

 

thread의 대표적인 메소드는 sleep(), join(), currentThread(), run(), start()가 있다.

sleep()은 static 메소드로 지정된 숫자만큼 ms단위로 멈춘다.

join()은 해당 쓰레드가 종료될때까지 기다린다.

currentThread()는 현재 쓰레드의 인스턴스를 가져온다. 

run()은 쓰레드가 할 작업이다.

start()는 쓰레드 오브젝트에게 쓰레드를 만들고 run()을 실행시키는 메소드이다.

 

예시

class Second {
    public static void main(String[] args) {
        Abc obj = new Abc();
        obj.start();
        try{
            obj.join();
        } catch(InterruptedException e) {
            System.out.println("thread interrupted");
        }
        System.out.println(Thread.currentThread().getName());
    }
}

class Abc extends Thread {
    public void run() {
        System.out.println(Thread.currentThread().getName());
        for (int i = 0; i < 10; i++) {
            System.out.println("this code is running in a thread");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

2. Callable, Future, Executor, ExecutorService, Executors

Callable, Future, Executor, ExecutorService, Executors는 Java 5(1.5)부터 사용되는 기술이다.

 

Runnable은 결과를 리턴하지 못하는 단점이 있다. 그것을 보완한것이 Callable이다. 제네릭으로 어떠한 값이든 리턴할 수 있다.

package java.util.concurrent;

@FunctionalInterface
public interface Callable<V> {
   V call() throws Exception;
}

 

Future는 비동기 작업의 결과를 나타낸다. javascript의 Promise와 비슷한 개념이다. get메소드로 결과를 가져올 수 있다.

package java.util.concurrent;

public interface Future<V> {
   boolean cancel(boolean mayInterruptIfRunning);

   boolean isCancelled();

   boolean isDone();

   V get() throws InterruptedException, ExecutionException;

   V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Executor, ExecutorService, ScheduledExecutorService, AbstractExecutorService, ThreadPoolExecutor, ScheduledThreadPoolExecutor의 상속 관계

Executor는 작업이 어떤 쓰레드에서 어떤 스케줄링으로 작동될지같은 메커니즘에서 작업 제출을 분리시킨 인터페이스이다.

package java.util.concurrent;

public interface Executor {
   void execute(Runnable var1);
}

execute라는 메소드만 정의되어있으며 함수형 인터페이스 조건을 만족한다.

ExecutorService 인터페이스가 Executor를 구현하고, ThreadPoolExecutor가 Executor를 상속해 execute메소드를 재정의해 사용한다.

 

Executors는 쓰레드 풀을 생성해주는 메소드들을 가지고있다.

Executor, ExecutorService, ScheduledExecutorService, ThreadFactory를 위한 팩토리 메소드와 유틸리티 메소드가 있다.

반환 메소드 설명
ExecutorService newFixedThreadPool(int nThreads) 고정된 쓰레드 개수를 가지는 쓰레드 풀을 생성한다.
장점: 적절한 쓰레드 개수를 가지는 풀을 생성한다면 자원 고갈을 막는다.
단점: 많은 쓰레드 개수라면 자원이 낭비되고, 적은 쓰레드 개수라면 응답시간이 늦어진다.
ExecutorService newSingleThreadExecutor() newFixedThreadPool(1)과 거의 같다.
ExecutorService newCachedThreadPool() 유동적으로 쓰레드 개수가 변하는 쓰레드 풀을 생성한다. 
쓰레드 개수는 현재 작업의 개수만큼 생기고, 일정 시간동안 작업이 없다면 쓰레드가 사라진다. 
장점: 버스티한 작업일 때 효율적이다.
단점: 계속되는 많은 작업량은 문제가 될 수 있다.
ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 주기적으로 실행하는 명령을 스케줄할 수 있는 쓰레드 풀을 생성한다.
ScheduledExecutorService newSingleThreadScheduledExecutor() newScheduledThreadPool(1)과 거의 같다.

 

ExecutorService는 종료 관리 메소드와 비동기 작업들의 진행상태 추적을 위한 Future를 반환하는 메소드를 제공한다.

반환 타입 메소드 설명
Future<?> submit(Runnable task) Runnable 작업을 전송하고 그 작업을 나타내는 Future를 반환한다
<T> Future<T> sumbit(Callable<T> task) Callable 작업을 전송하고 그 작업을 나타내는 Future를 반환한다
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 작업들을 실행하고 모두 완료되었을 때, 작업들의 상태와 결과를 가지는 Future의 리스트를 반환
<T> T invokeAny(Collection<? extends Callable<T>> tasks) 작업들을 실행하고 그중 하나가 정상적으로 완료되었을 때, 해당작업의 결과를 반환
void shutdown() 전송된 작업들을 종료될때까지 기다렸다가 ExecutorService를 종료시킨다. 이 메소드가 호출되면 더이상 작업을 받지 못한다. 
List<Runnable> shutdownNow() waiting작업은 시작되는것을 막고, 현재 실행중인 작업을 멈추게 시도한다. waiting상태에 있는 작업들 리스트를 반환

ExecutorService를 만들어 작업을 실행했다면, shutdown을 호출하기 전까지 다음 작업을 대기한다. 또한 컴퓨터 자원의 환원을 위해 꼭 종료 작업을 거쳐야 한다.

 

sumbit(Callable<T> task) 예시

import java.util.concurrent.*;

class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Callable<String> task = () -> {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "End: " + Thread.currentThread().getName();
        };
        try {
            System.out.println(executorService.submit(task).get());
        } catch (Exception e) {
            e.printStackTrace();
        }
        executorService.shutdown();
    }
}

 

invokeAll(Collection<? extends Callable<T>> tasks) 예시

Promise.all과 유사하다

import java.util.*;
import java.util.concurrent.*;

class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Callable<String> task = () -> {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "End: " + Thread.currentThread().getName();
        };
        ArrayList<Callable<String>> taskList = new ArrayList<>();
        for(int i=0; i<10; i++) {
            taskList.add(task);
        }
        try {
            List<Future<String>> futures = executorService.invokeAll(taskList);
            for(Future<String> f: futures) {
                System.out.println(f.get());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        executorService.shutdown();
    }
}

 

invokeAny(Collection<? extends Callable<T>> tasks) 예시

Promise.race와 유사하다

import java.util.*;
import java.util.concurrent.*;

class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Callable<String> task = () -> {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "End: " + Thread.currentThread().getName();
        };
        ArrayList<Callable<String>> taskList = new ArrayList<>();
        for(int i=0; i<10; i++) {
            taskList.add(task);
        }
        try {
            String result = executorService.invokeAny(taskList);
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
        executorService.shutdown();
    }
}

 

Executors 클래스로 newFixedTheadPool, newSingleThreadExecutor, newCachedThreadPool을 호출하면 내부적으로 ThreadPoolExecutor 객체를 만드는데, ThreadPoolExecutor의 작동 방식은 대략적으로 다음그림과 같다. 쓰레드 풀이 꽉 차있다면 블로킹 큐에 작업이 대기한다. 쓰레드 풀에 작업이 없는 쓰레드가 생기면 블로킹 큐에서 기다리는 작업을 실행한다.

ScheduledExecutorService는 작업에 딜레이를 주거나, 주기적으로 실행시킬 수 있다.

반환 타입 메소드 설명
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) 지정된 딜레이 후에 작업을 한번 실행한다.
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) 지정된 딜레이 후에 값을 반환하는 작업을 한번 실행한다.
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDealy, long period, TimeUnit unit) 지정된 딜레이 후에 지정된 기간마다 반복적으로 작업을 실행한다. 시작 시간 기준으로 기간마다 작업을 실행
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 지정된 초기 딜레이 후에 지정된 딜레이마다 반복적으로 작업을 실행한다. 작업이 끝난 시간 기준으로 딜레이마다 작업을 실행

 

scheduleAtFixedRate(Runnable command, long initialDealy, long period, TimeUnit unit) 예시

import java.time.*;
import java.util.concurrent.*;

class Main {
    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        Runnable task = () -> {
            System.out.println(LocalTime.now());
        };
        executor.scheduleAtFixedRate(task, 1, 1, TimeUnit.SECONDS);
    }
}

3. CompletableFuture

CompletableFuture는 Java8부터 사용되는 기술이다.

Future의 발전형으로 작업 콜백, 작업 조합, 예외 처리가 가능해졌다. 또한 기본적으로 쓰레드 풀을 만들지 않아도 되어 코드가 간결해진다.

 

작업 실행 메소드

반환 타입 메소드 설명
CompletableFuture<Void> runAsync(Runnable runnable) ForkJoinPool.commonPool()에 의해 실행된 작업의 CompletableFuture를 반환
<U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) ForkJoinPool.commonPool()에 의해 실행된 작업의 반환값을 가지는 CompletableFuture를 반환

ForkJoinPool은 Java7에서부터 사용되는 Executor이다. 다른 Executor를 사용하고싶다면 메소드 두번째 인자에 넘겨주면 된다.

 

작업 콜백 메소드

반환 타입 메소드 설명
<U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) 반환값을 받고, 다른값을 반환
CompletableFuture<Void> thenAccept(Comsumer<? super T> action) 반환값을 받고, 값을 반환하지 않는다
CompletableFuture thenRun(Runnable action) 반환값을 받지 않고, 값을 반환하지 않는다

 

thenAccept 예시

import java.util.concurrent.*;

class Main {
    public static void main(String[] args) throws Exception {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "Hello";
        }).thenAccept(s -> {
            System.out.println(s);
        });
        System.out.println("after supplyAsync");
        long l = 0;
        for (long i = 0; i < 98765432100L; i++) {
            l++;
        }
        System.out.println(l);
    }
}

 

작업 조합 메소드

반환 타입 메소드 설명
<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) 두 작업이 이어서 실행하도록 조합한다. 의존적인 Future의 조합에 사용한다.
<U, V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends fn) 두 작업을 독립적으로 실행하고, 둘다 완료되었을 때 콜백을 실행한다. 독립된 Future의 조합에 사용한다.
CompletableFuture(Void) allOf(CompletableFuture<?>... cfs) 주어진 모든 CompletableFuture가 완료되었을 때, 완료되는 CompletableFuture반환
CompletableFuture(Object) anyOf(CompletableFuture... cfs) 주어진 CompletableFuture중 하나라도 완료되었을 때, 완료되는 CompletableFuture반환

 

thenCompose 예시

import java.util.concurrent.*;

class Main {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> getRadius = CompletableFuture.supplyAsync(()->10);
        CompletableFuture<Double> findArea = getRadius.thenCompose(radius->{
            return CompletableFuture.supplyAsync(()->Math.PI*radius*radius);
        });
        System.out.println(findArea.get());
    }
}

thenCombine 예시

import java.util.concurrent.*;

class Main {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Double> getWeight = CompletableFuture.supplyAsync(()->{
            Double randomWeight = ThreadLocalRandom.current().nextDouble(30,100);
            System.out.println("Weight: "+ randomWeight);
            return randomWeight;
        });
        CompletableFuture<Double> getHeight = CompletableFuture.supplyAsync(()->{
            Double randomHeight = ThreadLocalRandom.current().nextDouble(100, 200)/100;
            System.out.println("Height: "+ randomHeight);
            return randomHeight;
        });
        CompletableFuture<Double> calculatedBMI = getHeight.thenCombine(getWeight, (height, weight)->{
            return weight/(height*height);
        });
        System.out.println("BMI: "+ calculatedBMI.get());
    }
}

allOf 예시

import java.util.concurrent.*;
public class Main {
    public static void main(String[] args) {
        CompletableFuture<Void> randomSleep1 = CompletableFuture.runAsync(()->{
            Integer r = ThreadLocalRandom.current().nextInt(500, 2500);
            try {
                Thread.sleep(r);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(r);
        });
        CompletableFuture<Void> randomSleep2 = CompletableFuture.runAsync(()->{
            Integer r = ThreadLocalRandom.current().nextInt(500, 2500);
            try {
                Thread.sleep(r);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(r);
        });
        CompletableFuture<Void> cf = CompletableFuture.allOf(randomSleep1, randomSleep2);
        cf.join();
        System.out.println("end");
    }
}

anyOf 예시

import java.util.concurrent.*;
public class Main {
    public static void main(String[] args) {
        CompletableFuture<Void> randomSleep1 = CompletableFuture.runAsync(()->{
            Integer r = ThreadLocalRandom.current().nextInt(500, 2500);
            try {
                Thread.sleep(r);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(r);
        });
        CompletableFuture<Void> randomSleep2 = CompletableFuture.runAsync(()->{
            Integer r = ThreadLocalRandom.current().nextInt(500, 2500);
            try {
                Thread.sleep(r);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(r);
        });
        CompletableFuture<Object> cf = CompletableFuture.anyOf(randomSleep1, randomSleep2);
        cf.join();
        System.out.println("end");
    }
}

 

예외 처리 메소드

반환 타입 메소드 설명
CompleteableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) 발생한 에러를 받아 예외 처리한다
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) (결과값, 에러)를 받아 두 경우 모두 처리할 수 있다. 값을 반환한다
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) (결과값, 에러)를 받아 두 경우 모두 처리할 수 있다. 값을 반환하지 않는다

 

exceptionally 예시

import java.util.concurrent.*;
public class Main {
    public static void main(String[] args) throws Exception{
        CompletableFuture<Object> numberFuture = CompletableFuture.supplyAsync(()->{
            throw new RuntimeException("exception while generating Number");
        }).exceptionally((e)->{
            System.err.println(e.getMessage());
            return -1;
        });
        System.out.println(numberFuture.get());
    }
}

handle 예시

import java.util.concurrent.*;
public class Main {
    public static void main(String[] args) throws Exception{
        CompletableFuture<Object> numberFuture = CompletableFuture.supplyAsync(()->{
            throw new RuntimeException("exception while generating Number");
        }).handle((res, e)-> {
            if(e == null) {
                return 1;
            }
            else  {
                System.err.println(e.getMessage());
                return -1;
            }
        });
        System.out.println(numberFuture.get());
    }
}

whenComplete 예시

import java.util.concurrent.*;
public class Main {
    public static void main(String[] args) throws Exception{
        CompletableFuture.supplyAsync(()->{
            throw new RuntimeException("exception while generating Number");
        }).whenComplete((res, e)-> {
            if(e == null) {
                System.out.println(res);
            }
            else  {
                System.err.println(e.getMessage());
            }
        });
    }
}

참고: https://www.w3schools.com/java/java_threads.asp

https://mangkyu.tistory.com/258

https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/lang/Thread.html

https://www.youtube.com/watch?v=nWOUpnvzmDQ

https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/ExecutorService.html

-

 

'Java' 카테고리의 다른 글

spring 설치와 환경설정  (0) 2025.02.07
Java annotation  (0) 2024.10.29
FunctionalInterface, Lambda Expression  (2) 2024.09.26
Java 파일 조작 (File Handling)  (0) 2024.09.23
Java 예외 처리 (Exception, try catch)  (1) 2024.09.21