무작정 개발.Vlog

[Java8] 자바 CompletableFuture 프로그래밍

by 무작정 개발
반응형

1. Java Concurrent 프로그래밍

 

(1) Concurrent 소프트웨어

동시에 여러 작업을 할 수 있는 소프트웨어
  • ex) 인터넷 강의를 보면서 게임을 하고, 그 와중에 IntelliJ로 코딩할 수 있다.
  • ex) IntelliJ로 코드를 타이핑하면 코드가 입력됨과 동시에 추천 코드를 제안한다.

 

(2) Java에서 지원하는 Cuncurrent 프로그래밍

  • 멀티프로세싱(ProcessBuilder)
  • 멀티스레드(Multi Thread)

 

(3) Java 멀티스레드 프로그래밍 (Java Multi Thread Programming)

  • Java 프로세스의 기본 스레드는 main 스레드이다. -> main함수
  • 하나의 스레드에서 다른 스레드를 만드는 방법
    • Thread를 상속하는 방법
    • Runnable을 구현하는 방법

 

public class App {

    public static void main(String[] args) {
        MyThread myThread = new MyThread();
        myThread.start();

        // Anonymous class(익명 클래스)
        Thread runnable = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("Runnable Thread: " + Thread.currentThread().getName());
            }
        });
        
        // Lambda Expression(람다 표현식)
        Thread lambdaThread = new Thread(() -> 
        		System.out.println("Lambda Thread:" + Thread.currentThread().getName()));


        System.out.println("Hello: "+Thread.currentThread().getName());
    }

    // Thread 상속
    static class MyThread extends Thread {
        @Override
        public void run() {
            System.out.println("Thread: "+Thread.currentThread().getName());
        }
    }
}

/*
[ 실행 결과 ]

Hello:main
Thread:Thread-0

*/

-> 코드의 실행 순서만 봐서는 Thread가 먼저 출력될 거 같지만, 실제로 실행해보면 다르게 출력될 때도 있습니다.

이를 통해 Thread는 순서를 보장하지 않는다는 점을 알 수 있습니다.

 

(3) 스레드의 주요 기능(method)

  • sleep(mills)
    • 현재 스레드(thread) 재우기(잠깐 멈추기)
      • thread를 대기 상태로 멈춰서 다른 thread가 처리할 수 있도록 한다.
      • But, 락을 놔주지 않기에 자칫하면 데드락 상태에 걸릴 수 있음
public static void main(String[] args) throws InterruptedException {

    // Lambda Expression (람다 표현식)
    Thread lambdaThread = new Thread(() -> {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            System.out.println("interrupted!");
            return;
        }
        System.out.println("Thread: "+Thread.currentThread().getName());
    });
    
    lambdaThread.start();

    System.out.println("Hello: "+Thread.currentThread().getName());
}

1. Thread.sleep(1000L)

-> Thread를 시작하면 1초(1000L)동안 멈춰있고 그동안 다른 스레드를 수행하기에 Hello가 항상 우선으로 출력됩니다.

 

 

  • interrupt()
    • 다른 스레드를 깨워 InterruptException을 발생시킨다.
    • 이 Error에 대한 핸들링은 코딩을 통해 구현 가능
public static void main(String[] args) throws InterruptedException {

    //Lambda Expression (람다 표현식)
    Thread lambdaThread = new Thread(() -> {
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            System.out.println("interrupted!");
            return;
        }
        System.out.println("Thread: "+Thread.currentThread().getName());
    });
    
    lambdaThread.start();

    lambdaThread.interrupt();
    System.out.println("Hello: "+Thread.currentThread().getName());
}

1. lambdaThread.interrupt();

-> lambdaThread에 interrupt() 메서드를 호출해서 lambdaThread 내에 InterruptedException을 발생시킵니다.

 

 

  • join()
    • 다른 스레드가 끝날 때까지 기다림
public static void main(String[] args) throws InterruptedException {

    // Lambda Expression (람다 표현식)
    Thread lambdaThread = new Thread(() -> {
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            System.out.println("interrupted!");
            return;
        }
        System.out.println("Thread: "+Thread.currentThread().getName());
    });
    lambdaThread.start();

    lambdaThread.join();
    System.out.println("Hello: "+Thread.currentThread().getName());
}

1. lambdaThread.join();

-> lambdaThread에 join()메서드를 호출하여 lambdaThread가 종료될 때까지 기다립니다.


2. Executors

 

