블로그 Help

Fork/Join 프레임워크와 CompletableFuture

시작

동시성 프로그래밍은 프로그램에서 매우 중요한 개념입니다. 이 Post에서는 Java에서 동시성 프로그래밍을 위해 사용하는 Fork/Join 프레임워크와 CompletableFuture에 대해 알아보겠습니다.

본문

Fork/Join 프레임워크

Fork/Join 프레임워크는 보이지 않은 스레드 풀을 사용하여 작업을 분할하고 자동으로 스케줄링하고 병렬로 실행하는 프레임워크입니다.

코드로 살펴보겠습니다.

// RecursiveAction은 ForkJoinTask<Void>의 하위 클래스입니다. // RecursiveAction은 compute() 메서드를 사용하여 작업을 수행합니다. public class TransactionSorter extends RecursiveAction { // 작업을 분할할 때, 작업의 크기가 이 값보다 작으면 작업을 분할하지 않고 작업을 수행합니다. private static final int SMALL_ENOUGH = 32; private final Transaction[] transactions; private final int start; private final int end; private final Transaction[] result; public TransactionSorter(List<Transaction> transactions) { this(transactions.toArray(new Transaction[0]), 0, transactions.size()); } public TransactionSorter(Transaction[] transactions) { this(transactions, 0, transactions.length); } private TransactionSorter(Transaction[] transactions, int start, int end) { this.transactions = transactions; this.start = start; this.end = end; this.result = new Transaction[size()]; } public int size() { return end - start; } public Transaction[] getResult() { return result; } @Override protected void compute() { if (size() < SMALL_ENOUGH) { System.arraycopy(transactions, start, result, 0, size()); Arrays.sort(result, 0, size()); // 작업이 킅난 쓰레드의 이름과 정렬된 결과를 출력합니다. System.out.println(Thread.currentThread().getName() + " : " + Arrays.toString(result)); } else { // 작업을 분할합니다. int mid = start + size() / 2; TransactionSorter left = new TransactionSorter(transactions, start, mid); TransactionSorter right = new TransactionSorter(transactions, mid, end); // 작업을 병렬로 실행합니다. invokeAll(left, right); // 작업을 병합합니다. merge(left, right); } } private void merge(TransactionSorter left, TransactionSorter right) { int i = 0; int l = 0; int r = 0; while (l < left.size() && r < right.size()) { int compare = left.result[l].compareTo(right.result[r]); result[i++] = compare < 0 ? left.result[l++] : right.result[r++]; } while (l < left.size()) { result[i++] = left.result[l++]; } while (r < right.size()) { result[i++] = right.result[r++]; } } } // ---------------------------- public class Transaction implements Comparable<Transaction> { private static final AtomicLong counter = new AtomicLong(1); private final Account sender; private final Account receiver; private final int amount; private final long id; private final LocalDateTime time; public Transaction(Account sender, Account receiver, int amount, LocalDateTime time) { this.sender = sender; this.receiver = receiver; this.amount = amount; this.id = counter.getAndIncrement(); this.time = time; } public static Transaction of(Account sender, Account receiver, int amount) { return new Transaction(sender, receiver, amount, LocalDateTime.now()); } @Override public int compareTo(Transaction o) { return Comparator.nullsFirst(LocalDateTime::compareTo).compare(this.time, o.time); } public Account getSender() { return sender; } public Account getReceiver() { return receiver; } public int getAmount() { return amount; } public long getId() { return id; } public LocalDateTime getTime() { return time; } @Override public int hashCode() { return Objects.hash(getSender(), getReceiver(), getAmount(), getId(), getTime()); } @Override public boolean equals(Object o) { if (this == o) { return true; } if (!(o instanceof Transaction)) { return false; } Transaction that = (Transaction) o; return getAmount() == that.getAmount() && getId() == that.getId() && Objects.equals(getSender(), that.getSender()) && Objects.equals(getReceiver(), that.getReceiver()) && Objects.equals(getTime(), that.getTime()); } @Override public String toString() { return new StringJoiner(", ", Transaction.class.getSimpleName() + "[", "]") .add("id=" + id) .toString(); } } // ---------------------------- public class Account { private int balance; public Account(int balance) { this.balance = balance; } public int getBalance() { return balance; } public void deposit(int amount) { balance += amount; } public void withdraw(int amount) { balance -= amount; } } // ---------------------------- public static void main(String[] args) { var transactions = new ArrayList<Transaction>(); var accounts = List.of(new Account(1000), new Account(2000), new Account(3000)); for (int i = 0; i < 100; i++) { var sender = accounts.get(i % accounts.size()); var receiver = accounts.get((i + 1) % accounts.size()); var amount = (i + 1) * 10; transactions.add(Transaction.of(sender, receiver, amount)); } Collections.shuffle(transactions); var sorter = new TransactionSorter(transactions); ForkJoinPool.commonPool().invoke(sorter); for (Transaction transaction : sorter.getResult()) { System.out.println(transaction); } }

