Skip to content

Commit 7f99982

Browse files
authored
added streaming for candle stick data (#15)
1 parent a9610a4 commit 7f99982

File tree

1 file changed

+163
-33
lines changed

1 file changed

+163
-33
lines changed

lightstreamer.go

Lines changed: 163 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@ import (
66
"context"
77
"encoding/json"
88
"fmt"
9-
"github.com/cenkalti/backoff/v4"
10-
"github.com/gorilla/websocket"
11-
log "github.com/sirupsen/logrus"
129
"io"
1310
"math/rand"
1411
"net/http"
@@ -19,37 +16,43 @@ import (
1916
"strings"
2017
"sync/atomic"
2118
"time"
19+
20+
"github.com/cenkalti/backoff/v4"
21+
"github.com/gorilla/websocket"
22+
log "github.com/sirupsen/logrus"
2223
)
2324

2425
type LightStreamerConnection struct {
25-
ig *IGMarkets
26-
sessionID string
27-
serverHostname string
28-
sessionBindTime time.Time
29-
wsConn *websocket.Conn
30-
nextSubscriptionId int
31-
heartbeatTicker *time.Ticker
32-
ctx context.Context
33-
cancelFunc func()
34-
lastError error
35-
closeRequested atomic.Bool
36-
marketSubscriptions []*subscription[MarketTick]
37-
chartTickSubscriptions []*subscription[ChartTick]
38-
tradeSubscriptions []*subscription[TradeUpdate]
39-
priceTimestampsByEpic map[string]struct{ fetchTime, priceTime time.Time }
40-
subscriptionReqChan chan interface{}
26+
ig *IGMarkets
27+
sessionID string
28+
serverHostname string
29+
sessionBindTime time.Time
30+
wsConn *websocket.Conn
31+
nextSubscriptionId int
32+
heartbeatTicker *time.Ticker
33+
ctx context.Context
34+
cancelFunc func()
35+
lastError error
36+
closeRequested atomic.Bool
37+
marketSubscriptions []*subscription[MarketTick]
38+
chartTickSubscriptions []*subscription[ChartTick]
39+
chartCandleSubscriptions []*subscription[ChartCandle]
40+
tradeSubscriptions []*subscription[TradeUpdate]
41+
priceTimestampsByEpic map[string]struct{ fetchTime, priceTime time.Time }
42+
subscriptionReqChan chan interface{}
4143
}
4244

43-
type subscription[T MarketTick | ChartTick | TradeUpdate] struct {
45+
type subscription[T MarketTick | ChartTick | ChartCandle | TradeUpdate] struct {
4446
channel chan T
4547
subscriptionId string
4648
requestID string
4749
items []string
50+
scale string // Used for ChartCandle subscriptions
4851
lastStateByItem map[string]T
4952
errChan chan error
5053
}
5154

52-
type unsubscription[T MarketTick | ChartTick | TradeUpdate] struct {
55+
type unsubscription[T MarketTick | ChartTick | ChartCandle | TradeUpdate] struct {
5356
channel <-chan T
5457
errChan chan error
5558
}
@@ -95,8 +98,25 @@ type ChartCandle struct {
9598
UpdateTime time.Time `lightstreamer:"UTM,timeFromEpochMilliseconds"`
9699
LastTradedVolume float64 `lightstreamer:"LTV"`
97100
IncrementalTradingVolume float64 `lightstreamer:"TTV"`
98-
99-
// TODO - the rest?
101+
DayOpenMid float64 `lightstreamer:"DAY_OPEN_MID"`
102+
DayChangeMid float64 `lightstreamer:"DAY_NET_CHG_MID"`
103+
DayChangePercentMid float64 `lightstreamer:"DAY_PERC_CHG_MID"`
104+
DayHighMid float64 `lightstreamer:"DAY_HIGH"`
105+
DayLowMid float64 `lightstreamer:"DAY_LOW"`
106+
OfferOpen float64 `lightstreamer:"OFR_OPEN"`
107+
OfferHigh float64 `lightstreamer:"OFR_HIGH"`
108+
OfferLow float64 `lightstreamer:"OFR_LOW"`
109+
OfferClose float64 `lightstreamer:"OFR_CLOSE"`
110+
BidOpen float64 `lightstreamer:"BID_OPEN"`
111+
BidHigh float64 `lightstreamer:"BID_HIGH"`
112+
BidLow float64 `lightstreamer:"BID_LOW"`
113+
BidClose float64 `lightstreamer:"BID_CLOSE"`
114+
LastTradedPriceOpen float64 `lightstreamer:"LTP_OPEN"`
115+
LastTradedPriceHigh float64 `lightstreamer:"LTP_HIGH"`
116+
LastTradedPriceLow float64 `lightstreamer:"LTP_LOW"`
117+
LastTradedPriceClose float64 `lightstreamer:"LTP_CLOSE"`
118+
Consolidated bool `lightstreamer:"CONS_END,boolFromInt"`
119+
ConsolidatedTickCount int `lightstreamer:"CONS_TICK_COUNT,int"`
100120
}
101121

102122
type TradeUpdate struct {
@@ -172,6 +192,8 @@ const marketTickFields = "BID OFFER CHANGE CHANGE_PCT UPDATE_TIME MARKET_DELAY M
172192

173193
const tradeFields = "CONFIRMS OPU WOU"
174194

195+
const chartCandleFields = "UTM LTV TTV DAY_OPEN_MID DAY_NET_CHG_MID DAY_PERC_CHG_MID DAY_HIGH DAY_LOW OFR_OPEN OFR_HIGH OFR_LOW OFR_CLOSE BID_OPEN BID_HIGH BID_LOW BID_CLOSE LTP_OPEN LTP_HIGH LTP_LOW LTP_CLOSE CONS_END CONS_TICK_COUNT"
196+
175197
type typeMap struct {
176198
typ reflect.Type
177199
fields string
@@ -192,6 +214,10 @@ var typeMaps = [...]typeMap{
192214
typ: reflect.TypeOf(TradeUpdate{}),
193215
fields: tradeFields,
194216
},
217+
{
218+
typ: reflect.TypeOf(ChartCandle{}),
219+
fields: chartCandleFields,
220+
},
195221
}
196222

197223
func init() {
@@ -275,6 +301,13 @@ func init() {
275301
}
276302
return reflect.ValueOf(f), nil
277303
},
304+
"int": func(ls *LightStreamerConnection, epic, s string) (reflect.Value, error) {
305+
i, err := strconv.ParseInt(s, 10, 64)
306+
if err != nil {
307+
return reflect.Value{}, err
308+
}
309+
return reflect.ValueOf(int(i)), nil
310+
},
278311
}
279312

280313
for x := range typeMaps {
@@ -299,8 +332,10 @@ func init() {
299332
}
300333
} else if t.Field(i).Type == reflect.TypeOf(float64(0)) {
301334
conversionFuncs[j] = convFuncs["float64"]
335+
} else if t.Field(i).Type == reflect.TypeOf(int(0)) {
336+
conversionFuncs[j] = convFuncs["int"]
302337
} else {
303-
panic(fmt.Sprintf("unhandled field type %s", t.Field(i).Type))
338+
panic(fmt.Sprintf("unhandled field type %s for field %s", t.Field(i).Type, field))
304339
}
305340
}
306341
}
@@ -592,6 +627,36 @@ func (ls *LightStreamerConnection) writeLoop(ctx context.Context) {
592627

593628
ls.tradeSubscriptions = append(ls.tradeSubscriptions, &subReq)
594629
// Read loop should handle REQOK/SUBOK/ERROR
630+
case subscription[ChartCandle]:
631+
ls.nextSubscriptionId++
632+
requestID := rand.Int63()
633+
var items = make([]string, len(subReq.items))
634+
for i, epic := range subReq.items {
635+
items[i] = fmt.Sprintf("CHART:%s:%s", epic, subReq.scale)
636+
}
637+
subReq.requestID = fmt.Sprintf("%d", requestID)
638+
subReq.subscriptionId = fmt.Sprintf("%d", ls.nextSubscriptionId)
639+
subReq.lastStateByItem = make(map[string]ChartCandle)
640+
ctrlVals := url.Values{}
641+
ctrlVals.Set("LS_op", "add")
642+
ctrlVals.Set("LS_mode", "MERGE")
643+
ctrlVals.Set("LS_snapshot", "true")
644+
ctrlVals.Set("LS_subId", fmt.Sprintf("%s", subReq.subscriptionId))
645+
ctrlVals.Set("LS_group", strings.Join(items, " "))
646+
ctrlVals.Set("LS_schema", chartCandleFields)
647+
ctrlVals.Set("LS_reqId", fmt.Sprintf("%s", subReq.requestID))
648+
msg := []byte("control\r\n" + ctrlVals.Encode())
649+
err := ls.wsConn.WriteMessage(websocket.TextMessage, msg)
650+
if err != nil {
651+
subReq.errChan <- err
652+
if !ls.closeRequested.Load() {
653+
ls.fatalError(err)
654+
}
655+
return
656+
}
657+
658+
ls.chartCandleSubscriptions = append(ls.chartCandleSubscriptions, &subReq)
659+
// Read loop should handle REQOK/SUBOK/ERROR
595660
case unsubscription[MarketTick]:
596661
sendUnsubscribeRequest(ls, subReq, ls.marketSubscriptions)
597662
// reader will receive UNSUB
@@ -601,6 +666,9 @@ func (ls *LightStreamerConnection) writeLoop(ctx context.Context) {
601666
case unsubscription[TradeUpdate]:
602667
sendUnsubscribeRequest(ls, subReq, ls.tradeSubscriptions)
603668
// reader will receive UNSUB
669+
case unsubscription[ChartCandle]:
670+
sendUnsubscribeRequest(ls, subReq, ls.chartCandleSubscriptions)
671+
// reader will receive UNSUB
604672
default:
605673
panic(fmt.Sprintf("unknown subscription request type %T", subReqI))
606674
}
@@ -706,6 +774,31 @@ func (ls *LightStreamerConnection) handleUpdate(args []string) {
706774
return
707775
}
708776
}
777+
778+
for _, sub := range ls.chartCandleSubscriptions {
779+
if sub.subscriptionId == subID {
780+
if itemIdIndex < 0 || itemIdIndex >= len(sub.items) {
781+
log.Printf("epic index %d out of range\n", itemIdIndex)
782+
return
783+
}
784+
epic := sub.items[itemIdIndex]
785+
lastState := sub.lastStateByItem[epic]
786+
chartCandle := lastState
787+
chartCandle.Epic = epic
788+
chartCandle.RecvTime = time.Now()
789+
chartCandle.RawUpdate = args[3]
790+
chartCandle.Scale = sub.scale
791+
err = parseUpdate(ls, args[3], "Epic", reflect.ValueOf(&chartCandle))
792+
if err != nil {
793+
log.Printf("failed to parse update %s: %s\n", args[3], err)
794+
return
795+
}
796+
sub.channel <- chartCandle
797+
sub.lastStateByItem[epic] = chartCandle
798+
return
799+
}
800+
}
801+
709802
for _, sub := range ls.tradeSubscriptions {
710803
if sub.subscriptionId == subID {
711804
if itemIdIndex < 0 || itemIdIndex >= len(sub.items) {
@@ -752,6 +845,7 @@ func parseUpdate(ls *LightStreamerConnection, update string, itemIDField string,
752845
if i+offset >= len(structFields) {
753846
return fmt.Errorf("unexpected extra field '%s' in update '%s'", field, update)
754847
}
848+
755849
switch field {
756850
case "#", "$", "":
757851
// null, empty, empty respectively
@@ -782,9 +876,10 @@ func (ls *LightStreamerConnection) handleReqErr(args []string) {
782876
handleReqErr(args, ls.marketSubscriptions)
783877
handleReqErr(args, ls.chartTickSubscriptions)
784878
handleReqErr(args, ls.tradeSubscriptions)
879+
handleReqErr(args, ls.chartCandleSubscriptions)
785880
}
786881

787-
func handleReqErr[T MarketTick | ChartTick | TradeUpdate](args []string, subs []*subscription[T]) {
882+
func handleReqErr[T MarketTick | ChartTick | ChartCandle | TradeUpdate](args []string, subs []*subscription[T]) {
788883
reqID := args[1]
789884
for _, sub := range subs {
790885
if sub.requestID == reqID {
@@ -796,7 +891,7 @@ func handleReqErr[T MarketTick | ChartTick | TradeUpdate](args []string, subs []
796891
}
797892
}
798893

799-
func sendUnsubscribeRequest[T MarketTick | ChartTick | TradeUpdate](ls *LightStreamerConnection, unSubReq unsubscription[T], subs []*subscription[T]) {
894+
func sendUnsubscribeRequest[T MarketTick | ChartTick | ChartCandle | TradeUpdate](ls *LightStreamerConnection, unSubReq unsubscription[T], subs []*subscription[T]) {
800895
found := false
801896
for _, sub := range subs {
802897
if sub.channel == unSubReq.channel {
@@ -855,6 +950,14 @@ func (ls *LightStreamerConnection) handleSubOk(args []string) {
855950
break
856951
}
857952
}
953+
for _, sub := range ls.chartCandleSubscriptions {
954+
if sub.subscriptionId == subID {
955+
select {
956+
case sub.errChan <- nil:
957+
}
958+
break
959+
}
960+
}
858961
}
859962

860963
func (ls *LightStreamerConnection) handleUnsubscribe(args []string) {
@@ -867,9 +970,10 @@ func (ls *LightStreamerConnection) handleUnsubscribe(args []string) {
867970
handleUnsubscribe(subID, &ls.marketSubscriptions)
868971
handleUnsubscribe(subID, &ls.chartTickSubscriptions)
869972
handleUnsubscribe(subID, &ls.tradeSubscriptions)
973+
handleUnsubscribe(subID, &ls.chartCandleSubscriptions)
870974
}
871975

872-
func handleUnsubscribe[T MarketTick | ChartTick | TradeUpdate](subID string, subs *[]*subscription[T]) {
976+
func handleUnsubscribe[T MarketTick | ChartTick | ChartCandle | TradeUpdate](subID string, subs *[]*subscription[T]) {
873977
var foundIndex int
874978
var found bool
875979
for i, sub := range *subs {
@@ -952,6 +1056,13 @@ func (ls *LightStreamerConnection) fatalError(err error) {
9521056
return err
9531057
}
9541058
}
1059+
for _, cs := range ls.chartCandleSubscriptions {
1060+
_, err = subscribe(lsNew, lsNew.ctx, *cs)
1061+
if err != nil {
1062+
_ = lsNew.Close()
1063+
return err
1064+
}
1065+
}
9551066
ls.closeRequested.Store(true)
9561067
ls.cancelFunc()
9571068
_ = ls.wsConn.Close()
@@ -1001,16 +1112,31 @@ func (ls *LightStreamerConnection) UnsubscribeMarkets(tickChan <-chan MarketTick
10011112
return unsubscribe(ls, tickChan)
10021113
}
10031114

1004-
func makeNewSubscription[T MarketTick | ChartTick | TradeUpdate](items []string, bufferSize int) subscription[T] {
1115+
func (ls *LightStreamerConnection) SubscribeChartCandles(ctx context.Context, bufferSize int, scale string, epics ...string) (<-chan ChartCandle, error) {
1116+
if ls.closeRequested.Load() {
1117+
return nil, fmt.Errorf("cannot subscribe using a closed lightstreamer connection")
1118+
}
1119+
1120+
subReq := makeNewSubscription[ChartCandle](epics, bufferSize)
1121+
subReq.scale = scale
1122+
return subscribe(ls, ctx, subReq)
1123+
}
1124+
1125+
func (ls *LightStreamerConnection) UnsubscribeChartCandles(candleChan <-chan ChartCandle) error {
1126+
return unsubscribe(ls, candleChan)
1127+
}
1128+
1129+
func makeNewSubscription[T MarketTick | ChartTick | ChartCandle | TradeUpdate](items []string, bufferSize int) subscription[T] {
10051130
subReq := subscription[T]{
1006-
channel: make(chan T, bufferSize),
1007-
items: items,
1008-
errChan: make(chan error),
1131+
channel: make(chan T, bufferSize),
1132+
items: items,
1133+
errChan: make(chan error, 1),
1134+
lastStateByItem: make(map[string]T),
10091135
}
10101136
return subReq
10111137
}
10121138

1013-
func subscribe[T MarketTick | ChartTick | TradeUpdate](ls *LightStreamerConnection, ctx context.Context, subReq subscription[T]) (<-chan T, error) {
1139+
func subscribe[T MarketTick | ChartTick | ChartCandle | TradeUpdate](ls *LightStreamerConnection, ctx context.Context, subReq subscription[T]) (<-chan T, error) {
10141140
select {
10151141
case ls.subscriptionReqChan <- subReq:
10161142
select {
@@ -1027,7 +1153,7 @@ func subscribe[T MarketTick | ChartTick | TradeUpdate](ls *LightStreamerConnecti
10271153
}
10281154
}
10291155

1030-
func unsubscribe[T MarketTick | ChartTick | TradeUpdate](ls *LightStreamerConnection, tickChan <-chan T) error {
1156+
func unsubscribe[T MarketTick | ChartTick | ChartCandle | TradeUpdate](ls *LightStreamerConnection, tickChan <-chan T) error {
10311157
if ls.closeRequested.Load() {
10321158
return fmt.Errorf("cannot unsubscribe using a closed lightstreamer connection")
10331159
}
@@ -1050,8 +1176,12 @@ func (ls *LightStreamerConnection) Close() error {
10501176
for _, sub := range ls.chartTickSubscriptions {
10511177
close(sub.channel)
10521178
}
1179+
for _, sub := range ls.chartCandleSubscriptions {
1180+
close(sub.channel)
1181+
}
10531182
ls.chartTickSubscriptions = nil
10541183
ls.marketSubscriptions = nil
1184+
ls.chartCandleSubscriptions = nil
10551185
}()
10561186

10571187
err := ls.wsConn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, ""), time.Time{})

0 commit comments

Comments
 (0)