Executors란 스레드(Thread)나 Runnable처럼 Low-Level API를 직접 다루는 것이 아닌, 스레드를 만들고 관리하는 작업을

고 수준 API, Executors한테 위임하는 것을 말합니다.

 

[ Executors, High-Level Concurrency 프로그래밍 ]

  • 스레드를 만들고 관리하는 작업을 어플리케이션에서 분리
  • Executors가 스레드를 만들고 개발자는 Runnable에 해야 할 일을 정의해서 넘겨줌
  • 요약하면 개발자가 Runnable만 정의해서 제공해주면 스레드를 만들고, 불필요해지면 종료하고 관리해주는 작업을 대신해주는 클래스

 

(1) Executors가 하는 일

  • Thread 만들기 : 애플리케이션이 사용할 Thread Pool(스레드 풀)을 만들어서 관리
  • Thread 관리 : Thread 생명 주기를 관리
  • 작업 처리 및 실행 : Thread로 실행할 작업을 제공할 수 있는 API를 제공

 

(2) 주요 인터페이스

  • Executor : execute(Runnable)
  • ExecutorService :
    •  Executor를 상속 받은 인터페이스, Callable도 실행 가능하며 Executor를 종료시키거나 여러 Callable을 동시에 실행하는 등의 기능을 제공
  • ScheduledExecutorService :
    • ExecutorService를 상속받은 인터페이스, 특정 시간 이후에 또는 주기적으로 작업을 실행할 수 있음

 

(1) 예제 코드

 

[ 기본 사용 예제 ]

public static void main(String[] args) throws InterruptedException {

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    
    // Legacy case
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println("Thread: "+Thread.currentThread().getName());
        }
    });
    // Lambda Expression
    executorService.submit(()->{
        System.out.println("Lambda Expression Thread: "+Thread.currentThread().getName());
    });

    executorService.shutdown(); //graceful shutdown
    //executorService.shutdownNow(); // 즉시 종료
}

 

[ 2개의 Thread를 이용해서 실행 ]

public class App {

    private static Runnable getRunnable(String message) {
        return ()->System.out.println(message + Thread.currentThread().getName());
    }

    public static void main(String[] args) throws InterruptedException {
    
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        executorService.submit(getRunnable("Hello"));
        executorService.submit(getRunnable("World"));
        executorService.submit(getRunnable("The"));
        executorService.submit(getRunnable("Java"));
        executorService.submit(getRunnable("Lecture"));
        executorService.submit(getRunnable("Concurrent"));
        executorService.submit(getRunnable("Part"));

        executorService.shutdown(); // graceful shutdown
    }
}

/*
[ 실행 결과 ]

Hellopool-1-thread-1
Worldpool-1-thread-2
Javapool-1-thread-2
Thepool-1-thread-1
Lecturepool-1-thread-2
Concurrentpool-1-thread-1
Partpool-1-thread-2
*/

 

 

더보기

[Blocking Queue] 란?
특정 상황에서 Thread를 대시하도록 하는 큐를 말합니다. 멀티 스레드 환경에서 사용되는 자료구조이며, 큐의 상태에 따라 자원의 생성과
소비를 제어할 수 있는 큐입니다. 가득차있거나, 비어있는 경우 예외(Exception)를 방지하기 위해 각각의 상황에서 큐를 블록킹 합니다.

Block Queue
Block Queue / 출처 : 맨 하단에 기재

큐(Queue)에서 원소를 빼내려고 시도했는데 큐가 비어있거나, 큐에 원소를 넣으려 했는데 큐에 넣을 수 있는 공간이 없을 때, 디큐/인큐 호출 스레드를 대기하도록 합니다. 비어있는 큐에서 원소를 빼려고 시도하는 스레드의 대기 상태는 다른 스레드가 이 큐에 원소를 넣을 때까지 지속됩니다.

꽉 찬 큐에 원소를 넣으려고 시도하는 스레드의 대기 상태는 다른 스레드가 큐에서 원소를 빼거나 큐를 클리어(Clear)하거나 큐의 공간이 확보될 때까지 지속됩니다.

1. Executors.newFixedThreadPool(2)

-> 해당 메서드를 호출하면 해당 영역에는 인자 값으로 넘겨준 숫자만큼 Thread를 관리합니다.

위 코드에서는 2를 인자값으로 넘겨줬기 때문에 2개의 Thread를 관리하는 Thread Pool이 도는 동안 Blocking Queue에 등록된 작업들이 차례대로 동작합니다.

Executors.newFixedThreadPool(2)
Executors.newFixedThreadPool(2)


