Kafka를 활용하여 마이크로서비스 아키텍처(MSA) 기반의 회원 가입 및 이메일 발송 시스템을 구축하는 방법을 알아보자. Kafka의 메시지 브로커 역할을 통해 서비스 간 비동기 통신을 구현하며, 이벤트 기반 아키텍처의 핵심 개념을 실습한다
프로젝트 목표
- MSA 구조에서 Kafka의 실제 활용 방법 이해
- 이벤트 기반 아키텍처 패턴 실습
- Producer-Consumer 모델 구현
- 장애 처리 및 재시도 메커니즘 구현
시스템 아키텍처
전체 구조
[사용자] → [User Service] → [Kafka Cluster] → [Email Service]
↓ ↓
[User DB] [Email DB]
서비스 구성
| 서비스 | 역할 | 데이터베이스 | 포트 |
| User Service | 회원 가입 처리 및 Kafka 메시지 발생(Producer) | User DB (H2) | 8080 |
| Email Service | 이메일 발송 처리 및 Kafka 메시지 소비(Consumer) | Email DB (H2) | 8081 |
구현 기능
User Service
- 회원 가입 API 제공
- 사용자 정보를 User DB에 저장
- 회원 가입 완료 이벤트를 Kafka에 발행
Email Service
- Kafka로부터 회원 가입 이벤트 수신
- 환영 이메일 발송(시뮬레이션)
- 이메일 발송 로그를 Email DB에 저장
User Service 구현
프로젝트 초기 설정 – 필수 의존성
- Spring Boot DevTools
- Spring Web
- Spring for Apache Kafka
- H2 Database
- Spring Data JPA
애플리케이션 설정
server:
port: 8080
spring:
kafka:
bootstrap-servers:
- {KAFKA_SERVER_IP}:9092
- {KAFKA_SERVER_IP}:19092
- {KAFKA_SERVER_IP}:29092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
partitioner.class: org.apache.kafka.clients.producer.RoundRobinPartitioner
h2:
console:
enabled: true
datasource:
url: jdbc:h2:mem:userDB
driver-class-name: org.h2.Driver
username: sa
password:
주요 설정 항목
- bootstrap-servers: Kafka 클러스터의 브로커 주소 목록 (고가용성 확보)
- key-serializer / value-serializer: 메시지를 문자열로 직렬화
- partitioner.class: RoundRobinPartitioner를 사용하여 메시지를 파티션에 균등 분배
도메인 모델 구현
User Entity
@Entity
@Table(name = "users")
public class User {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String email;
private String name;
private String password;
public User() {
}
public User(String email, String name, String password) {
this.email = email;
this.name = name;
this.password = password;
}
// Getters
public Long getId() {
return id;
}
public String getEmail() {
return email;
}
public String getName() {
return name;
}
public String getPassword() {
return password;
}
}
User Repository
@Repository
public interface UserRepository extends JpaRepository<User, Long> {
}
이벤트 모델 정의
UserSignedUpEvent
public record UserSignedUpEvent(
Long userId,
String email,
String name
) {
}
설계 원칙: 패스워드와 같은 민감 정보나 다른 서비스에서 불필요한 데이터는 이벤트에 포함하지 않는다. 이는 보안과 네트워크 효율성을 고려한 설계이다
컨트롤러 구현
SignUpRequestDto
public record SignUpRequestDto(
String email,
String name,
String password
) {
}
UserController
@RestController
@RequestMapping("/api/users")
public class UserController {
private final UserService userService;
public UserController(UserService userService) {
this.userService = userService;
}
@PostMapping
public ResponseEntity<String> signUp(
@RequestBody SignUpRequestDto signUpRequestDto
) {
userService.signUp(signUpRequestDto);
return ResponseEntity.ok("회원가입 성공");
}
}
비즈니스 로직 및 Kafka Producer 구현
UserService
@Service
public class UserService {
private final UserRepository userRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
public UserService(UserRepository userRepository, KafkaTemplate<String, String> kafkaTemplate) {
this.userRepository = userRepository;
this.kafkaTemplate = kafkaTemplate;
}
public void signUp(SignUpRequestDto signUpRequestDto) {
// 1. 사용자 정보 저장
User user = new User(
signUpRequestDto.email(),
signUpRequestDto.name(),
signUpRequestDto.password()
);
User savedUser = userRepository.save(user);
// 2. 이벤트 객체 생성
UserSignedUpEvent userSignedUpEvent = new UserSignedUpEvent(
savedUser.getId(),
savedUser.getEmail(),
savedUser.getName()
);
// 3. Kafka에 이벤트 발행
this.kafkaTemplate.send("user.signed-up", toJsonString(userSignedUpEvent));
}
private String toJsonString(Object object) {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(object);
} catch (JsonProcessingException e) {
throw new RuntimeException("JSON 직렬화 실패", e);
}
}
}
핵심 로직
- 사용자 정보를 데이터베이스에 저장한다
- 저장된 사용자 정보로 이벤트 객체를 생성한다
- 이벤트를 JSON 문자열로 변환하여 Kafka 토픽에 발행한다
Email Service 구현
프로젝트 초기 설정
User Service와 동일한 의존성을 사용한다
애플리케이션 설정
server:
port: 8081 # User Service와 포트 충돌 방지
spring:
kafka:
bootstrap-servers:
- {KAFKA_SERVER_IP}:9092
- {KAFKA_SERVER_IP}:19092
- {KAFKA_SERVER_IP}:29092
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
h2:
console:
enabled: true
datasource:
url: jdbc:h2:mem:emailDB
driver-class-name: org.h2.Driver
username: sa
password:
주요 설정 항목
- key-serializer / value-serializer: 수신한 메시지를 문자열로 역직렬화
- auto-offset-reset: earliest: 컨슈머 그룹이 처음 시작할 때 가장 오래된 메시지부터 처리
이벤트 모델 정의
UserSignedUpEvent
public class UserSignedUpEvent {
private Long userId;
private String email;
private String name;
// 역직렬화를 위한 기본 생성자 필수
public UserSignedUpEvent() {
}
public UserSignedUpEvent(Long userId, String email, String name) {
this.userId = userId;
this.email = email;
this.name = name;
}
// JSON 문자열을 객체로 변환하는 정적 팩토리 메서드
public static UserSignedUpEvent fromJson(String json) {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(json, UserSignedUpEvent.class);
} catch (JsonProcessingException e) {
throw new RuntimeException("JSON 파싱 실패", e);
}
}
// Getters
public Long getUserId() {
return userId;
}
public String getEmail() {
return email;
}
public String getName() {
return name;
}
}
- 설계 참고: Record 타입은 기본 생성자를 제공하지 않으므로 역직렬화가 필요한 경우 일반 클래스를 사용한다
도메인 모델 구현
EmailLog Entity
@Entity
@Table(name = "email_logs")
public class EmailLog {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private Long receiverUserId; // 수신자 사용자 ID
private String receiverEmail; // 수신자 이메일
private String subject; // 이메일 제목
public EmailLog() {
}
public EmailLog(Long receiverUserId, String receiverEmail, String subject) {
this.receiverUserId = receiverUserId;
this.receiverEmail = receiverEmail;
this.subject = subject;
}
// Getters
public Long getId() {
return id;
}
public Long getReceiverUserId() {
return receiverUserId;
}
public String getReceiverEmail() {
return receiverEmail;
}
public String getSubject() {
return subject;
}
}
EmailLogRepository
@Repository
public interface EmailLogRepository extends JpaRepository<EmailLog, Long> {
}
Kafka Consumer 구현
UserSignupEventConsumer
@Service
public class UserSignupEventConsumer {
private final EmailLogRepository emailLogRepository;
public UserSignupEventConsumer(EmailLogRepository emailLogRepository) {
this.emailLogRepository = emailLogRepository;
}
@KafkaListener(
topics = "user.signed-up",
groupId = "email-service",
concurrency = "3" // 3개의 스레드로 병렬 처리
)
@RetryableTopic(
attempts = "5", // 최대 5번 재시도
backoff = @Backoff(delay = 1000, multiplier = 2), // 지수 백오프: 1초, 2초, 4초, 8초, 16초
dltTopicSuffix = ".dlt" // Dead Letter Topic 접미사
)
public void consume(String message) throws InterruptedException {
// 1. JSON 메시지를 객체로 변환
UserSignedUpEvent userSignedUpEvent = UserSignedUpEvent.fromJson(message);
// 2. 이메일 발송 (실제 구현은 생략, 시뮬레이션)
String receiverEmail = userSignedUpEvent.getEmail();
String subject = userSignedUpEvent.getName() + "님, 회원 가입을 축하드립니다!";
Thread.sleep(3000); // 이메일 발송 시간 시뮬레이션 (3초)
System.out.println("이메일 발송 완료");
// 3. 이메일 발송 로그 저장
EmailLog emailLog = new EmailLog(
userSignedUpEvent.getUserId(),
receiverEmail,
subject
);
emailLogRepository.save(emailLog);
}
}
주요 기능
- @KafkaListener: Kafka 토픽으로부터 메시지를 수신
- concurrency = “3”: 멀티쓰레드를 활용한 병렬 처리로 처리량 향상
- @RetryableTopic: 메시지 처리 실패 시 자동 재시도
- 지수 백오프 전략: 재시도 간격을 점진적으로 증가
- 최종 실패 시 DLT(Dead Letter Topic)로 전송
Dead Letter Topic(DLT) 처리
UserSignedUpEventDltConsumer
@Service
public class UserSignedUpEventDltConsumer {
@KafkaListener(
topics = "user.signed-up.dlt",
groupId = "email-service"
)
public void consume(String message) {
// 로그 시스템에 전송
System.out.println("로그 시스템에 전송: " + message);
// 알림 발송 (Slack, Discord, Telegram 등)
System.out.println("알림 발송: " + message);
}
}
DLT 처리 전략
- 모든 재시도가 실패한 메시지를 별도 토픽에 저장
- 운영팀에게 알림을 전송하여 수동 개입 유도
- 실무에서를 로그수집 시스템 (ELK Stack 등)과 연동하여 모니터링
Kafka 클러스터 설정
Kafka 서버 실행
백그라운드에서 3대의 Kafka 서버를 실행한다
bin/kafka-server-start.sh -daemon config/server.properties bin/kafka-server-start.sh -daemon config/server2.properties bin/kafka-server-start.sh -daemon config/server3.properties
실행 상태 확인
lsof -i:9092 lsof -i:19092 lsof -i:29092
기존 토픽 정리
테스트를 위해 기존 토픽을 삭제한다
# 토픽 목록 조회 bin/kafka-topics.sh --bootstrap-server localhost:9092 --list # 토픽 삭제 bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic email.send
토픽 생성
메인 토픽 생성 (user.signed-up)
bin/kafka-topics.sh --bootstrap-server localhost:9092 \ --create \ --topic user.signed-up \ --partitions 3 \ --replication-factor 3
설정 근거
- partitions 3 – 병렬 처리량 향상 (Consumer의 concurrency)
- replication-factor: 3 – 고가용성 확보 (데이터 손실 방지)
토픽 상세 정보 확인
bin/kafka-topics.sh --bootstrap-server localhost:9092 \ --describe \ --topic user.signed-up
예상 출력
Topic: user.signed-up TopicId: tJ28bBqYSoiEh0GcmHmjVg PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824 Topic: user.signed-up Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Topic: user.signed-up Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: user.signed-up Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
DLT 토픽 생성 (user.signed-up.dlt)
bin/kafka-topics.sh --bootstrap-server localhost:9092 \ --create \ --topic user.signed-up.dlt \ --partitions 1 \ --replication-factor 3
설정 근거
- partitions 1 – DLT 메시지는 빠른 처리가 불필요하므로 단일 파티션 사용
- replication-factor: 3 – 장애 메시지 손실 방지를 위한 복제 유지
통합 테스트
서버 실행
- Kafka 클러스터 (이미 실행 중)
- User Service (포트 8080)
- Email Service (포트 8081)
회원 가입 API 호출
POST http://localhost:8080/api/users
Content-Type: application/json
{
"email": "sender@test.com",
"name": "김이름",
"password": "1234"
}
응답
회원가입 성공
Email Service 콘솔 출력
이메일 발송 완료
데이터 검증
User DB 확인 (localhost:8080/h2-console)
SELECT * FROM users;
결과
id | email | name | password 1 | sender@test.com | 김이름 | 1234
Email DB 확인 (localhost:8081/h2-console)
id | receiver_user_id | receiver_email | subject 1 | 1 | sender@test.com | 김이름님, 회원 가입을 축하드립니다!
결과
id | receiver_user_id | receiver_email | subject 1 | 1 | sender@test.com | 김이름님, 회원 가입을 축하드립니다!
검증 결과
- User Service가 회원 정보를 User DB에 저장
- User Service가 Kafka에 이벤트 발행
- Email Service가 Kafka로부터 이벤트 수신
- Email Service가 이메일 발송 처리 (시뮬레이션)
- Email Service가 발송 로그를 Email DB에 저장
결론
- Kafka 기본 개념: Producer, Consumer, Topic, Partition의 실제 동작 방식
- MSA 패턴: 서비스 간 비동기 통신 구현
- 이벤트 기반 아키텍처: 느슨한 결합과 확장성의 이점
- 장애 처리: 재시도 메커니즘과 DLT를 통한 복구 전략
Kafka는 현대 MSA 환경에서 서비스 간 통신의 핵심 인프라로 자리잡았다. Kafka의 강력한 메시지 브로커 기능과 Spring의 편리한 추상화를 결합하면, 안정적으로 확장 가능한 분산 시스템을 효율적으로 구축할 수 있다