블로그 Help

자바 동시성 기초

시작

동시성 문제는 여러 스레드가 동시에 실행되는 상황에서 발생하는 문제를 의미합니다. 동시성 문제를 해결하는 것이 매우 중요합니다. 자바에서는 동시성 문제를 해결하기 위한 여러 방법을 제공합니다. 이를 간단히 정리하고자 합니다.

본문

자바는 주로 두 가지 동시성 API를 제공합니다.

  1. 블록 구조 동시성 또는 동기화 기반 동시성 또는 클래식 동시성 API

  2. java.util.concurrent 패키지를 사용하는 최신 동시성 API

블록 구조 동시성

블록 구조 동시성은 synchronizedvolatile 키워드를 사용하여 동시성 문제를 해결하는 방법입니다.

synchronized

synchronized 키워드는 메소드 또는 블록에 사용할 수 있습니다. synchronized 키워드를 사용하면 해당 메소드 또는 블록을 한 번에 하나의 스레드만 실행할 수 있도록 합니다.

public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; }

문제점

  • 원시 자료형이 아닌 객체만 잠글 수 있습니다.

  • 객체들의 배열을 잠가도 개별 객체를 잠글 수 없습니다.

  • 동기화된 메서드는 전체 메서드를 포괄하는 syncronized (this) 블록과 동일하다고 볼 수 있습니다. (하지만 바이트코드에서는 다르게 처리됩니다.)

  • static synchronized 메서드는 클래스 레벨에서 동기화됩니다.

  • Class 객체를 잠가야 하는 경우 하위 클래스에서 접근 방식에 따라 다르게 처리됩니다.

  • 내부 클래스의 동기화는 외부 클래스의 인스턴스를 잠그지 않습니다.

  • synchronized 메서드는 메서드 시그니처의 일부가 아니기 때문에 인터페이스의 메서드에 synchronized를 사용할 수 없습니다.

  • 동기화되지 않은 메서드는 잠금 상태를 고려하지 않고 신경 쓰지 않는다. 따라서 동기화되지 않은 메서드가 동기화된 메서드를 호출할 수 있습니다.

  • 자바의 잠금은 재진입(Reentrant)이 가능합니다. 즉, 같은 스레드가 이미 잠긴 잠금을 다시 잠글 수 있습니다.

스레드의 상태 모델

동시성을 이해하기 위해서는 스레드의 상태 모델을 이해해야 합니다. 스레드의 상태는 다음과 같습니다.

  • NEW: 스레드가 생성되었지만 start() 메서드가 호출되지 않은 상태

  • RUNNABLE: 실행 중인 스레드

    • BLOCKED: 동기화된 메서드 또는 블록에 의해 잠긴 스레드

    • WAITING: 다른 스레드가 통지할 때까지 기다리는 스레드

    • TIMED_WAITING: 일정 시간 동안 기다리는 스레드

  • TERMINATED: 실행을 완료한 스레드

RUNNABLE

Runnable

Blocked

Waithing

Timed waithing

NEW

TERMINATED

완전히 동기화된 객체(fully synchronized object)

완전히 동기화된 객체는 모든 메서드가 동기화된 메서드로 구성된 객체를 의미합니다. 필드는 private로 선언하고 synchronized 메서드로만 접근할 수 있습니다. 하지만 이러한 방식은 성능에 영향을 미칠 수 있습니다.