3. Callable과 Future

 

(1) Callable

  • Runnable과 거의 유시 하지만 반환 값을 가질 수 있다.
  • 정의한 Callable 작업을 수행하기 위해 submit 메서드의 인자로 넘겨주는데, submit메서드는 Future라는 것을 반환한다.
  • 즉, Callable이 반환하는 타입은 Future이다.
  • get() 메서드를 통해 값을 반환받을 수 있다.
    • 해당 메서드는 블록킹 콜(Blocking Call)이기에 메서드 호출 시점부터 코드 실행 완료까지 기다린다. (대기)
    • 타임아웃(Time Out)을 설정할 수 있다.
    • isDone() 메서드를 통해 작업 상태를 확인할 수 있다.
    • cancel() 메서드로 작업을 취소할 수 있다.
    • invokeAll() 메서드를 호출해서 여러 작업들을 동시에 실행할 수 있다.

 

(2) get() 메서드로 값 반환받기 - 타임아웃 설정

  • get() 메서드 이전까지는 Callable 작업을 기다리지 않고 계속 실행되는데 get() 메서드를 만나는 순간 결과 Callable의 결괏값을 얻어올 때까지 기다립니다. -> 블록킹 콜(Blocking Call) 상태
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
    Thread.sleep(2000L);
    return "Hello";
};
Future<String> submit = executorService.submit(hello);
System.out.println("Started!");

submit.get();//blocking

System.out.println("End!");
executorService.shutdown();
/*
[실행결과]
Started!
(2초 뒤) -> 이건 출력 아님
End!
*/

 

(3) isDone() 메서드로 작업 상태 확인하기

  • isDone() 메서드를 사용해서 Callable의 작업 상태 확인
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
    Thread.sleep(2000L);
    return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);
System.out.println(helloFuture.isDone());
System.out.println("Started!");

helloFuture.get(); //blocking

System.out.println(helloFuture.isDone());
System.out.println("End!");
executorService.shutdown();
/*
[실행결과]
false
Started!
true
End!
*/

1. get() 메서드를 호출하기 전에는 isDone()이 false로 아직 수행되지 않은 것을 알 수 있고, get() 메서드 호출 이후 시점에서는 isDone()이 true인 것으로 완료되었다는 사실을 알 수 있습니다.

 

(4) cancel() 메서드로 작업을 취소하기

  • 인자 값으로 현재 진행 중인 스레드 interrupt 여부를 결정합니다.
  • true이면 현재 진행 중인 스레드를 interrupt하고 그렇지 않으면(false) 현재 진행중인 작업이 끝날때까지 기다립니다.
ExecutorService executorService = Executors.newSingleThreadExecutor();
Callable<String> hello = () ->{
    Thread.sleep(2000L);
    return "Hello";
};
Future<String> helloFuture = executorService.submit(hello);
System.out.println(helloFuture.isDone());
System.out.println("Started!");

helloFuture.cancel(false);

System.out.println(helloFuture.isDone());
System.out.println("End!");

helloFuture.get();
executorService.shutdown();
/*
[실행결과]
false
Started!
true
End!
Exception in thread "main" java.util.concurrent.CancellationException..
*/

1. helloFuture.cancel(false)

-> 현재 진행중인 작업을 기다린 뒤 작업을 취소합니다. 작업을 취소되어 종료되었기 때문에 밑에 helloFuture.isDone()은 true가 반환되며, 이미 취소한 작업을 get() 메서드를 호출하는 시점에는 CancellationException 예외가 발생합니다.

 

(5) invokeAll() 메서드를 호출해 여러 작업들을 동시에 실행하기

  • invokeAll() 메서드에 여러 작업들의 컬렉션을 넘겨주면 여러 작업을 동시에 처리 가능
  • 특징은 모든 작업이 끝날 때까지 먼저 끝난 작업을 기다린다.
ExecutorService executorService = Executors.newSingleThreadExecutor();
LocalDateTime start = LocalDateTime.now();
Callable<String> hello = () ->{
    Thread.sleep(2000L);
    return "Hello";
};
Callable<String> java = () ->{
    Thread.sleep(3000L);
    return "java";
};
Callable<String> catsbi = () ->{
    Thread.sleep(1000L);
    return "hyunwoo";
};

List<Future<String>> futures = 
        executorService.invokeAll(Arrays.asList(hello, java, catsbi));
