본문 바로가기

Java

[Multi-Thread] Executor, ExecutorService와 ThreadPoolExecutor

728x90

자바에서는 다중 쓰레드 환경에서 작업을 비동기적으로 실행하기 위한 다양한 인터페이스와 클래스를 제공하고 있습니다. 대표적으로 Executor 인터페이스가 있고, Executor를 확장한 인터페이스인 ExecutorService 인터페이스, 그리고 ExecutorService 인터페이스를 구현한 클래스 중 하나인 ThreadPoolExecutor 가 있습니다.

 

이번 포스팅은 각각의 인터페이스 및 클래스에 대해서 작성해 보았습니다.


Executor 인터페이스

Executor 인터페이스는 java.util.concurrent 패키지에서 제공하는 인터페이스로, 아래와 같이 단일 추상 메서드 'execute(Runnable command)'를 가지고 있습니다. 이 메서드는 Runnable 객체를 매개변수로 받아서 해당 작업을 어떻게 실행할지를 Executor 인터페이스를 구현하는 구현체에게 위임합니다. 


public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

 

 

즉, Executor는 쓰레드 풀을 관리하고, 작업을 스케줄링하는 데 사용됩니다. execute 메서드에 전달된 Runnable 객체는 쓰레드 풀에서 관리되는 쓰레드들 중 하나에 할당되어 비동기적으로 실행됩니다.

 

이 Executor 인터페이스를 구현하는 구현체를 쓰레드 풀의 동작 방식을 다양하게 조절할 수 있는데, 여러 구현체들 중  ExecutorService에 대해서 알아보겠습니다.


ExecutorService 인터페이스

ExecutorService는 Executor를 확장한 인터페이스로, 작업을 관리하고 쓰레드 풀을 제어하는 데 아래와 같이 많은 기능을 제공합니다.


public interface ExecutorService extends Executor {

    void shutdown();
   
    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;


    <T> Future<T> submit(Callable<T> task);
    
  
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    
    
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

  
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

   
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;


    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

 


위 여러 메서드 중 몇 가지 주요 메서드는 아래와 같은 기능을 제공합니다.

submit(Runnable task)

Runnable 작업을 제출하고, Future 객체를 반환합니다. Future 객체를 통해 작업의 상태를 확인하거나 결과를 얻을 수 있습니다.

submit(Callable<T> task)

Callable 작업을 제출하고 Future 객체를 반환합니다. Callable은 Runnable과 달리 결과를 반환하는 작업을 정의하는데 사용됩니다.

invokeAll(Collection<? extends Callable<T>> tasks)

여러 Callable 작업을 동시에 제출하고 모든 작업이 완료될 때까지 대기한 후 결과를 List<future<T>>로 반환합니다.

invokeAny(Collection<? extends Callable<T>> tasks)

여러 Callable 작업을 동시에 제출하고, 그중 하나의 작업이 완료되면 결과를 반환합니다. 다른 작업들은 취소됩니다.

shutdown()

쓰레드 풀을 종료합니다. 새로운 작업을 받지 않고, 현재 진행 중인 작업들이 모두 완료된 후에 쓰레드 풀이 종료됩니다.

shutdownNow()

쓰레드  풀을 즉시 종료하려고 시도하며, 현재 실행 중인 작업들을 취소하고 대기 중인 작업들을 무시합니다.


ExecutorService 사용법

ExecutorService를 사용하여 쓰레드 풀을 생성하는 방법은 Executors라는 클래스 내부의 static 메서드를 사용하여 생성하는데, 대표적으로 4가지 경우가 있습니다. 


1. FixedThreadPool 생성

고정된 크기의 쓰레드 풀을 생성하는 방법입니다. 

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); // 5개의 쓰레드 생성

 

2. CachedThreadPool 생성

 

필요에 따라 쓰레드를 생성하고 재사용하는 동적인 크기의 쓰레드 풀을 생성합니다. 

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

 

3. SingleThreadExecutor 생성

 

단일 쓰레드로 이루어진 쓰레드 풀을 생성합니다. 이는 작업을 순차적으로 처리하고자 할 때 사용됩니다.

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

 

4. ScheduledThreadPool 생성

 

특정 시간 간격으로 또는 특정 시간에 주기적으로 작업을 실행할 수 있도록 스케줄링된 쓰레드 을 생성합니다.

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);

 

 

이렇게 생성한 쓰레드 풀의 쓰레드들이 작업을 수행할 수 있도록 submit() 메서드로 작업을 쓰레드에게 할당합니다. 이렇게 쓰레드들에게 작업을 할당함으로써 비동기적으로 작업을 수행할 수 있습니다.

 

