Skip to content

Commit df5b4ac

Browse files
feat: WIP fixed update channels
1 parent 55d8425 commit df5b4ac

File tree

3 files changed

+134
-97
lines changed

3 files changed

+134
-97
lines changed

internal/common/common.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,3 +249,9 @@ type AssetQuote struct {
249249
Exchange Exchange
250250
Meta Meta
251251
}
252+
253+
type MessageUpdate[T any] struct {
254+
Data T
255+
ID string
256+
Sequence int64
257+
}

internal/monitor/coinbase/monitor.go

Lines changed: 59 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,16 @@ type MonitorCoinbase struct {
2626
productIdsPolling []string
2727
assetQuotesResponse []c.AssetQuote // Asset quotes filtered to the productIds set in input.productIds
2828
assetQuotesCache map[string]*c.AssetQuote // Asset quotes for all assets retrieved at least once
29-
chanStreamUpdateQuotePrice chan messageUpdate[c.QuotePrice]
30-
chanStreamUpdateQuoteExtended chan messageUpdate[c.QuoteExtended]
31-
chanStreamUpdateExchange chan messageUpdate[c.Exchange]
32-
chanPollUpdateAssetQuote chan messageUpdate[c.AssetQuote]
29+
chanStreamUpdateQuotePrice chan c.MessageUpdate[c.QuotePrice]
30+
chanStreamUpdateQuoteExtended chan c.MessageUpdate[c.QuoteExtended]
31+
chanStreamUpdateExchange chan c.MessageUpdate[c.Exchange]
32+
chanPollUpdateAssetQuote chan c.MessageUpdate[c.AssetQuote]
3333
mu sync.RWMutex
3434
ctx context.Context
3535
cancel context.CancelFunc
3636
isStarted bool
3737
}
3838