for (Future<String> future : futures) {
    System.out.println(future.get());
}
LocalDateTime end = LocalDateTime.now();
Duration between = Duration.between(start, end);
System.out.println(between.getSeconds());
/*
[실행결과]
Hello
java
hyunwoo
6
*/

1. 실행 결과

-> 1초, 2초, 3초짜리(sleep) Callable들이 수행되며 차례대로 출력되는 것이 아닌 전체가 끝날 때까지 기다렸다가 한번에 출력되었습니다.

그 이유는 invokeAll()에서는 어느 하나의 작업이 끝나더라도 다른 작업이 끝날때까지 기다렸다가 모든 작업이 끝나면 결과를 가져오기 때문입니다.

 

2. between.getSeconds()

-> 1, 2, 3초짜리 작업을 각각 수행하며 끝날 때까지 기다렸다가 출력하는데, cuncurrency하다면 가장 긴 작업시간인 3초가 지났을 때 모든 작업이 끝나야 합니다. 하지만 출력결과는 6초인데 그 이유는 ExecutorService에 할당된 스레드(Thread)가 싱글 스레드이기 때문에 내부에서 3개의 작업들이 각각 스레드에 할당되어 수행되는것이 아닌 하나의 스레드에 하나씩 작업할당 및 작업을 하며 대기하기 때문입니다.

이를 해결하기 위해서는 newSingleThreadExecutor()가 아닌 newFixedThreadFool(3)을 통해 3개의 스레드를 할당해주면 3초가 출력되는 것을 확인할 수 있습니다.

 

(6) invokeAny()

  • invokeAll() 메서드는 테스크(Task)가 모두 끝날때까지 기다렸다가 값들을 반환하는데 가장 먼저 완료된 작업만 반환해도 괜찮다면 invokeAll()을 쓰기에는 성능이 떨어집니다.
  • 이럴 때 사용하는 메서드가 invokeAny() 메서드인데, 이때 주의할 점은 Executors에 스레드가 하나만 할당되어있다면 결국 하나가 끝나야 다음 작업이 수행되는데 이때 가장 처음 할당되는 작업이 오래 걸리는 작업이라면 효과가 없기에 적절히 스레드를 할당해줘야 합니다.
  • 여러 작업 중에 하나라도 먼저 응답이 오면 끝낸다.
  • invokeAny()의 특징으로는 작업의 반환 값을 Future로 감싸지 않고 바로 반환하는 것
ExecutorService executorService = Executors.newFixedThreadPool(3);
 
Callable<String> hello = () -> {
	Thread.sleep(1000);
	
	return "hello";
};
 
Callable<String> world = () -> {
	Thread.sleep(2000);
	
	return "world";
};
 
Callable<String> bye = () -> {
	Thread.sleep(3000);
	
	return "bye";
};
 
String s = executorService.invokeAny(Arrays.asList(hello, world, bye));
 
System.out.println(s);
 
// hello

4. CompletableFuture

 

(1)  CompletableFuture 이란?

  • Java에서 비동기(Asynchronous) 프로그래밍을 가능하게 해 주는 인터페이스
  • Future의 제약사항들을 해결
  • Future와 CompletionStage를 구현하는 구현체(CompletableFuture)
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { ...... }
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("catsbi");
System.out.println("future = " + future.get());
//-------------OR --------------
CompletableFuture<String> future = CompletableFuture.completedFuture("catsbi");
System.out.println("future = " + future.get());

 

[ Future 제약사항 ]

  • 예외 처리용 API를 제공하지 않는다.
  • 여러 Future를 조합할 수 없다.
  • Future를 외부에서 완료시킬 수 없다. -> 취소하거나, get()에 타임아웃을 설정할 수는 있다.
  • Future에서 반환하는 결괏값을 가지고 어떠한 작업을 수행해야 한다면 그 작업은 get() 메서드 이후에 작성해야 한다.
  • get() 메서드를 호출하기 전까지는 future를 다룰 수 없다.

 

(2) 비동기(Asynchronous)로 작업 실행하기

  • 리턴 값이 없는 경우 -> runAsync()
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("Hello" + Thread.currentThread().getName());
});
future.get();

 

  • 리턴값이 있는 경우 -> supplyAsync()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello" + Thread.currentThread().getName());
    return "Hello";
});
future.get();

 

(3) 콜백(Call Back) 제공하기

지금까지만 보면 명시적으로 Executors를 생성하지 않을 뿐 Future를 사용할 때와 동일합니다.

