Java 동시성 컬렉션 (2) – Concurrent Collection

Java 동시성 컬렉션에서 Collections.synchronizedXxx()를 살펴보았다. 이번에는 그 한계를 극복한 java.util.concurrent 패키지와 고성능 동시성 컬렉션들을 알아보자

왜 별도의 동시성 컬렉션이 필요할까?

Collections.synchronized의 문제

  • 모든 메서드에 단순하게 synchronized 적용
  • 잠금 범위가 너무 넓음
  • 한 번에 하나의 스레드만 접근 가능
  • 성능 최적화 불가능

Concurrent 컬렉션의 해결책

  • 정교한 잠금 메커니즘 (세그먼트 락, CAS 등)
  • 최소한의 잠금 범위
  • 여러 스레드의 동시 접근 지원
  • 다양한 최적화 기법 적용

동시성 컬렉션 개요

java.util.concurrent 패키지 핵심 특징
  • synchronized: 기본 동기화
  • ReentrantLock: 고급 락
  • CAS 연산: Compare-And-Swap (락 없는 동기화)
  • volatile 변수: 가시성 보장
  • 세그먼트 락: 분할 잠금 기술
  • 락-프리 알고리즘: Non-blocking 알고리즘

제공되는 컬렉션 종류

타입일반 컬렉션Concurrent 컬렉션특징
ListArrayListCopyOnWriteArrayList읽기 많을 때 최적
SetHashSetCopyOnWriteArraySet읽기 많을 때 최적
TreeSetConcurrentSkipListSet정렬 + 동시성
MapHashMapConcurrentHashMap가장 많이 사용
TreeMapConcurrentSkipListMap정렬 + 동시성
QueueConcurrentLinkedQueue비차단 큐
DequeConcurrentLinkedDeque비차단 덱
  • LinkedHashSet, LinkedHashMap의 동시성 버전은 제공되지 않는다. 필요시 Collections.synchronizedXxx()를 사용해야 한다
List 구현체

CopyOnWriteArrayList

ArrayList의 Thread-safe 버전
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListExample {
    public static void main(String[] args) {
        List<Integer> list = new CopyOnWriteArrayList<>();
        
        list.add(1);
        list.add(2);
        list.add(3);
        
        System.out.println("list = " + list);
    }
}

출력
list = [1, 2, 3]

Copy-On-Write 전략

핵심 아이디어: 쓰기 시 전체 배열을 복사

// 쓰기 작업 (add, remove 등)
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        // 전체 배열 복사
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

// 읽기 작업 (get 등)
public E get(int index) {
    // 동기화 없음
    return get(getArray(), index);
}
장점
  • 읽기 작업은 락 없이 매우 빠르다
  • Iterator가 ConcurrentModificationException을 던지지 않는다
  • 반복 중 수정이 가능하다
List<String> list = new CopyOnWriteArrayList<>();
list.add("A");
list.add("B");
list.add("C");

// 안전하게 반복 중 수정 가능
for (String item : list) {
    if (item.equals("B")) {
        list.remove(item);  // OK
    }
}
단점
  • 쓰기 작업이 매우 느리다 (전체 배열 복사)
  • 메모리 사용량이 증가한다
  • 쓰기가 많으면 성능이 급격히 저하된다

사용 시나리오

// 적합: 읽기 작업이 쓰기보다 훨씬 많은 경우
// 예: 이벤트 리스너 리스트, 설정 정보
public class EventBus {
    private final List<EventListener> listeners = 
        new CopyOnWriteArrayList<>();
    
    public void register(EventListener listener) {
        listeners.add(listener);  // 가끔 발생
    }
    
    public void fireEvent(Event event) {
        for (EventListener listener : listeners) {  // 자주 발생
            listener.onEvent(event);
        }
    }
}

// 부적합: 쓰기가 빈번한 경우
// 예: 실시간 로그 수집, 채팅 메시지 리스트
Set 구현체

CopyOnWriteArraySet

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

public class SetExample {
    public static void main(String[] args) {
        Set<Integer> set = new CopyOnWriteArraySet<>();
        
        set.add(1);
        set.add(2);
        set.add(3);
        set.add(2);  // 중복 제거됨
        
        System.out.println("copySet = " + set);
    }
}

출력
copySet = [1, 2, 3]

특징

  • HashSet의 대안
  • 내부적으로 CopyOnWriteArrayList 사용
  • 순서 보장이 안 된다(입력 순서가 나올 수도 있지만 보장하지 않음)
  • 읽기 많을 때 최적