public class FSOAccount { private int balance; public FSOAccount(int initialBalance) { balance = initialBalance; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized void deposit(int amount) { balance += amount; } public synchronized int getBalance() { return balance; } }

교착상태 (Deadlock)

교착상태는 두 개 이상의 스레드가 서로 상대방의 작업이 끝나기만을 기다리고 있어 무한정 기다리는 상태를 의미합니다.

아래 코드는 잘 동작하지만 두 스레드가 서로의 잠금을 기다리는 상황이 발생할 수 있습니다.

// FSOAccount.java package me.service.hello; public class FSOAccount { private int balance; public FSOAccount(int balance) { this.balance = balance; } public synchronized boolean withdraw(int amount) { if (balance >= amount) { balance -= amount; return true; } return false; } public synchronized void deposit(int amount) { balance += amount; } public synchronized double getBalance() { return balance; } public synchronized boolean transfer(FSOAccount other, int amount) { try { Thread.sleep(10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } if (withdraw(amount)) { other.deposit(amount); return true; } return false; } }
// FSOMain.java package me.service.hello; public class FSOMain { private static final int MAX_TRANSFERS = 1_000; public static void main(String[] args) throws InterruptedException { FSOAccount a = new FSOAccount(10_000); FSOAccount b = new FSOAccount(10_000); Thread tA = new Thread(() -> { for (int i = 0; i < MAX_TRANSFERS; i++) { boolean success = a.transfer(b, 1); if (!success) { System.out.println("Transfer failed " + i); } } }); Thread tB = new Thread(() -> { for (int i = 0; i < MAX_TRANSFERS; i++) { boolean success = b.transfer(a, 1); if (!success) { System.out.println("Transfer failed " + i); } } }); tA.start(); tB.start(); tA.join(); tB.join(); System.out.println("Final balances: a = " + a.getBalance() + ", b = " + b.getBalance()); } }

Thread Dump 확인 결과

교착상태_thread_1.png
교착상태_thread_2.png

두 스레드는 Thread-0, Thread-1이며 서로의 잠금을 기다리고 있습니다.

해결 코드 예시

public void transfer(FSOAccount target, int amount) { FSOAccount firstLock = this; FSOAccount secondLock = target; // 항상 id가 작은 순서대로 lock을 걸어야 데드락이 발생하지 않습니다. if (this.id > target.id) { firstLock = target; secondLock = this; } // 데드락을 방지하기 위해 두 lock을 순서대로 lock을 걸어야 합니다. synchronized (firstLock) { synchronized (secondLock) { try { Thread.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } if (withdraw(amount)) { target.deposit(amount); } } } } // Test code @DisplayName("3 계정 간 입출금 테스트") @Test void testSyncTransfer3() throws InterruptedException { FSOAccount account1 = new FSOAccount(1000, 1); FSOAccount account2 = new FSOAccount(1000, 2); FSOAccount account3 = new FSOAccount(1000, 3); var theadPool = new ThreadPoolExecutor(10, 10, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); for (int i = 0; i < 1000; i++) { theadPool.execute(() -> account1.transfer(account2, 1)); } for (int i = 0; i < 1000; i++) { theadPool.execute(() -> account2.transfer(account3, 1)); } for (int i = 0; i < 1000; i++) { theadPool.execute(() -> account3.transfer(account1, 1)); } theadPool.shutdown(); boolean awaited = theadPool.awaitTermination(10000, TimeUnit.MILLISECONDS); Assertions.assertAll( () -> assertThat(awaited).isTrue(), () -> assertThat(account1.getBalance()).isEqualTo(1000), () -> assertThat(account2.getBalance()).isEqualTo(1000), () -> assertThat(account3.getBalance()).isEqualTo(1000) ); }

volatile 키워드

volatile 사용하면 변수의 값을 읽거나 쓸 때 CPU 캐시가 아닌 메인 메모리에서 직접 읽거나 쓸 수 있습니다. volatile의 핵심은 메모리 위치에 대해 하나의 작업만 수행하도록 보장하는 것입니다. 하지만 volatilesynchronized와 달리 원자성을 보장하지 않습니다. 그래서 flag 변수와 같이 단순한 상태를 표현할 때 사용하는 것이 좋습니다.

java.util.concurrent 패키지 (빌딩 블록)

java.util.concurrent 패키지는 자바 5부터 추가된 패키지로 동시성 문제를 해결하기 위한 다양한 클래스와 인터페이스를 제공합니다.

java.util.concurrent.atomic 패키지

java.util.concurrent.atomic 패키지는 원자적 연산을 지원하는 클래스를 제공합니다. 원자적 연산은 여러 스레드가 동시에 접근해도 안전하게 읽고 쓸 수 있는 연산을 의미합니다.

  • AtomicBoolean

  • AtomicInteger

  • AtomicLong

  • AtomicReference

java.util.concurrent.locks 패키지

  • ReentrantLock: synchronized와 유사하지만 더 많은 기능을 제공합니다.

  • ReentrantReadWriteLock: 읽기와 쓰기 잠금을 분리하여 성능을 향상시킬 수 있습니다.

public void transfer(FSOAccount target, int amount) { var firstLock = id < target.id ? lock : target.lock; var secondLock = id < target.id ? target.lock : lock; firstLock.lock(); try { secondLock.lock(); try { try { Thread.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } if (withdraw(amount)) { target.deposit(amount); } } finally { secondLock.unlock(); } } finally { firstLock.unlock(); } }

CountDownLatch

CountDownLatch는 지정된 수의 작업이 완료될 때까지 기다리는 클래스입니다. CountDownLatch는 초기화할 때 카운트를 지정하고 countDown() 메서드로 카운트를 감소시키며 await() 메서드로 카운트가 0이 될 때까지 기다립니다.

@Test void testCountDownLatch() throws InterruptedException { var latch = new CountDownLatch(10); var count = new AtomicInteger(0); var threadPool = new ThreadPoolExecutor(10, 10, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); for (int i = 0; i < 100; i++) { threadPool.execute(() -> { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } count.incrementAndGet(); latch.countDown(); }); } latch.await(); assertThat(count.get()).isEqualTo(10); threadPool.shutdown(); threadPool.awaitTermination(100000, TimeUnit.MILLISECONDS); assertThat(count.get()).isEqualTo(100); }

동시성 자료 구조