Future에서 반환하는 결괏값을 가지고 어떠한 작업을 수행한다면 그 작업을 get() 메서드 이후에 작성되야하고, 
CompletableFuture는 작업이 완료되었을 때 CallBack을 호출할 수 있습니다.

 

[ thenApply(Function) ]

  • 리턴 값(반환 값)을 받아서 다른 값으로 바꾸는 CallBack
  • Future만 사용했을 때는 Callback(thenApply)를 get() 호출 전에 정의하는 것이 불가능했다면 CompletableFuture를 사용하면 get() 호출 전에 Callback을 정의하는 것이 가능합니다.
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
	System.out.println("hello world! " + Thread.currentThread().getName());
 
	return "I'm Kang";
}).thenApply(s -> {
	System.out.println(Thread.currentThread().getName());
 
	return s.toUpperCase();
});
 
System.out.println(future.get());
/*
[실행결과]
hello world! ForkJoinPool.commonPool-worker-1
main
I'M KANG
*/

 

[ thenAccept(Consumer) ]

  • 리턴 값을 받지 않고 다른 작업을 수행하는데 반환값은 없는 CallBack
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
	System.out.println("hello world! " + Thread.currentThread().getName());
 
	return "I'm Kang";
}).thenAccept(s -> {
	System.out.println(Thread.currentThread().getName());
});
 
future.get();
/*
[실행결과]
hello world! ForkJoinPool.commonPool-worker-1
main
*/

 

[ thenRun(Runnable) ]

  • 리턴값을 받지 않고 다른 작업을 수행하는데 반환도 하지 않는 CallBack
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
	System.out.println("hello world! " + Thread.currentThread().getName());
 
	return "I'm Kang";
}).thenRun(() -> {
	System.out.println(Thread.currentThread().getName());
});
 
future.get();

 


5. Thread Pool

어떻게 Thread Pool을 만들지 않고 별도의 스레드에서 동작하는가?

지금까지의 비동기 작업들의 예시를 보면 Thread Pool을 만들지 않고도 별도의 스레드가 작업을 처리해줬습니다.

이러한 동작이 가능한 이유는 ForkJoinPoll이 있기 때문인데, ForkJoinPool은 하나의 작업 큐를 가지며 ForkJoinPool에서 관리되는

여러 스레드는 Parent(부모) 작업에서 분기된 Child(자식) 작업을 처리하고(Fork) 각 스레드에서 처리된 결과를 합쳐 Parent(부모)에게

전달해서(Join) 병렬적으로 작업을 처리하는 프레임워크입니다.

-> 이런 방식으로 CommonPoll이라는 곳에서 스레드(Thread)를 가져와서 사용하고, 따로 구현한 스레드를 할당할 수도 있습니다.

 

ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("Hello" + Thread.currentThread().getName());
    return "hello";
}, executorService).thenRunAsync(()->{
    System.out.println(Thread.currentThread().getName());
}, executorService);
future.get();
executorService.shutdown();

1. ExecutorService executorService = Executors.newFixedThreadPool(4);

-> CompletableFuture에서 사용할 스레드 풀(Thread Pool)을 만들어준다.

 

2. CompletableFuture.supplyAsync(() -> { ... }, executorService)

-> 파라미터로 별도의 Thread Pool을 넣어주면 해당 Thread Pool을 사용하는데, runAsync에서도 사용 가능합니다.

 

3. .thenRunAsync(() -> { ... }, executorService);

-> 별도의 Thread Pool을 사용하고 싶으면 기존의 CallBack Method의 접미사(suffix)로 Async를 붙여서 사용하면 됩니다.

ex) thenRunAsync, thenAcceptAsync, thenApplyAsync

 


6. CompletableFuture 조합 메서드 및 예외 처리

 

Future만으로 작업을 이어서 수행하는 것이 어려웠는데, CallBack이 없었기에 비동기적인 작업 2개를 연결하는 것 자체가 어려웠습니다.

 

(1) thenCompose()

  • CompletableFuture 조합 메서드
  • 연관관계가 있는 비동기 작업 2개가 서로 이어서 실행하도록 조합하며, 연관된 future 간에 많이 사용한다.
  • 두 작업 간의 의존성이 필요할 때 사용 -> ex) A 작업이 수행된 다음 B작업을 수행해야 하는 상황