39-
type messageUpdate[T any] struct {
40-
data T
41-
productId string
42-
}
43-
4439
type input struct {
4540
productIds []string
4641
symbolsUnderlying []string
@@ -64,17 +59,22 @@ func NewMonitorCoinbase(config Config, opts ...Option) *MonitorCoinbase {
6459
monitor := &MonitorCoinbase{
6560
assetQuotesCache: make(map[string]*c.AssetQuote),
6661
assetQuotesResponse: make([]c.AssetQuote, 0),
67-
chanStreamUpdateQuotePrice: make(chan messageUpdate[c.QuotePrice]),
68-
chanStreamUpdateQuoteExtended: make(chan messageUpdate[c.QuoteExtended]),
69-
chanStreamUpdateExchange: make(chan messageUpdate[c.Exchange]),
70-
chanPollUpdateAssetQuote: make(chan messageUpdate[c.AssetQuote]),
62+
chanStreamUpdateQuotePrice: make(chan c.MessageUpdate[c.QuotePrice]),
63+
chanStreamUpdateQuoteExtended: make(chan c.MessageUpdate[c.QuoteExtended]),
64+
chanStreamUpdateExchange: make(chan c.MessageUpdate[c.Exchange]),
65+
chanPollUpdateAssetQuote: make(chan c.MessageUpdate[c.AssetQuote]),
7166
poller: poller.NewPoller(ctx, unaryAPI),
7267
unaryAPI: unaryAPI,
7368
ctx: ctx,
7469
cancel: cancel,
7570
}
7671

77-
monitor.streamer = streamer.NewStreamer(ctx, monitor.chanStreamUpdateQuotePrice, monitor.chanStreamUpdateQuoteExtended)
72+
streamerConfig := streamer.StreamerConfig{
73+
ChanStreamUpdateQuotePrice: monitor.chanStreamUpdateQuotePrice,
74+
ChanStreamUpdateQuoteExtended: monitor.chanStreamUpdateQuoteExtended,
75+
}
76+
77+
monitor.streamer = streamer.NewStreamer(ctx, streamerConfig)
7878

7979
for _, opt := range opts {
8080
opt(monitor)
@@ -141,7 +141,7 @@ func (m *MonitorCoinbase) SetSymbols(productIds []string) {
141141
m.updateAssetQuotesCache(m.assetQuotesResponse)
142142

143143
// Coinbase steaming API for CBE (spot) only and not CDE (futures)
144-
m.streamer.SetSymbolsAndUpdateSubscriptions(m.productIds) // TODO: update to return and handle error
144+
m.streamer.SetSymbolsAndUpdateSubscriptions(m.productIdsStreaming) // TODO: update to return and handle error
145145

146146
}
147147

@@ -168,6 +168,8 @@ func (m *MonitorCoinbase) Start() error {
168168
return err
169169
}
170170

171+
go m.handleUpdates()
172+
171173
m.isStarted = true
172174

173175
return nil
@@ -191,7 +193,7 @@ func (m *MonitorCoinbase) updateAssetQuotesCache(assetQuotes []c.AssetQuote) {
191193
}
192194

193195
func isStreamingProductId(productId string) bool {
194-
return !strings.HasSuffix(productId, "-CDE")
196+
return !strings.HasSuffix(productId, "-CDE") && !strings.HasPrefix(productId, "CDE")
195197
}
196198

197199
func partitionProductIds(productIds []string) (productIdsStreaming []string, productIdsPolling []string) {
@@ -217,49 +219,47 @@ func mergeAndDeduplicateProductIds(symbolsA, symbolsB []string) []string {
217219
return slices.Compact(merged)
218220
}
219221

220-
func (m *MonitorCoinbase) handleStreamPriceUpdates() {
221-
go func() {
222-
for {
223-
select {
224-
case updateMessage := <-m.chanStreamUpdateQuotePrice:
225-
226-
var assetQuote *c.AssetQuote
227-
var exists bool
228-
229-
// Check if cache exists and values have changed before acquiring write lock
230-
m.mu.RLock()
231-
defer m.mu.RUnlock()
232-
233-
assetQuote, exists = m.assetQuotesCache[updateMessage.productId]
234-
235-
if !exists {
236-
// If product id does not exist in cache, skip update
237-
// TODO: log product not found in cache - should not happen
238-
continue
239-
}
240-
241-
// Skip update if price has not changed
242-
if assetQuote.QuotePrice.Price == updateMessage.data.Price {
243-
continue
244-
}
245-
m.mu.RUnlock()
246-
247-
// Price is different so update cache
248-
m.mu.Lock()
249-
defer m.mu.Unlock()
250-
251-
assetQuote.QuotePrice.Price = updateMessage.data.Price
252-
assetQuote.QuotePrice.Change = updateMessage.data.Change
253-
assetQuote.QuotePrice.ChangePercent = updateMessage.data.ChangePercent
254-
assetQuote.QuotePrice.PriceDayHigh = updateMessage.data.PriceDayHigh
255-
assetQuote.QuotePrice.PriceDayLow = updateMessage.data.PriceDayLow
256-
assetQuote.QuotePrice.PriceOpen = updateMessage.data.PriceOpen
257-
assetQuote.QuotePrice.PricePrevClose = updateMessage.data.PricePrevClose
258-
259-
case <-m.ctx.Done():
260-
return
261-
default:
222+
func (m *MonitorCoinbase) handleUpdates() {
223+
for {
224+
select {
225+
case updateMessage := <-m.chanStreamUpdateQuotePrice:
226+
227+
var assetQuote *c.AssetQuote
228+
var exists bool
229+
230+
// Check if cache exists and values have changed before acquiring write lock
231+
m.mu.RLock()
232+
defer m.mu.RUnlock()
233+
234+
assetQuote, exists = m.assetQuotesCache[updateMessage.ID]
235+
236+
if !exists {
237+
// If product id does not exist in cache, skip update
238+
// TODO: log product not found in cache - should not happen
239+
continue
262240
}
241+
242+
// Skip update if price has not changed
243+
if assetQuote.QuotePrice.Price == updateMessage.Data.Price {
244+
continue
245+
}
246+
m.mu.RUnlock()
247+
248+
// Price is different so update cache
249+
m.mu.Lock()
250+
defer m.mu.Unlock()
251+
252+
assetQuote.QuotePrice.Price = updateMessage.Data.Price
253+
assetQuote.QuotePrice.Change = updateMessage.Data.Change
254+
assetQuote.QuotePrice.ChangePercent = updateMessage.Data.ChangePercent
255+
assetQuote.QuotePrice.PriceDayHigh = updateMessage.Data.PriceDayHigh
256+
assetQuote.QuotePrice.PriceDayLow = updateMessage.Data.PriceDayLow
257+
assetQuote.QuotePrice.PriceOpen = updateMessage.Data.PriceOpen
258+
assetQuote.QuotePrice.PricePrevClose = updateMessage.Data.PricePrevClose
259+
260+
case <-m.ctx.Done():
261+
return
262+
default:
263263
}
264-
}()
264+
}
265265
}

internal/monitor/coinbase/streamer/streamer.go

Lines changed: 69 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ package streamer
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"strconv"
7-
"strings"
88
"sync"
99

10-
"github.com/achannarasappa/ticker/v4/internal/common"
10+
c "github.com/achannarasappa/ticker/v4/internal/common"
1111
"github.com/gorilla/websocket"
1212
)
1313

@@ -17,7 +17,7 @@ type messageSubscription struct {
1717
Channels []string `json:"channels"`
1818
}
1919

20-
type messageQuote struct {
20+
type messagePriceTick struct {
2121
Type string `json:"type"`
2222
Sequence int64 `json:"sequence"`
2323
ProductID string `json:"product_id"`
@@ -38,26 +38,35 @@ type messageQuote struct {
3838
}
3939

4040
type Streamer struct {
41-
symbols []string
42-
conn *websocket.Conn
43-
isStarted bool
44-
url string
45-
assetQuoteChan chan common.AssetQuote
46-
subscriptionChan chan messageSubscription
47-
onUpdate func()
48-
wg sync.WaitGroup
49-
ctx context.Context
50-
cancel context.CancelFunc
41+
symbols []string
42+
conn *websocket.Conn
43+
isStarted bool
44+
url string
45+
assetQuoteChan chan c.AssetQuote
46+
subscriptionChan chan messageSubscription
47+
onUpdate func()
48+
wg sync.WaitGroup
49+
ctx context.Context
50+
cancel context.CancelFunc
51+
chanStreamUpdateQuotePrice chan c.MessageUpdate[c.QuotePrice]
52+
chanStreamUpdateQuoteExtended chan c.MessageUpdate[c.QuoteExtended]
5153
}
5254

53-
func NewStreamer(ctx context.Context, chanStreamUpdateQuotePrice chan messageUpdate[c.QuotePrice], chanStreamUpdateQuoteExtended chan messageUpdate[c.QuoteExtended]) *Streamer {
55+
type StreamerConfig struct {
56+
ChanStreamUpdateQuotePrice chan c.MessageUpdate[c.QuotePrice]
57+
ChanStreamUpdateQuoteExtended chan c.MessageUpdate[c.QuoteExtended]
58+
}
59+
60+
func NewStreamer(ctx context.Context, config StreamerConfig) *Streamer {
5461
ctx, cancel := context.WithCancel(ctx)
5562

5663
s := &Streamer{
57-
ctx: ctx,
58-
cancel: cancel,
59-
wg: sync.WaitGroup{},
60-
subscriptionChan: make(chan messageSubscription),
64+
chanStreamUpdateQuotePrice: config.ChanStreamUpdateQuotePrice,
65+
chanStreamUpdateQuoteExtended: config.ChanStreamUpdateQuoteExtended,
66+
ctx: ctx,
67+
cancel: cancel,
68+
wg: sync.WaitGroup{},
69+
subscriptionChan: make(chan messageSubscription),
6170
}
6271

6372
return s
@@ -157,14 +166,21 @@ func (s *Streamer) readStreamQuote() {
157166
case <-s.ctx.Done():
158167
return
159168
default:
160-
var quote messageQuote
161-
err := s.conn.ReadJSON(&quote)
169+
var message messagePriceTick
170+
_, messageBytes, err := s.conn.ReadMessage()
171+
if err != nil {
172+
return
173+
}
174+
out := string(messageBytes)
175+
fmt.Println("messageBytes", out)
176+
err = json.Unmarshal(messageBytes, &message)
162177
if err != nil {
163178
return
164179
}
165180

166-
// TODO: Send to correct channels
167-
s.assetQuoteChan <- transformQuoteStream(quote)
181+
qp, qe := transformPriceTick(message)
182+
s.chanStreamUpdateQuotePrice <- qp
183+
s.chanStreamUpdateQuoteExtended <- qe
168184
}
169185
}
170186
}
@@ -177,7 +193,7 @@ func (s *Streamer) writeStreamSubscription() {
177193
case <-s.ctx.Done():
178194
return
179195
case message := <-s.subscriptionChan:
180-
fmt.Println("writing subscription", message)
196+
181197
err := s.conn.WriteJSON(message)
182198
if err != nil {
183199
return
@@ -210,23 +226,38 @@ func (s *Streamer) unsubscribe() error {
210226
return nil
211227
}
212228

213-
func transformQuoteStream(quote messageQuote) common.AssetQuote {
229+
func transformPriceTick(message messagePriceTick) (qp c.MessageUpdate[c.QuotePrice], qe c.MessageUpdate[c.QuoteExtended]) {
230+
231+
price, _ := strconv.ParseFloat(message.Price, 64)
232+
priceOpen, _ := strconv.ParseFloat(message.Open24h, 64)
233+
priceDayHigh, _ := strconv.ParseFloat(message.High24h, 64)
234+
priceDayLow, _ := strconv.ParseFloat(message.Low24h, 64)
235+
change := price - priceOpen
236+
changePercent := change / priceOpen
237+
238+
qp = c.MessageUpdate[c.QuotePrice]{
239+
ID: message.ProductID,
240+
Sequence: message.Sequence,
241+
Data: c.QuotePrice{
242+
Price: price,
243+
PricePrevClose: priceOpen,
244+
PriceOpen: priceOpen,
245+
PriceDayHigh: priceDayHigh,
246+
PriceDayLow: priceDayLow,
247+
Change: change,
248+
ChangePercent: changePercent,
249+
},
250+
}
214251

215-
symbol := strings.Split(quote.ProductID, "-")[0]
216-
price, _ := strconv.ParseFloat(quote.Price, 64)
252+
volume, _ := strconv.ParseFloat(message.Volume24h, 64)
217253

218-
return common.AssetQuote{
219-
// Name: quote.ProductID,
220-
Symbol: symbol,
221-
Class: common.AssetClassCryptocurrency,
222-
Currency: common.Currency{FromCurrencyCode: "USD"},
223-
QuotePrice: common.QuotePrice{
224-
Price: price,
254+
qe = c.MessageUpdate[c.QuoteExtended]{
255+
ID: message.ProductID,
256+
Sequence: message.Sequence,
257+
Data: c.QuoteExtended{
258+
Volume: volume,
225259
},
226-
QuoteExtended: common.QuoteExtended{},
227-
QuoteFutures: common.QuoteFutures{},
228-
QuoteSource: common.QuoteSourceCoinbase,
229-
Exchange: common.Exchange{Name: "Coinbase"},
230-
Meta: common.Meta{},
231260
}
261+
262+
return qp, qe
232263
}

0 commit comments

Comments
 (0)