  • ConcurrentHashMap: 동시성을 지원하는 해시 맵, 쓰기 작업이 많을 때 사용

  • CopyOnWriteArrayList: 동시성을 지원하는 리스트, 쓰기 작업이 많지 않을 때 사용

  • BlockingQueue: 블로킹 큐, 생산자-소비자 패턴에 사용

    • ArrayBlockingQueue: 배열 기반 큐, 고정된 크기의 큐

    • LinkedBlockingQueue: 링크드 리스트 기반 큐, 크기가 고정되지 않은 큐

Future와 CompletableFuture

Future는 비동기 작업의 결과를 나타내는 인터페이스입니다. Future는 작업이 완료되었는지 확인하거나 작업의 결과를 가져오는 메서드를 제공합니다.

  • get(): 작업의 결과를 가져옵니다. 작업이 완료될 때까지 블로킹됩니다.

  • isDone(): 작업이 완료되었는지 확인합니다.

  • cancel(): 작업을 취소합니다.

@Test void testFuture() { var expected = "Hello, World!"; var executor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); var future = executor.submit(() -> { Thread.sleep(1000); return expected; }); if (future.isDone()) { try { assertThat(future.get()).isEqualTo(expected); } catch (Exception e) { e.printStackTrace(); } } }

CompletableFutureFutureCompletionStage를 구현한 클래스로 비동기 작업을 쉽게 처리할 수 있습니다.

  • thenApply(): 작업의 결과를 변환합니다.

  • thenAccept(): 작업의 결과를 소비합니다.

  • thenRun(): 작업을 실행합니다.

  • thenCombine(): 두 작업의 결과를 합칩니다.

  • thenCompose(): 두 작업을 조합합니다.

@Test void testCompletableFuture() { var expected = "Hello, World!"; var executor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); var future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return expected; }, executor); future.thenAccept(result -> assertThat(result).isEqualTo(expected)); }

Callable과 FutureTask

Callable은 코드조각을 실행하고 결과를 반환하는 인터페이스입니다.

  • call(): 작업을 실행합니다.

@Test void testCallable() throws Exception { Callable<String> callable = () -> { Thread.sleep(1000); return "Hello, World!"; }; assertThat(callable.call()).isEqualTo("Hello, World!"); }

FutureTaskRunnableFuture를 구현한 클래스로 RunnableFuture를 구현한 클래스입니다. FutureTaskCallable을 구현한 클래스를 생성자로 받아 작업을 실행합니다.

  • run(): 작업을 실행합니다.

  • get(): 작업의 결과를 가져옵니다. 작업이 완료될 때까지 블로킹됩니다.

  • cancel(): 작업을 취소합니다.

  • isDone(): 작업이 완료되었는지 확인합니다.

  • isCancelled(): 작업이 취소되었는지 확인합니다.

@Test void testFutureTask() { var expected = "Hello, World!"; var executor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); var futureTask = new FutureTask<>(() -> { Thread.sleep(1000); return expected; }); executor.execute(futureTask); try { assertThat(futureTask.get()).isEqualTo(expected); } catch (Exception e) { e.printStackTrace(); } }

다양한 ThreadPoolExecutor

Executor은 작업을 실행하는 인터페이스입니다.

  • execute(): 작업을 실행합니다.

하지만 Executor는 작업의 상태를 확인하거나 작업의 결과를 가져오는 메서드를 제공하지 않습니다. ExecutorServiceExecutor를 상속한 인터페이스로 작업의 상태를 확인하거나 작업의 결과를 가져오는 메서드를 제공합니다.

  • submit(): 작업을 실행하고 Future를 반환합니다.

  • shutdown(): 작업을 종료합니다.

  • awaitTermination(): 작업이 완료될 때까지 기다립니다.

이런 ExecutorService를 생성하는 팩토리 메서드를 제공하는 클래스가 Executors 클래스입니다. Executors 클래스는 다양한 스레드 풀을 생성하는 팩토리 메서드를 제공합니다.

  • newSingleThreadExecutor(): 단일 스레드 풀을 생성합니다.

  • newFixedThreadPool(int n): 고정된 스레드 풀을 생성합니다. n개의 스레드를 생성합니다.

  • newCachedThreadPool(): 캐시 스레드 풀을 생성합니다. 60초 동안 사용되지 않은 스레드는 종료됩니다.

  • newScheduledThreadPool(int n): 스케줄링 스레드 풀을 생성합니다.

마무리

이 글에서는 자바 동시성 기초에 대해 간단히 정리했습니다. synchronizedvolatile 키워드를 사용한 동시성 문제 해결 방법과 java.util.concurrent 패키지를 활용한 동시성 문제 해결 방법을 살펴보았습니다.

참조

Last modified: 13 December 2024