Skip to content

Commit 84c55c1

Browse files
committed
add: transport-abstraction docs
Signed-off-by: EunJiJung <[email protected]>
1 parent f39cfdc commit 84c55c1

File tree

1 file changed

+386
-0
lines changed

1 file changed

+386
-0
lines changed
Lines changed: 386 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,386 @@
1+
# Transport Abstraction Design for argocd-agent
2+
3+
## Overview
4+
5+
현재 argocd-agent는 gRPC에 종속적인 구조로 되어 있어, 다양한 네트워크 환경에서의 유연성이 제한됩니다. 이 문서는 gRPC 종속성을 제거하고 다양한 transport를 지원하는 추상화 설계를 제안합니다.
6+
7+
### Current Issues
8+
- **gRPC 종속성**: Application layer가 gRPC Protobuf에 직접 의존
9+
- **제한된 네트워크 호환성**: HTTP/2 필수, 일부 proxy 환경에서 제한
10+
- **확장성 부족**: 새로운 transport 추가 어려움
11+
12+
## Current Architecture (Before)
13+
14+
## Protocol Stack 변화
15+
16+
### Before (현재)
17+
┌─────────────────────────────────────┐
18+
│ Application Events │ ← Create, Update, Delete, Status
19+
├─────────────────────────────────────┤
20+
│ CloudEvents Format │ ← Event envelope with metadata
21+
├─────────────────────────────────────┤
22+
│ gRPC Streaming │ ← Bidirectional streams (고정)
23+
├─────────────────────────────────────┤
24+
│ HTTP/2 (or HTTP/1+WS) │ ← Transport layer (gRPC만 가능)
25+
├─────────────────────────────────────┤
26+
│ TLS + mTLS │ ← Security layer
27+
└─────────────────────────────────────┘
28+
29+
30+
### After (Transport 추상화)
31+
┌─────────────────────────────────────┐
32+
│ Application Events │ ← Create, Update, Delete, Status
33+
├─────────────────────────────────────┤
34+
│ CloudEvents Format │ ← Event envelope with metadata
35+
├─────────────────────────────────────┤
36+
│ Transport Interface Layer │ ← NEW: Connection + Stream interfaces
37+
├─────────────────────────────────────┤
38+
│ ┌─────────┬─────────┬─────────────┐ │
39+
│ │ gRPC │ HTTP │ Kafka │ │ ← Multiple transport options
40+
│ │Streaming│ SSE+POST│ Pub/Sub │ │
41+
│ └─────────┴─────────┴─────────────┘ │
42+
├─────────────────────────────────────┤
43+
│ HTTP/2+WS │ HTTP/1.1 │ TCP/Binary │ ← Transport-specific protocols
44+
├─────────────────────────────────────┤
45+
│ TLS + mTLS │ ← Security layer (공통)
46+
└─────────────────────────────────────┘
47+
48+
49+
But, principal -> agent 방향의 bidirectional 은 유지합니다. (queue를 제외하고)
50+
51+
### Before (현재)
52+
```
53+
pkg/api/grpc/eventstreamapi/
54+
├── eventstream.pb.go ← Protobuf 메시지
55+
├── eventstream_grpc.pb.go ← gRPC 서비스
56+
└── (gRPC만 지원)
57+
58+
pkg/client/
59+
└── remote.go ← gRPC client만
60+
61+
agent/
62+
├── connection.go ← gRPC 직접 사용
63+
└── agent.go ← *client.Remote 의존
64+
65+
principal/apis/eventstream/
66+
└── eventstream.go ← gRPC server 직접 구현
67+
```
68+
69+
### After (Transport 추상화)
70+
```
71+
pkg/api/eventstreamapi/
72+
├── interface.go ← Connection, Stream interfaces
73+
├── factory.go ← Transport factory
74+
├── config.go ← Config types
75+
├── grpc/ ← gRPC transport
76+
│ ├── eventstream.pb.go ← (기존 파일 이동)
77+
│ ├── eventstream_grpc.pb.go ← (기존 파일 이동)
78+
│ ├── connection.go ← GRPCConnection 구현
79+
│ ├── stream.go ← GRPCStream 구현
80+
│ └── websocket.go ← WebSocket 지원
81+
# we can 확장..
82+
├── http/ ← HTTP transport (NEW)
83+
│ ├── connection.go ← HTTPConnection 구현
84+
│ ├── stream.go ← HTTPStream 구현
85+
│ └── server.go ← HTTP server
86+
└── kafka/ ← Kafka transport (NEW)
87+
├── connection.go ← KafkaConnection 구현
88+
├── stream.go ← KafkaStream 구현
89+
└── server.go ← Kafka server
90+
91+
pkg/client/
92+
└── remote.go ← (기존 유지, gRPC transport에서 래핑)
93+
94+
agent/
95+
├── connection.go ← eventstreamapi.Connection 사용
96+
└── agent.go ← eventstreamapi.Connection 의존
97+
98+
principal/apis/eventstream/
99+
└── eventstream.go ← eventstreamapi.Stream 사용
100+
```
101+
102+
### 4. 각 Transport별 구현
103+
- **gRPC**: 기존 pkg/client/remote.go 래핑
104+
- **HTTP**: SSE(수신) + POST(송신) 조합
105+
- **Kafka**: Producer(송신) + Consumer(수신) 조합
106+
107+
## 핵심 변화점
108+
109+
### 1. Interface Layer 추가
110+
```
111+
// pkg/api/eventstreamapi/interface.go
112+
type Connection interface {
113+
Connect(ctx context.Context) error
114+
CreateStream(ctx context.Context) (Stream, error)
115+
Close() error
116+
}
117+
118+
type Stream interface {
119+
Send(*cloudevents.Event) error
120+
Receive() (*cloudevents.Event, error)
121+
Close() error
122+
}
123+
```
124+
125+
### 2. Transport Factory
126+
```
127+
// pkg/api/eventstreamapi/factory.go
128+
func NewFactory() *Factory
129+
func (f *Factory) Create(config Config) Connection
130+
131+
// 사용법
132+
factory := eventstreamapi.NewFactory()
133+
conn := factory.Create(eventstreamapi.Config{
134+
Type: "grpc", // "grpc", "http", "kafka"
135+
Endpoint: "...",
136+
})
137+
```
138+
139+
### 3. Application Layer 변화
140+
```
141+
// Before
142+
type Agent struct {
143+
remote *client.Remote // gRPC 직접 의존
144+
}
145+
146+
// After
147+
type Agent struct {
148+
connection eventstreamapi.Connection // Transport 무관
149+
}
150+
```
151+
152+
### 사용 중인 Events 들 (managed, autonomous)
153+
154+
- **/internal/event/event.go**: 모든 이벤트 타입 상수 정의
155+
- **/principal/event.go**: Principal의 이벤트 처리 로직 (모드별 필터링)
156+
- **/agent/outbound.go**: Agent의 이벤트 생성 로직 (모드별 타입 선택)
157+
- **/principal/apis/eventstream/eventstream.go**: 이벤트 스트림 처리 및 필터링
158+
159+
ping, pong, send 등..
160+
161+
## Principal ↔ Agent 통신 흐름 (Before/After)
162+
163+
#### 1단계: 연결 설정 (Connection Establishment)
164+
165+
#### Before (현재 gRPC 종속)
166+
```go
167+
// Agent 측
168+
// /agent/connection.go
169+
func (a *Agent) maintainConnection() error {
170+
if !a.IsConnected() {
171+
// gRPC 직접 연결
172+
err = a.remote.Connect(a.context, false) // *client.Remote (gRPC)
173+
if err == nil {
174+
a.SetConnected(true)
175+
}
176+
}
177+
}
178+
179+
// Principal 측
180+
// /principal/apis/eventstream/eventstream.go
181+
func (s *Server) Subscribe(subs eventstreamapi.EventStream_SubscribeServer) error {
182+
// gRPC 서버가 직접 스트림 수신
183+
c, err := s.newClientConnection(subs.Context(), s.options.MaxStreamDuration)
184+
}
185+
```
186+
187+
#### After (Transport 추상화)
188+
```go
189+
// Agent 측
190+
// /agent/connection.go
191+
func (a *Agent) maintainConnection() error {
192+
if !a.IsConnected() {
193+
// Transport-agnostic 연결
194+
err = a.connection.Connect(a.context) // eventstreamapi.Connection
195+
if err == nil {
196+
a.SetConnected(true)
197+
}
198+
}
199+
}
200+
201+
// Principal 측
202+
// /principal/apis/eventstream/eventstream.go
203+
// stream data format이 변경되었습니다.
204+
func (s *Server) Subscribe(stream eventstreamapi.Stream) error {
205+
// Transport-agnostic 스트림 수신
206+
c, err := s.newClientConnection(stream.Context(), s.options.MaxStreamDuration)
207+
}
208+
```
209+
210+
### 사용하는 Stream 객체의 변화
211+
212+
213+
### 2단계: 스트림 생성 (Stream Creation)
214+
215+
#### Before (gRPC 스트림)
216+
```go
217+
// Agent 측
218+
// /agent/connection.go
219+
func (a *Agent) handleStreamEvents() error {
220+
conn := a.remote.Conn() // gRPC connection
221+
client := eventstreamapi.NewEventStreamClient(conn) // gRPC client
222+
stream, err := client.Subscribe(a.context) // gRPC bidirectional stream
223+
224+
a.eventWriter = event.NewEventWriter(stream) // gRPC stream 직접 사용
225+
}
226+
```
227+
228+
#### After (Stream 인터페이스)
229+
```go
230+
// Agent 측
231+
func (a *Agent) handleStreamEvents() error {
232+
stream, err := a.connection.CreateStream(a.context) // eventstreamapi.Stream
233+
234+
a.eventWriter = event.NewEventWriter(stream) // Stream interface 사용
235+
}
236+
```
237+
238+
### 3단계: 이벤트 송신 (Event Sending)
239+
240+
#### Before (Protobuf 변환 + gRPC)
241+
```go
242+
// Agent → Principal
243+
// /internal/event/event.go
244+
func (ew *EventWriter) sendEvent(resID string) {
245+
// CloudEvent → Protobuf 변환 (Application layer에서)
246+
pev, err := format.ToProto(eventMsg.event) // cloudevents.Event → pb.CloudEvent
247+
248+
// gRPC 스트림으로 전송
249+
err = ew.target.Send(&eventstreamapi.Event{Event: pev}) // gRPC-specific
250+
}
251+
252+
// streamWriter interface (gRPC 종속)
253+
type streamWriter interface {
254+
Send(*eventstreamapi.Event) error // Protobuf 메시지
255+
Context() context.Context
256+
}
257+
```
258+
259+
#### After (CloudEvent 직접 전송)
260+
```go
261+
// Agent → Principal
262+
func (ew *EventWriter) sendEvent(resID string) {
263+
// CloudEvent 직접 전송 (Protobuf 변환 제거)
264+
err := ew.target.Send(eventMsg.event) // cloudevents.Event 직접 사용
265+
}
266+
267+
// streamWriter interface (Transport-agnostic)
268+
type streamWriter interface {
269+
Send(*cloudevents.Event) error // CloudEvent 직접
270+
Context() context.Context
271+
}
272+
273+
// Transport별 Protobuf 변환은 내부에서 처리
274+
// pkg/api/eventstreamapi/grpc/stream.go
275+
func (s *GRPCStream) Send(event *cloudevents.Event) error {
276+
pev, err := format.ToProto(event) // gRPC transport에서만 변환
277+
return s.grpcStream.Send(&eventstreamapi.Event{Event: pev})
278+
}
279+
```
280+
281+
### 4단계: 이벤트 수신 (Event Receiving)
282+
283+
#### Before (gRPC + Protobuf 역변환)
284+
```go
285+
// Principal ← Agent
286+
// /principal/apis/eventstream/eventstream.go
287+
func (s *Server) recvFunc(c *client, subs eventstreamapi.EventStream_SubscribeServer) error {
288+
// gRPC에서 Protobuf 수신
289+
streamEvent, err := subs.Recv() // *eventstreamapi.Event (Protobuf)
290+
291+
// Protobuf → CloudEvent 변환 (Application layer에서)
292+
incomingEvent, err := format.FromProto(streamEvent.Event) // pb.CloudEvent → cloudevents.Event
293+
294+
// 수신 큐에 추가
295+
q.Add(incomingEvent)
296+
}
297+
```
298+
299+
#### After (CloudEvent 직접 수신)
300+
```go
301+
// Principal ← Agent
302+
func (s *Server) recvFunc(c *client, stream eventstreamapi.Stream) error {
303+
// CloudEvent 직접 수신 (Protobuf 변환 제거)
304+
incomingEvent, err := stream.Receive() // cloudevents.Event 직접 반환
305+
306+
// 수신 큐에 추가
307+
q.Add(incomingEvent)
308+
}
309+
310+
// Transport별 Protobuf 변환은 내부에서 처리
311+
// pkg/api/eventstreamapi/grpc/stream.go
312+
func (s *GRPCStream) Receive() (*cloudevents.Event, error) {
313+
streamEvent, err := s.grpcStream.Recv() // gRPC에서 수신
314+
return format.FromProto(streamEvent.Event), err // gRPC transport에서만 변환
315+
}
316+
```
317+
318+
### 5단계: 양방향 통신 루프 (Bidirectional Communication Loop)
319+
320+
#### Before (gRPC 고정)
321+
```go
322+
// Agent 측 - 송신/수신 루프
323+
// /agent/connection.go
324+
go func() {
325+
for a.IsConnected() {
326+
err = a.sender(stream) // gRPC stream 직접 사용
327+
err = a.receiver(stream) // gRPC stream 직접 사용
328+
}
329+
}()
330+
331+
// Principal 측 - 송신/수신 루프
332+
// /principal/connection.go
333+
go func() {
334+
for {
335+
err := s.recvFunc(c, subs) // gRPC stream 직접 사용
336+
err := s.sendFunc(c, subs) // gRPC stream 직접 사용
337+
}
338+
}()
339+
```
340+
341+
#### After (Transport 무관)
342+
```go
343+
// Agent 측 - 송신/수신 루프
344+
// /agent/connection.go
345+
go func() {
346+
for a.IsConnected() {
347+
err = a.sender(stream) // eventstreamapi.Stream interface
348+
err = a.receiver(stream) // eventstreamapi.Stream interface
349+
}
350+
}()
351+
352+
// Principal 측 - 송신/수신 루프
353+
// /principal/connection.go
354+
go func() {
355+
for {
356+
err := s.recvFunc(c, stream) // eventstreamapi.Stream interface
357+
err := s.sendFunc(c, stream) // eventstreamapi.Stream interface
358+
}
359+
}()
360+
```
361+
362+
## 핵심 변화 요약
363+
364+
**Before**: Application Layer → gRPC Protobuf → gRPC Stream → Network
365+
**After**: Application Layer → CloudEvent → Transport Interface → Network
366+
367+
1. **Protobuf 변환 위치**: Application layer → Transport layer로 이동
368+
2. **인터페이스 추상화**: gRPC-specific → Transport-agnostic interfaces
369+
3. **코드 재사용**: 기존 gRPC 로직 완전 보존하면서 새 transport 추가
370+
4. **설정 기반 선택**: `--transport-type` 플래그로 runtime 선택
371+
372+
### CLI Usage
373+
```bash
374+
# Current
375+
argocd-agent agent --enable-websocket=true
376+
377+
# After
378+
argocd-agent agent --transport-type=grpc # Default
379+
argocd-agent agent --transport-type=grpc-ws # WebSocket fallback
380+
argocd-agent agent --transport-type=http # HTTP transport
381+
argocd-agent agent --transport-type=kafka # Message router
382+
```
383+
384+
## Conclusion
385+
386+
양방향 실시간이 왜 필요한걸까..

0 commit comments

Comments
 (0)