public class App {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CompletableFuture<String> helloFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Hello " + Thread.currentThread().getName());
            return "Hello";
        });

        CompletableFuture<String> future = helloFuture.thenCompose(App::getWorldFuture);
        System.out.println(future.get());

    }
    private static CompletableFuture<String> getWorldFuture(String message) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("World " + Thread.currentThread().getName());
            return message + " World";
        });
    }
}
/*
[실행결과]
Hello ForkJoinPool.commonPool-worker-3
World ForkJoinPool.commonPool-worker-5
Hello World
*/

 

(2) thenCombine()

  • CompletableFuture 조합 메서드
  • 연관관계가 없는 독립적인 작업을 조합할 때 사용
  • 두 작업을 독립적으로 실행하고 둘 다 종료했을 때 CallBack 실행
  • 양쪽 결과가 완료되었을 때 결과물은 BiFunction에서 처리
public static void main(String[] args) throws InterruptedException, ExecutionException {
    CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("MicroSoft " + Thread.currentThread().getName());
        return "MicroSoft";
    });

    CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Apple " + Thread.currentThread().getName());
        return "Apple";
    });

    CompletableFuture<String> resultFuture = 
									msFuture.thenCombine(appleFuture, (i, j) -> i + " " + j);
    System.out.println(resultFuture.get());
}

 

(3) allOf()

  • CompletableFuture 조합 메서드
  • 작업이 2개 이상일 때 모든 작업을 합쳐서 수행(실행)하고, 모든 작업 결과에 CallBack 실행
  • allOf()로 조합한 작업의 결과는 항상 void를 반환 -> ex) A 작업의 반환 값은 String, B작업의 반환값은 Integer 일수도 있기에
  • 만약 각 작업의 결과를 반환받고 싶다면 future의 Stream을 열어 결괏값을 List <Object>로 받을 수 있음
public static void main(String[] args) throws InterruptedException, ExecutionException {
    CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("MicroSoft " + Thread.currentThread().getName());
        return "MicroSoft";
    });

    CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("Apple " + Thread.currentThread().getName());
        return "Apple";
    });
    List<CompletableFuture<String>> futures = Arrays.asList(msFuture, appleFuture);

    CompletableFuture<List<String>> results =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                    .thenApply(v -> futures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList()));
    results.get().forEach(System.out::println);
}

 

(4) anyOf()

  • 여러 작업 중 가장 먼저 끝난 작업의 결과를 CallBack에 넘겨(반환) 실행
CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("MicroSoft " + Thread.currentThread().getName());
    return "MicroSoft";
});

CompletableFuture<String> appleFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("Apple " + Thread.currentThread().getName());
    return "Apple";
});

CompletableFuture<Void> future = CompletableFuture.anyOf(msFuture, appleFuture).thenAccept(System.out::println);
future.get();

 

(5) 예외 처리 메서드

 

1. exeptionally(Function)

boolean throwError = true;

CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
    if (throwError) {
        throw new IllegalArgumentException();
    }

    System.out.println("MicroSoft " + Thread.currentThread().getName());
    return "MicroSoft";
}).exceptionally(ex->{
    System.out.println(ex);
    return "Error";
});

System.out.println(msFuture.get());

 

2. handle(BiFunction)

boolean throwError = false;

CompletableFuture<String> msFuture = CompletableFuture.supplyAsync(() -> {
    if (throwError) {
        throw new IllegalArgumentException();
    }

    System.out.println("MicroSoft " + Thread.currentThread().getName());
    return "MicroSoft";
}).handle((result, ex)->{
    if (Objects.nonNull(ex)) {
        System.out.println(ex);
        return "ERROR";
    }
    return result;
});

System.out.println(msFuture.get());

Reference

인프런-더 자바, Java8_백기선

 

더 자바, Java 8 - 인프런 | 강의

자바 8에 추가된 기능들은 자바가 제공하는 API는 물론이고 스프링 같은 제 3의 라이브러리 및 프레임워크에서도 널리 사용되고 있습니다. 이 시대의 자바 개발자라면 반드시 알아야 합니다. 이

www.inflearn.com

kangword 티스토리 블로그

 

[Java8] Chapter 6-5. CompletableFuture, 작업의 조합과 예외 처리

✍️ CompletableFuture, 작업의 조합 Future만으론 작업을 이어서 수행하는 것이 어려웠다. = Callback이 없었기에 비동기적인 작업 두 개를 연결하는 것 자체가 어려웠다. 가령, hello가 끝나고 world를 해

kangworld.tistory.com

Catsbi's DLog

 

CompletableFuture

1. 자바 Concurrent 프로그래밍 소개

catsbi.oopy.io

반응형

블로그의 정보

무작정 개발

무작정 개발

활동하기