위 코드는 Fork/Join 프레임워크를 사용하여 Transaction 객체를 정렬하는 코드입니다. 출력 결과는 다음과 같습니다.

main : [Transaction[id=1], Transaction[id=4], Transaction[id=7], Transaction[id=10], Transaction[id=16], Transaction[id=29], Transaction[id=35], Transaction[id=37], Transaction[id=41], Transaction[id=42], Transaction[id=45], Transaction[id=47], Transaction[id=49], Transaction[id=50], Transaction[id=54], Transaction[id=58], Transaction[id=61], Transaction[id=67], Transaction[id=72], Transaction[id=78], Transaction[id=83], Transaction[id=84], Transaction[id=86], Transaction[id=91], Transaction[id=99]] ForkJoinPool.commonPool-worker-5 : [Transaction[id=9], Transaction[id=12], Transaction[id=13], Transaction[id=14], Transaction[id=17], Transaction[id=22], Transaction[id=24], Transaction[id=25], Transaction[id=26], Transaction[id=27], Transaction[id=30], Transaction[id=32], Transaction[id=33], Transaction[id=40], Transaction[id=60], Transaction[id=62], Transaction[id=63], Transaction[id=68], Transaction[id=69], Transaction[id=75], Transaction[id=85], Transaction[id=93], Transaction[id=95], Transaction[id=97], Transaction[id=100]] ForkJoinPool.commonPool-worker-3 : [Transaction[id=11], Transaction[id=15], Transaction[id=20], Transaction[id=21], Transaction[id=31], Transaction[id=36], Transaction[id=39], Transaction[id=44], Transaction[id=46], Transaction[id=48], Transaction[id=56], Transaction[id=59], Transaction[id=66], Transaction[id=70], Transaction[id=73], Transaction[id=74], Transaction[id=76], Transaction[id=77], Transaction[id=79], Transaction[id=80], Transaction[id=82], Transaction[id=87], Transaction[id=94], Transaction[id=96], Transaction[id=98]] ForkJoinPool.commonPool-worker-7 : [Transaction[id=2], Transaction[id=3], Transaction[id=5], Transaction[id=6], Transaction[id=8], Transaction[id=18], Transaction[id=19], Transaction[id=23], Transaction[id=28], Transaction[id=34], Transaction[id=38], Transaction[id=43], Transaction[id=51], Transaction[id=52], Transaction[id=53], Transaction[id=55], Transaction[id=57], Transaction[id=64], Transaction[id=65], Transaction[id=71], Transaction[id=81], Transaction[id=88], Transaction[id=89], Transaction[id=90], Transaction[id=92]] Transaction[id=1] Transaction[id=2] Transaction[id=3] Transaction[id=4] Transaction[id=5] .... Transaction[id=100]

결과에서 볼 수 있듯이 작업이 분할되어 병렬로 스레드에서 실행되고 병합되어 정렬된 결과가 출력됩니다.

CompletableFuture

CompletableFuture는 Future 인터페이스를 확장한 인터페이스로 비동기 작업을 쉽게 처리할 수 있도록 도와줍니다.

코드를 통해 살펴보겠습니다.

public static void main(String[] args) { var future = CompletableFuture.supplyAsync(() -> { System.out.println("Starting on: " + Thread.currentThread().getName()); return "Starting!"; }); var future2 = future.thenApply(s -> { System.out.println("ThenApply on: " + Thread.currentThread().getName()); return s + "\nThenApply!"; }); var future3 = future2.thenApplyAsync(s -> { System.out.println("ThenApplyAsync on: " + Thread.currentThread().getName()); return s + "\nThenApplyAsync!"; }); try { System.out.println(future3.get()); } catch (Exception e) { e.printStackTrace(); } }

위 코드는 CompletableFuture를 사용하여 비동기 작업을 처리하는 코드입니다. 출력 결과는 다음과 같습니다.

Starting on: ForkJoinPool.commonPool-worker-3 ThenApply on: ForkJoinPool.commonPool-worker-3 ThenApplyAsync on: ForkJoinPool.commonPool-worker-5 Starting! ThenApply! ThenApplyAsync!

결과에서 볼 수 있듯이 supplyAsync() 메서드를 사용하여 비동기 작업을 시작하고 thenApply() 메서드를 사용하여 작업을 처리하고 thenApplyAsync() 메서드를 사용하여 비동기로 작업을 처리합니다. async 메서드를 사용하면 작업이 다른 스레드에서 실행됩니다.

마무리

이 Post에서는 Java의 동시성 프로그래밍에 사용하는 Fork/Join 프레임워크와 CompletableFuture에 대해 정리해 보았습니다. Fork/Join 프레임워크는 작업을 분할하고 병렬로 실행한 후 병합하는 프레임워크이며 CompletableFuture는 비동기 작업을 쉽게 처리할 수 있도록 도와주는 인터페이스입니다.

참조

Last modified: 06 January 2025