아래 코드를 실행하면 ExecutorService가 관리하는 쓰레드들의 작업들이 main 쓰레드가 종료되고 비동기적으로 수행됨을 확인할 수 있습니다.


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorServiceExample {
    public static void main(String[] args) {
        // ExecutorService 생성 - 여기서는 스레드 풀의 크기가 2인 스레드 풀을 생성
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 작업을 스레드 풀에 제출
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executorService.submit(() -> {
                System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
            });
        }

        // ExecutorService 종료
        executorService.shutdown();
        System.out.println("Main Thread Finished");
    }
}

[수행 결과]

 

 

2개의 고정된 쓰레드를 생성하였기 때문에, 2개의 쓰레드만 열심히 작업을 수행하는 것을 확인할 수 있습니다. 


 

만약 main 쓰레드가 종료되기 전에 모든 쓰레드들의 작업 결과를 받아야 한다면, 앞에서 설명하였듯이 Future 클래스의 get()를 사용해서 작업이 완료될 때까지 기다릴 수 있습니다.


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorServiceExample {
    public static void main(String[] args) throws InterruptedException{
       // ExecutorService 생성 - 여기서는 스레드 풀의 크기가 2인 스레드 풀을 생성
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        List<Future> futures = new ArrayList<>();

        // 작업을 스레드 풀에 제출
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            Runnable runnable = ()->{
                System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
            };
            futures.add(executorService.submit(runnable));
        }

        System.out.println("Wait until all threads complete their jobs");
        for(Future future: futures){
            future.get(); // 작업 기다리기
        }

        // ExecutorService 종료
        executorService.shutdown();
        System.out.println("Main Thread Finished");
    }
}

[수행 결과]

 


 

만약, Runnable이 아닌 어떤 결과를 반환하는 작업을 수행한다면 Callable을 사용해서 ExecutorService의 invokeAll()을 호출하여 모든 쓰레드의 작업을 기다리도록 구현할 수 있습니다.


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorServiceExample {
    public static void main(String[] args) throws InterruptedException{
       // ExecutorService 생성 - 여기서는 스레드 풀의 크기가 2인 스레드 풀을 생성
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        List<Callable<String>> tasks = new ArrayList<>();

        // 작업을 스레드 풀에 제출
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            Callable<String> callable = ()->{
                System.out.println("Task " + taskId + " is running on thread " + Thread.currentThread().getName());
                return "MyCallable";
            };
            tasks.add(callable);
        }

        System.out.println("Wait until all threads complete their jobs");
        List<Future<String>> futureList = executorService.invokeAll(tasks); // 모든 작업들 기다리기

        futureList.forEach(v-> {
            try {
                System.out.println("Result: " + v.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }); // 결과 출력하기


        // ExecutorService 종료
        executorService.shutdown();
        System.out.println("Main Thread Finished");
    }
}

 

 


ThreadPoolExecutor 클래스

위의 ExecutorService의 쓰레드 풀을 생성하기 위한 메소드들 중 ScheduleThreadPool()을 제외한 각 메소드를 확인해보면, 아래와 같이 내부적으로 ThreadPoolExecutor를 사용하여 구현되어 있는 것을 확인할 수 있습니다.


public class Executors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
}

 

 

즉, ThreadPoolExecutor는 ExecutorService 인터페이스를 구현하는 구현체들 중 하나로, 개발자가 쓰레드 풀을 생성할 때, ExecutorService를 사용할 때보다 더 다양한 맞춤 설정을 할 수 있도록 유연성을 제공해 줍니다. 쉽게 말해, ExecutorService에서는 쓰레드 풀의 크기만을 설정할 수 있었다면, ThreadPoolExecutor를 사용하면 쓰레드 풀의 크기, 작업 대기 큐, 쓰레드 생성 방식 등을 설정함으로써 다양한 환경에서 사용할 수 있습니다.

 

ThreadPoolExecutor를 사용하면 위의 ExecutorService에서 쓰레드 풀을 생성하는 4가지 방법 외에 아래와 같이 Custom한 사용자 정의 쓰레드 풀을 생성할 수 있습니다.


int corePoolSize = 5;
int maxPoolSize = 10;
long keepAliveTime = 1000;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();

ExecutorService customThreadPool = new ThreadPoolExecutor(
        corePoolSize,
        maxPoolSize,
        keepAliveTime,
        TimeUnit.MILLISECONDS,
        workQueue
);

 

 

각각의 주요 설정값들과 그 역할에 대해서는 다른 포스팅에서 작성해 보겠습니다. 


 

 

728x90
반응형