AI 기반 의료 채팅·인퍼런스 시스템을 위한 분산·비동기 메시지 큐 시스템
ZoomQ는 의료 AI 시스템의 확장성을 위한 분산·비동기 처리 기반을 제공하는 순수 Java 기반 메시지 큐 시스템입니다. 멀티모듈 아키텍처로 설계되어 브로커 서버, 클라이언트 라이브러리, Spring Boot 자동 설정을 포함한 완전한 메시지 전송·처리 파이프라인을 구현합니다.
프로젝트는 4개의 주요 모듈로 구성되어 있습니다:
zoomq/
├── core/ # 핵심 메시지 큐 기능 (메시지, 연결, 프로토콜, 전송 계층)
├── server/ # 브로커 서버 구현
├── client/ # Producer/Consumer 클라이언트 라이브러리
└── spring-zoomq/ # Spring Boot 자동 설정 및 통합
- TCP 기반 비동기 메시지 브로커
- 다중 큐 관리 및 라우팅
- Producer/Consumer 연결 관리
- Round-Robin 기반 메시지 디스패칭
- 가상 스레드 기반 고성능 처리
- At-Least-Once 전송 보장: ACK/NACK 메커니즘
- 자동 재시도: 지수 백오프 기반 재시도 정책
- 메시지 추적: 메시지 ID 및 상관관계 ID 기반 추적
- 타임아웃 관리: 설정 가능한 재시도 간격 및 최대 재시도 횟수
- Producer: 큐에 메시지 발행
- Consumer: 큐에서 메시지 구독 및 처리
- 연결 관리: 자동 재연결 및 연결 상태 관리
- 동시성 제어: 설정 가능한 Producer/Consumer 동시성
- 자동 설정:
@EnableAutoConfiguration기반 자동 구성 - 메시지 리스너:
@ZooListener어노테이션 기반 메시지 핸들러 - 템플릿 API:
ZooTemplate을 통한 간편한 메시지 발행 - 설정 프로퍼티: YAML 기반 설정 관리
- 언어: Java 21
- 빌드 도구: Gradle
- 의존성 관리: Maven Central, JitPack
- JSON 처리: Jackson (2.17.2)
- 로깅: SLF4J + Logback
- 프레임워크: Spring Boot 3.3.12 (spring-zoomq 모듈)
- 통신 프로토콜: TCP/IP
- 동시성: Java Virtual Threads
핵심 메시지 큐 기능을 제공합니다:
-
메시지 시스템
Message: 메시지 기본 클래스 (PUBLISH, SUBSCRIBE, ACK, NACK, ERROR, CONNECT, DISCONNECT)MessageType: 메시지 타입 열거형MessageConverter: JSON 직렬화/역직렬화MessageFactory: 메시지 생성 팩토리
-
연결 관리
Connection: 전송 계층 연결 인터페이스TcpConnectionHandler: TCP 연결 핸들러 구현ConnectionStatus: 연결 상태 관리
-
프로토콜
ZooProtocolEndpoint: 통합 프로토콜 엔드포인트ProducerToBrokerConnection: Producer → Broker 연결BrokerProducerConnection: Broker → Producer 연결ConsumerFromBrokerConnection: Consumer ← Broker 연결BrokerConsumerConnection: Broker ← Consumer 연결
-
전송 계층
AckManager: ACK 추적 및 재시도 관리AckSender: ACK 전송 인터페이스MessageDispatcher: 메시지 디스패칭
메시지 큐 브로커 서버를 구현합니다:
-
브로커 서버
ZooBrokerServer: 메인 브로커 서버 (포트 7777)BrokerTcpConnectionHandler: 브로커 TCP 연결 핸들러
-
큐 관리
ZooMessageQueue: 메시지 큐 구현 (BlockingQueue 기반)ZooMessageQueueRegistry: 큐 레지스트리AppendOnlyQueue: 추가 전용 큐 인터페이스
-
메시지 라우팅
ZooMessageQueueRouter: 큐 기반 메시지 라우팅MessageQueueRouter: 라우터 인터페이스
-
메시지 디스패칭
BrokerMessageDispatcher: Consumer로의 메시지 디스패칭BrokerMessageDispatcherRegistry: 디스패처 레지스트리RoundRobinSelectionStrategy: 라운드로빈 선택 전략
-
연결 생명주기
BrokerProducerLifecycleManager: Producer 연결 관리BrokerConsumerLifecycleManager: Consumer 연결 관리ConnectionLifecycleManager: 연결 생명주기 관리
-
인증
UserRegistry: 사용자 레지스트리
Producer/Consumer 클라이언트 라이브러리를 제공합니다:
-
Producer
Producer: Producer 인터페이스ZooProducer: Producer 구현ProducerFactory: Producer 인스턴스 생성 책임. 객체 생성 로직을 캡슐화하여 의존성 주입과 확장성을 제공ProducerManager: Producer의 생명주기 관리 (시작/정지). Factory를 통해 Producer를 생성하고 Container에 등록ProducerContainer: 생성된 Producer들을 관리하는 컨테이너. 동시성 제어 및 메시지 발행 라우팅 담당
-
Consumer
Consumer: Consumer 인터페이스ZooConsumer: Consumer 구현ConsumerFactory: Consumer 및 ConsumerContainer 인스턴스 생성 책임. 설정 기반으로 Consumer를 생성하고 Container에 등록ConsumerManager: Consumer의 생명주기 관리 (시작/정지/일시중지/재개). Factory와 Registry를 조합하여 Consumer 생성 및 등록 관리ConsumerRegistry: subscription 이름을 키로 ConsumerContainer를 저장/조회/삭제하는 레지스트리. Thread-safe한 ConcurrentHashMap 기반 저장소ConsumerContainer: 하나의 subscription에 속한 여러 Consumer들의 집합. Consumer 추가/삭제 및 일괄 제어 기능 제공
-
연결 관리
ConnectionFactory: 연결 팩토리ClientContext: 클라이언트 컨텍스트 (싱글톤)
-
인증
ZooAuthConfig: 인증 설정
Spring Boot 애플리케이션과의 통합을 제공합니다:
-
자동 설정
ZooMQAutoConfiguration: Spring Boot 자동 설정ZooMQProperties: 설정 프로퍼티 바인딩
-
메시지 리스너
@ZooListener: 메시지 리스너 어노테이션ZooListenerScanner: 리스너 스캔 및 등록ZooListenerContextHolder: 리스너 컨텍스트 홀더InboundMessageHandler: 인바운드 메시지 핸들러
-
Producer API
ZooTemplate: 메시지 발행 템플릿 (Spring의 JmsTemplate 스타일)
cd server
./gradlew run브로커 서버는 기본적으로 포트 7777에서 실행됩니다.
application.yml 파일에 다음 설정을 추가합니다:
spring:
zoomq:
host: localhost
port: 7777
username: guest
password: guest
connection:
retry:
max-retry-count: 5
min-retry-delay: 1000
max-retry-delay: 10000
retry-delay-multiplier: 1.5
consumer:
subscription:
- queue-name-1
- queue-name-2
concurrency: 1
producer:
concurrency: 1@Service
public class MessageService {
@Autowired
private ZooTemplate zooTemplate;
public void sendMessage(String queue, Object payload) {
zooTemplate.convertAndSend(queue, payload);
}
}@Component
public class MessageHandler {
@ZooListener("queue-name-1")
public void handleMessage(Object payload) {
// 메시지 처리 로직
System.out.println("Received: " + payload);
}
}- CONNECT: 연결 설정
- MESSAGE: 실제 메시지 데이터
- ACK: 수신 확인
- NACK: 수신 거부
- PAUSE: Consumer 일시 중지
- RESUME: Consumer 재개
- ERROR: 오류 메시지
{
"type": "MESSAGE",
"queue": "queue-name",
"departure": "producer-id",
"destination": "consumer-id",
"correlationId": "correlation-id",
"messageId": "message-id",
"retryCount": 0,
"order": 0,
"contentType": "application/json",
"payload": { ... }
}max-retry-count: 최대 재시도 횟수min-retry-delay: 최소 재시도 지연 시간 (ms)max-retry-delay: 최대 재시도 지연 시간 (ms)retry-delay-multiplier: 재시도 지연 시간 배수 (지수 백오프)
subscription: 구독할 큐 이름 목록concurrency: Consumer 동시성 수준
concurrency: Producer 동시성 수준
Producer와 Consumer의 생성 및 생명주기 관리를 위해 세 가지 패턴을 조합하여 사용합니다:
Factory 패턴
ProducerFactory: Producer 인스턴스 생성 책임. 연결 생성 로직을 캡슐화하여 구현체 교체 용이ConsumerFactory: Consumer 및 ConsumerContainer 생성 책임. 설정 기반으로 동시성 수준만큼 Consumer 생성- 객체 생성 로직을 인터페이스로 추상화하여 테스트 및 확장성 확보
Manager 패턴
ProducerManager: Producer의 시작/정지 등 생명주기 관리. Factory를 통해 생성하고 Container에 등록ConsumerManager: Consumer의 시작/정지/일시중지/재개 관리. Factory와 Registry를 조합하여 관리- Factory와 Registry 사이의 중재자 역할로 책임 분리
Registry 패턴
ConsumerRegistry: subscription 이름을 키로 ConsumerContainer를 저장/조회/삭제- Thread-safe한 ConcurrentHashMap 기반 저장소로 동시성 환경에서 안전한 객체 관리
- Producer는 단일 Container를 사용하므로 Registry 불필요
이러한 패턴 조합을 통해 객체 생성, 생명주기 관리, 저장소 관리를 명확히 분리하여 단일 책임 원칙(SRP)을 준수하고 확장성을 확보했습니다.
- 전송 계층: TCP 연결 및 기본 I/O
- 프로토콜 계층: 메시지 프로토콜 처리
- 애플리케이션 계층: Producer/Consumer 로직
- Java Virtual Threads를 활용한 경량 스레드 모델
- 논블로킹 I/O 기반 메시지 처리
- 큐 기반 메시지 버퍼링
- 멀티모듈 구조로 독립적인 모듈 개발 및 배포 가능
- 설정 기반 동시성 제어
- 플러그인 가능한 선택 전략 (Selection Strategy)
- Factory/Manager/Registry 패턴: 객체 생성, 생명주기 관리, 저장소 관리를 분리하여 각 책임을 명확히 구분
- Factory: 객체 생성 로직 캡슐화로 구현체 교체 용이
- Manager: Factory와 Registry를 조합하여 생명주기 관리
- Registry: Thread-safe 저장소로 객체 조회 및 관리
- ACK/NACK 기반 메시지 전송 보장
- 자동 재시도 및 타임아웃 관리
- 연결 상태 추적 및 관리
./gradlew build./gradlew test./gradlew :core:build
./gradlew :server:build
./gradlew :client:build
./gradlew :spring-zoomq:build- 데드레터 큐 (Dead Letter Queue) 지원
- 메시지 영속성 (Persistence) 지원
- 클러스터링 및 고가용성 (HA) 지원
- 메시지 우선순위 큐
- 메시지 TTL (Time To Live)
- 메트릭 및 모니터링 지원
이 프로젝트는 교육 및 연구 목적으로 개발되었습니다.
이 프로젝트는 AI 기반 의료 시스템의 분산 처리 기반을 이해하기 위한 학습 프로젝트입니다.
Note: 이 프로젝트는 의료 AI 시스템의 확장성을 위한 분산·비동기 처리 기반을 이해하기 위해 순수 Java로 구현된 메시지 큐 시스템입니다.