ConcurrentSkipListSet

import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

public class SetExample {
    public static void main(String[] args) {
        Set<Integer> set = new ConcurrentSkipListSet<>();
        
        // 순서와 관계없이 추가
        set.add(3);
        set.add(1);
        set.add(2);
        
        System.out.println("skipSet = " + set);
    }
}

출력
skipSet = [1, 2, 3]  // 항상 정렬된 순서

특징

  • TreeSet의 대안
  • 정렬된 순서를 유지한다
  • Comparator 사용이 가능하다
  • Skip List 자료구조 기반
// 커스텀 정렬
Set<String> set = new ConcurrentSkipListSet<>(
    Comparator.comparing(String::length)
        .thenComparing(String::compareTo)
);

set.add("apple");
set.add("pie");
set.add("banana");

System.out.println(set);  // [pie, apple, banana] (길이순, 같으면 사전순)
Map 구현체

ConcurrentHashMap

가장 많이 사용되는 동시성 컬렉션

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MapExample {
    public static void main(String[] args) {
        Map<Integer, String> map = new ConcurrentHashMap<>();
        
        map.put(3, "data3");
        map.put(2, "data2");
        map.put(1, "data1");
        
        System.out.println("map = " + map);
    }
}

출력
map = {1=data1, 2=data2, 3=data3}

세그먼트 락 메커니즘

// 개념적인 구조
전체 맵을 여러 세그먼트로 분할
┌──────────┬──────────┬──────────┬──────────┐
│Segment 0 │Segment 1 │Segment 2 │Segment 3 │
└──────────┴──────────┴──────────┴──────────┘
     ↓          ↓          ↓          ↓
  Thread-1   Thread-2   Thread-3   Thread-4
  
→ 각 세그먼트는 독립적으로 락
→ 서로 다른 세그먼트는 동시 접근 가능

성능 최적화 기법

// 1. 읽기 작업 - 대부분 락 없음
V get(K key) {
    // volatile 읽기로 최신 값 보장
    // 락 획득 없이 빠르게 처리
}

// 2. 쓰기 작업 - 해당 세그먼트만 락
V put(K key, V value) {
    int hash = hash(key);
    int segment = hash & segmentMask;
    
    // 해당 세그먼트만 락
    synchronized (segments[segment]) {
        // 실제 작업
    }
}

// 3. CAS 연산 활용
boolean putIfAbsent(K key, V value) {
    // Compare-And-Swap으로 락 없이 처리
}

유용한 메서드들

Map<String, Integer> map = new ConcurrentHashMap<>();

// 원자적 연산들 - 복합 작업도 안전
map.putIfAbsent("count", 0);
map.computeIfAbsent("count", k -> 0);
map.computeIfPresent("count", (k, v) -> v + 1);
map.merge("count", 1, Integer::sum);

// 일반 Map vs ConcurrentHashMap
// 일반 Map - 경쟁 상태 발생
if (!map.containsKey("count")) {
    map.put("count", 1);  // 다른 스레드가 끼어들 수 있음
}

// ConcurrentHashMap - 안전
map.putIfAbsent("count", 1);  // 원자적 연산

실무 활용 예시

// 캐시 구현
public class CacheService {
    private final Map<String, User> cache = new ConcurrentHashMap<>();
    
    public User getUser(String id) {
        return cache.computeIfAbsent(id, this::loadFromDatabase);
    }
    
    private User loadFromDatabase(String id) {
        // DB 조회
        return userRepository.findById(id);
    }
}

// 카운터
public class AccessCounter {
    private final Map<String, AtomicInteger> counts = new ConcurrentHashMap<>();
    
    public void increment(String page) {
        counts.computeIfAbsent(page, k -> new AtomicInteger())
              .incrementAndGet();
    }
    
    public int getCount(String page) {
        return counts.getOrDefault(page, new AtomicInteger()).get();
    }
}

ConcurrentSkipListMap

import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

public class MapExample {
    public static void main(String[] args) {
        Map<Integer, String> map = new ConcurrentSkipListMap<>();
        
        map.put(2, "data2");
        map.put(3, "data3");
        map.put(1, "data1");
        
        System.out.println("map = " + map);
    }
}

출력
map = {1=data1, 2=data2, 3=data3}  // 항상 키 순서로 정렬

특징

  • TreeMap의 대안
  • 정렬된 순서를 유지한다
  • Comparator 사용 가능
  • NavigableMap 인터페이스 구현
ConcurrentSkipListMap<String, Integer> map = new ConcurrentSkipListMap<>();
map.put("apple", 1);
map.put("banana", 2);
map.put("cherry", 3);

// NavigableMap 메서드 활용
System.out.println(map.firstKey());        // apple
System.out.println(map.lastKey());         // cherry
System.out.println(map.higherKey("apple")); // banana
Queue 구현체

ConcurrentLinkedQueue

비차단(Non-Blocking) 큐

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueExample {
    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        
        queue.offer("task1");
        queue.offer("task2");
        queue.offer("task3");
        
        System.out.println(queue.poll());  // task1
        System.out.println(queue.poll());  // task2
        System.out.println(queue.peek());  // task3 (제거 안 됨)
    }
}

특징

  • 무한 크기
  • CAS 기반 구현 (락 없음)
  • 매우 빠른 성능
  • 큐기 비어도 블록하지 않음 (null 반환)
// 작업 큐 예시
public class TaskQueue {
    private final Queue<Task> queue = new ConcurrentLinkedQueue<>();
    
    public void addTask(Task task) {
        queue.offer(task);
    }
    
    public Task getTask() {
        return queue.poll();  // 없으면 null, 블록 안 함
    }
}

ConcurrentLinkedDeque

import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

Deque<String> deque = new ConcurrentLinkedDeque<>();
deque.addFirst("first");
deque.addLast("last");

System.out.println(deque.removeFirst());  // first
System.out.println(deque.removeLast());   // last
특징
  • 양방향 큐
  • 앞뒤 모두 추가/제거 가능
  • 비차단 방식

BlockingQueue – 생산자-소비자 패턴

스레드를 차단(Block)하는 큐로 생산자 – 소비자 패턴의 핵심

ArrayBlockingQueue

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        
        // 생산자
        new Thread(() -> {
            try {
                System.out.println("생산자: 데이터 추가");
                queue.put("data");  // 큐가 가득 차면 대기
                System.out.println("생산자: 추가 완료");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer").start();
        
        // 소비자
        new Thread(() -> {
            try {
                Thread.sleep(1000);  // 1초 대기
                System.out.println("소비자: 데이터 가져가기");
                String data = queue.take();  // 큐가 비면 대기
                System.out.println("소비자: " + data);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer").start();
    }
}

출력

생산자: 데이터 추가
생산자: 추가 완료
(1초 대기)
소비자: 데이터 가져가기
소비자: data
특징
  • 크기 고정
  • 큐가 가득 차면 생산자 대기 (put)
  • 큐가 비면 소비자 대기 (take)
  • 공정(Fair) 모드 지원
// 공정 모드 - FIFO 순서로 스레드에게 락 부여
BlockingQueue<String> fairQueue = new ArrayBlockingQueue<>(10, true);

// 일반 모드 - 성능은 좋지만 순서 보장 안 됨
BlockingQueue<String> normalQueue = new ArrayBlockingQueue<>(10, false);
BlockingQueue 메서드 비교
동작예외 발생특별한 값 반환블록타임아웃
삽입add(e)offer(e)put(e)offer(e, time, unit)
제거remove()poll()take()poll(time, unit)
검사element()peek()
BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

// 1. 예외 발생
queue.add("A");
queue.add("B");
queue.add("C");  // IllegalStateException - 큐가 가득 참

// 2. 특별한 값 반환
queue.offer("A");  // true
queue.offer("B");  // true
queue.offer("C");  // false - 큐가 가득 참

// 3. 블록
queue.put("A");  // 성공
queue.put("B");  // 성공
queue.put("C");  // 여기서 대기... 공간이 생길 때까지

// 4. 타임아웃
boolean success = queue.offer("C", 1, TimeUnit.SECONDS);
// 1초 동안 대기, 실패하면 false 반환

LinkedBlockingQueue

// 무한 크기
BlockingQueue<String> unboundedQueue = new LinkedBlockingQueue<>();

// 크기 제한
BlockingQueue<String> boundedQueue = new LinkedBlockingQueue<>(100);
특징
  • 크기 무한 또는 고정 가능
  • 링크드 리스트 기반
  • ArrayBlockingQueue보다 처리량 높음
  • 생산자와 소비자가 다른 락 사용(더 나은 동시성)
// 실무 예시: 작업 큐
public class WorkerPool {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();
    
    public WorkerPool(int workerCount) {
        for (int i = 0; i < workerCount; i++) {
            Thread worker = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Runnable task = taskQueue.take();  // 작업 대기
                        task.run();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
            worker.start();
            workers.add(worker);
        }
    }
    
    public void submit(Runnable task) {
        try {
            taskQueue.put(task);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

PriorityBlockingQueue

import java.util.concurrent.PriorityBlockingQueue;

public class PriorityQueueExample {
    static class Task implements Comparable<Task> {
        String name;
        int priority;
        
        Task(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
        
        @Override
        public int compareTo(Task o) {
            return Integer.compare(this.priority, o.priority);
        }
        
        @Override
        public String toString() {
            return name + "(우선순위: " + priority + ")";
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
        
        queue.put(new Task("낮은 작업", 3));
        queue.put(new Task("높은 작업", 1));
        queue.put(new Task("중간 작업", 2));
        
        System.out.println(queue.take());  // 높은 작업(우선순위: 1)
        System.out.println(queue.take());  // 중간 작업(우선순위: 2)
        System.out.println(queue.take());  // 낮은 작업(우선순위: 3)
    }
}
특징
  • 우선순위 기반 처리
  • 힙(Heap) 자료 구조
  • 무한 크기
  • put()은 블록 안 함(무한 크기라서)

SynchronousQueue – 직거래 큐

import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueExample {
    public static void main(String[] args) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        
        // 생산자
        new Thread(() -> {
            try {
                System.out.println("생산자: 대기 중...");
                queue.put("data");
                System.out.println("생산자: 전달 완료");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
        
        // 소비자 (2초 후 실행)
        new Thread(() -> {
            try {
                Thread.sleep(2000);
                System.out.println("소비자: 수신 - " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

출력

생산자: 대기 중...
(2초 대기 - 생산자가 블록됨)
소비자: 수신 - data
생산자: 전달 완료
특징
  • 크기가 0인 큐(버퍼 없음)
  • 생산자와 소비자가 직접 핸드오프
  • 생산자는 소비자가 받을 때까지 대기
  • 소비자는 생산자가 줄 때가지 대기
  • 매우 빠른 직접 전달

DelayQueue

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueExample {
    static class DelayedTask implements Delayed {
        private final String name;
        private final long startTime;
        
        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.startTime = System.currentTimeMillis() + delayMillis;
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
            long diff = startTime - System.currentTimeMillis();
            return unit.convert(diff, TimeUnit.MILLISECONDS);
        }
        
        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.startTime, ((DelayedTask) o).startTime);
        }
        
        @Override
        public String toString() {
            return name;
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        
        System.out.println("작업 추가: " + System.currentTimeMillis());
        queue.put(new DelayedTask("3초 후 작업", 3000));
        queue.put(new DelayedTask("1초 후 작업", 1000));
        queue.put(new DelayedTask("2초 후 작업", 2000));
        
        while (!queue.isEmpty()) {
            DelayedTask task = queue.take();  // 지연 시간까지 대기
            System.out.println(System.currentTimeMillis() + ": " + task);
        }
    }
}

출력

작업 추가: 1704067200000
1704067201000: 1초 후 작업
1704067202000: 2초 후 작업
1704067203000: 3초 후 작업
특징
  • 지정된 지연 시간 후 처리
  • 스케줄링 작업에 유용
  • 만료 시간 관리에 최적

컬렉션 선택 플로우차트

시작
  ↓
단일 스레드인가?
  ├─ Yes → ArrayList, HashMap 등 일반 컬렉션
  └─ No
      ↓
      List가 필요한가?
      ├─ Yes
      │   └─ 읽기가 쓰기보다 훨씬 많은가?
      │       ├─ Yes → CopyOnWriteArrayList
      │       └─ No → Collections.synchronizedList()
      │
      Set이 필요한가?
      ├─ Yes
      │   ├─ 정렬이 필요한가?
      │   │   ├─ Yes → ConcurrentSkipListSet
      │   │   └─ No → CopyOnWriteArraySet
      │
      Map이 필요한가?
      ├─ Yes
      │   ├─ 정렬이 필요한가?
      │   │   ├─ Yes → ConcurrentSkipListMap
      │   │   └─ No → ConcurrentHashMap ⭐
      │
      Queue가 필요한가?
      └─ Yes
          ├─ 블록킹이 필요한가?
          │   ├─ Yes → ArrayBlockingQueue / LinkedBlockingQueue
          │   └─ No → ConcurrentLinkedQueue
          └─ 우선순위가 필요한가?
              └─ Yes → PriorityBlockingQueue
실무 상황별 추천

캐시 구현

// ConcurrentHashMap
public class Cache<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    
    public V get(K key, Function<K, V> loader) {
        return cache.computeIfAbsent(key, loader);
    }
    
    public void invalidate(K key) {
        cache.remove(key);
    }
}

이벤트 리스너 관리

// CopyOnWriteArrayList
public class EventBus {
    private final List<EventListener> listeners = 
        new CopyOnWriteArrayList<>();
    
    public void register(EventListener listener) {
        listeners.add(listener);  // 가끔
    }
    
    public void fire(Event event) {
        for (EventListener listener : listeners) {  // 자주
            listener.onEvent(event);
        }
    }
}

작업 큐 (ThreadPool)

// LinkedBlockingQueue
public class SimpleThreadPool {
    private final BlockingQueue<Runnable> workQueue;
    private final List<Worker> workers;
    
    public SimpleThreadPool(int threadCount, int queueSize) {
        this.workQueue = new LinkedBlockingQueue<>(queueSize);
        this.workers = new ArrayList<>();
        
        for (int i = 0; i < threadCount; i++) {
            Worker worker = new Worker(workQueue);
            worker.start();
            workers.add(worker);
        }
    }
    
    public void submit(Runnable task) throws InterruptedException {
        workQueue.put(task);
    }
    
    private static class Worker extends Thread {
        private final BlockingQueue<Runnable> queue;
        
        Worker(BlockingQueue<Runnable> queue) {
            this.queue = queue;
        }
        
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Runnable task = queue.take();
                    task.run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

통계/카운터

// ⭐ 추천: ConcurrentHashMap + AtomicInteger
public class Statistics {
    private final Map<String, AtomicInteger> counters = 
        new ConcurrentHashMap<>();
    
    public void increment(String key) {
        counters.computeIfAbsent(key, k -> new AtomicInteger())
                .incrementAndGet();
    }
    
    public Map<String, Integer> getSnapshot() {
        Map<String, Integer> snapshot = new HashMap<>();
        counters.forEach((k, v) -> snapshot.put(k, v.get()));
        return snapshot;
    }
}

주의 사항

복합 연산은 여전히 주의 필요

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

// 잘못된 방법
if (!map.containsKey("count")) {
    map.put("count", 0);  // 경쟁 상태!
}

// 올바른 방법
map.putIfAbsent("count", 0);

반복 중 수정

// CopyOnWriteArrayList - 안전
List<String> list = new CopyOnWriteArrayList<>();
for (String item : list) {
    list.remove(item);  // OK
}

// ConcurrentHashMap - 약한 일관성
Map<String, String> map = new ConcurrentHashMap<>();
for (String key : map.keySet()) {
    map.remove(key);  // 일부만 제거될 수 있음
}

LinkedHashMap 동시성 버전 없음

// 동시성 구현 없음
// LinkedHashSet, LinkedHashMap

// 필요시 이렇게
Set<String> set = Collections.synchronizedSet(new LinkedHashSet<>());
Map<String, String> map = Collections.synchronizedMap(new LinkedHashMap<>());

핵심 원칙

  • 단일 스레드 → 일반 컬렉션 (가장 빠르다)
  • 멀티 스레드 + 읽기 많음 → CopyOnWrite 계열
  • 멀티 스레드 + 일반적 → Concurrent 계열 (특히 ConcurrentHashMap)
  • 생산자-소비자 → BlockingQueue 계열

실무에서 조심해야할 것

// 대부분의 경우 이것만 알아도 충분
Map<K, V> map = new ConcurrentHashMap<>();           // 99% 케이스
List<T> list = new CopyOnWriteArrayList<>();         // 리스너, 설정
BlockingQueue<T> queue = new LinkedBlockingQueue<>(); // 작업 큐

// 정렬이 필요한 경우만
Map<K, V> sortedMap = new ConcurrentSkipListMap<>();
Set<T> sortedSet = new ConcurrentSkipListSet<>();

절대 잊지 말 것

멀티스레드 환경에서 일반 컬렉션을 사용하면 해결하기 매우 어려운 버그가 발생한다

Java 동시성 컬렉션 (1) – Collections.synchronized와 한계

출처 – 김영한 님의 강의 중 김영한의 실전 자바 – 고급 1편, 멀티스레드와 동시성 중 일부