1- using System ;
2- using System . Collections . Generic ;
3- using System . Linq ;
4- using System . Net ;
5- using LaunchDarkly . Logging ;
6- using LaunchDarkly . Sdk . Server . Interfaces ;
7- using StackExchange . Redis ;
8-
9- using static LaunchDarkly . Sdk . Server . Interfaces . DataStoreTypes ;
10-
11- namespace LaunchDarkly . Sdk . Server . Integrations
12- {
1+ using System ;
2+ using System . Collections . Generic ;
3+ using System . Linq ;
4+ using System . Net ;
5+ using LaunchDarkly . Logging ;
6+ using LaunchDarkly . Sdk . Server . Interfaces ;
7+ using StackExchange . Redis ;
8+
9+ using static LaunchDarkly . Sdk . Server . Interfaces . DataStoreTypes ;
10+
11+ namespace LaunchDarkly . Sdk . Server . Integrations
12+ {
1313 /// <summary>
1414 /// Internal implementation of the Redis data store.
15- /// </summary>
16- /// <remarks>
15+ /// </summary>
16+ /// <remarks>
1717 /// <para>
1818 /// Implementation notes:
1919 /// </para>
@@ -29,140 +29,142 @@ namespace LaunchDarkly.Sdk.Server.Integrations
2929 /// metadata".
3030 /// </item>
3131 /// <item> The special key "{prefix}:$inited" indicates that the store contains a complete data set.</item>
32- /// </list>
33- /// </remarks>
34- internal sealed class RedisDataStoreImpl : IPersistentDataStore
35- {
36- private readonly ConnectionMultiplexer _redis ;
37- private readonly string _prefix ;
38- private readonly Logger _log ;
39-
40- // This is used for unit testing only
41- internal Action _updateHook ;
42-
43- internal RedisDataStoreImpl (
44- ConfigurationOptions redisConfig ,
45- string prefix ,
46- Logger log
47- )
48- {
49- _log = log ;
50- var redisConfigCopy = redisConfig . Clone ( ) ;
51- _redis = ConnectionMultiplexer . Connect ( redisConfigCopy ) ;
52- _prefix = prefix ;
53- _log . Info ( "Using Redis data store at {0} with prefix \" {1}\" " ,
54- string . Join ( ", " , redisConfig . EndPoints . Select ( DescribeEndPoint ) ) , prefix ) ;
32+ /// </list>
33+ /// </remarks>
34+ internal sealed class RedisDataStoreImpl : IPersistentDataStore
35+ {
36+ private readonly ConnectionMultiplexer _redis ;
37+ private readonly string _prefix ;
38+ private readonly string _initedKey ;
39+ private readonly Logger _log ;
40+
41+ // This is used for unit testing only
42+ internal Action _updateHook ;
43+
44+ internal RedisDataStoreImpl (
45+ ConfigurationOptions redisConfig ,
46+ string prefix ,
47+ Logger log
48+ )
49+ {
50+ _log = log ;
51+ var redisConfigCopy = redisConfig . Clone ( ) ;
52+ _redis = ConnectionMultiplexer . Connect ( redisConfigCopy ) ;
53+ _prefix = prefix ;
54+ _initedKey = prefix + ":$inited" ;
55+ _log . Info ( "Using Redis data store at {0} with prefix \" {1}\" " ,
56+ string . Join ( ", " , redisConfig . EndPoints . Select ( DescribeEndPoint ) ) , prefix ) ;
57+ }
58+
59+ public bool Initialized ( ) =>
60+ _redis . GetDatabase ( ) . KeyExists ( _initedKey ) ;
61+
62+ public void Init ( FullDataSet < SerializedItemDescriptor > allData )
63+ {
64+ IDatabase db = _redis . GetDatabase ( ) ;
65+ ITransaction txn = db . CreateTransaction ( ) ;
66+ foreach ( var collection in allData . Data )
67+ {
68+ string key = ItemsKey ( collection . Key ) ;
69+ txn . KeyDeleteAsync ( key ) ;
70+ foreach ( var item in collection . Value . Items )
71+ {
72+ txn . HashSetAsync ( key , item . Key , item . Value . SerializedItem ) ;
73+ // Note, these methods are async because this Redis client treats all actions
74+ // in a transaction as async - they are only sent to Redis when we execute the
75+ // transaction. We don't need to await them.
76+ }
77+ }
78+ txn . StringSetAsync ( _initedKey , "" ) ;
79+ txn . Execute ( ) ;
80+ }
81+
82+ public SerializedItemDescriptor ? Get ( DataKind kind , string key )
83+ {
84+ IDatabase db = _redis . GetDatabase ( ) ;
85+ string json = db . HashGet ( ItemsKey ( kind ) , key ) ;
86+ if ( json == null )
87+ {
88+ _log . Debug ( "[get] Key: {0} not found in \" {1}\" " , key , kind . Name ) ;
89+ return null ;
90+ }
91+ return new SerializedItemDescriptor ( 0 , false , json ) ; // see implementation notes
5592 }
5693
57- public bool Initialized ( ) =>
58- _redis . GetDatabase ( ) . KeyExists ( _prefix ) ;
59-
60- public void Init ( FullDataSet < SerializedItemDescriptor > allData )
61- {
62- IDatabase db = _redis . GetDatabase ( ) ;
63- ITransaction txn = db . CreateTransaction ( ) ;
64- foreach ( var collection in allData . Data )
65- {
66- string key = ItemsKey ( collection . Key ) ;
67- txn . KeyDeleteAsync ( key ) ;
68- foreach ( var item in collection . Value . Items )
69- {
70- txn . HashSetAsync ( key , item . Key , item . Value . SerializedItem ) ;
71- // Note, these methods are async because this Redis client treats all actions
72- // in a transaction as async - they are only sent to Redis when we execute the
73- // transaction. We don't need to await them.
74- }
75- }
76- txn . StringSetAsync ( _prefix , "" ) ;
77- txn . Execute ( ) ;
78- }
79-
80- public SerializedItemDescriptor ? Get ( DataKind kind , string key )
81- {
82- IDatabase db = _redis . GetDatabase ( ) ;
83- string json = db . HashGet ( ItemsKey ( kind ) , key ) ;
84- if ( json == null )
85- {
86- _log . Debug ( "[get] Key: {0} not found in \" {1}\" " , key , kind . Name ) ;
87- return null ;
88- }
89- return new SerializedItemDescriptor ( 0 , false , json ) ; // see implementation notes
90- }
91-
92- public KeyedItems < SerializedItemDescriptor > GetAll ( DataKind kind )
93- {
94- IDatabase db = _redis . GetDatabase ( ) ;
95- HashEntry [ ] allEntries = db . HashGetAll ( ItemsKey ( kind ) ) ;
96- var result = new List < KeyValuePair < string , SerializedItemDescriptor > > ( ) ;
97- foreach ( HashEntry entry in allEntries )
98- {
99- result . Add ( new KeyValuePair < string , SerializedItemDescriptor > ( entry . Name ,
100- new SerializedItemDescriptor ( 0 , false , entry . Value ) ) ) ; // see implementation notes
101- }
102- return new KeyedItems < SerializedItemDescriptor > ( result ) ;
103- }
104-
105- public bool Upsert ( DataKind kind , string key , SerializedItemDescriptor newItem )
106- {
107- IDatabase db = _redis . GetDatabase ( ) ;
108- string baseKey = ItemsKey ( kind ) ;
109- while ( true )
110- {
111- string oldData ;
112- try
113- {
114- oldData = db . HashGet ( baseKey , key ) ;
115- }
116- catch ( RedisTimeoutException e )
117- {
118- _log . Error ( "Timeout in update when reading {0} from {1}: {2}" , key , baseKey , e . ToString ( ) ) ;
119- throw ;
120- }
121- // Here, unfortunately, we have to deserialize the old item (if any) just to find
122- // out its version number (see implementation notes).
123- var oldVersion = ( oldData is null ) ? 0 : kind . Deserialize ( oldData ) . Version ;
124- if ( oldVersion >= newItem . Version )
125- {
126- _log . Debug ( "Attempted to {0} key: {1} version: {2} with a version that is" +
127- " the same or older: {3} in \" {4}\" " ,
128- newItem . Deleted ? "delete" : "update" ,
129- key , oldVersion , newItem . Version , kind . Name ) ;
130- return false ;
131- }
132-
133- // This hook is used only in unit tests
134- _updateHook ? . Invoke ( ) ;
135-
136- // Note that transactions work a bit differently in StackExchange.Redis than in other
137- // Redis clients. The same Redis connection is shared across all threads, so it can't
138- // set a WATCH at the moment we start the transaction. Instead, it saves up all of
139- // the actions we send during the transaction, and replays them all within a MULTI
140- // when the transaction. AddCondition() is this client's way of doing a WATCH, and it
141- // can only use an equality match on the whole value (which is unfortunate since a
142- // serialized flag value could be fairly large).
143- ITransaction txn = db . CreateTransaction ( ) ;
144- txn . AddCondition ( oldData is null ? Condition . HashNotExists ( baseKey , key ) :
145- Condition . HashEqual ( baseKey , key , oldData ) ) ;
146-
147- txn . HashSetAsync ( baseKey , key , newItem . SerializedItem ) ;
148-
149- try
150- {
151- bool success = txn . Execute ( ) ;
152- if ( ! success )
153- {
154- // The watch was triggered, we should retry
155- _log . Debug ( "Concurrent modification detected, retrying" ) ;
156- continue ;
157- }
158- }
159- catch ( RedisTimeoutException e )
160- {
161- _log . Error ( "Timeout on update of {0} in {1}: {2}" , key , baseKey , e . ToString ( ) ) ;
162- throw ;
163- }
164- return true ;
165- }
94+ public KeyedItems < SerializedItemDescriptor > GetAll ( DataKind kind )
95+ {
96+ IDatabase db = _redis . GetDatabase ( ) ;
97+ HashEntry [ ] allEntries = db . HashGetAll ( ItemsKey ( kind ) ) ;
98+ var result = new List < KeyValuePair < string , SerializedItemDescriptor > > ( ) ;
99+ foreach ( HashEntry entry in allEntries )
100+ {
101+ result . Add ( new KeyValuePair < string , SerializedItemDescriptor > ( entry . Name ,
102+ new SerializedItemDescriptor ( 0 , false , entry . Value ) ) ) ; // see implementation notes
103+ }
104+ return new KeyedItems < SerializedItemDescriptor > ( result ) ;
105+ }
106+
107+ public bool Upsert ( DataKind kind , string key , SerializedItemDescriptor newItem )
108+ {
109+ IDatabase db = _redis . GetDatabase ( ) ;
110+ string baseKey = ItemsKey ( kind ) ;
111+ while ( true )
112+ {
113+ string oldData ;
114+ try
115+ {
116+ oldData = db . HashGet ( baseKey , key ) ;
117+ }
118+ catch ( RedisTimeoutException e )
119+ {
120+ _log . Error ( "Timeout in update when reading {0} from {1}: {2}" , key , baseKey , e . ToString ( ) ) ;
121+ throw ;
122+ }
123+ // Here, unfortunately, we have to deserialize the old item (if any) just to find
124+ // out its version number (see implementation notes).
125+ var oldVersion = ( oldData is null ) ? 0 : kind . Deserialize ( oldData ) . Version ;
126+ if ( oldVersion >= newItem . Version )
127+ {
128+ _log . Debug ( "Attempted to {0} key: {1} version: {2} with a version that is" +
129+ " the same or older: {3} in \" {4}\" " ,
130+ newItem . Deleted ? "delete" : "update" ,
131+ key , oldVersion , newItem . Version , kind . Name ) ;
132+ return false ;
133+ }
134+
135+ // This hook is used only in unit tests
136+ _updateHook ? . Invoke ( ) ;
137+
138+ // Note that transactions work a bit differently in StackExchange.Redis than in other
139+ // Redis clients. The same Redis connection is shared across all threads, so it can't
140+ // set a WATCH at the moment we start the transaction. Instead, it saves up all of
141+ // the actions we send during the transaction, and replays them all within a MULTI
142+ // when the transaction. AddCondition() is this client's way of doing a WATCH, and it
143+ // can only use an equality match on the whole value (which is unfortunate since a
144+ // serialized flag value could be fairly large).
145+ ITransaction txn = db . CreateTransaction ( ) ;
146+ txn . AddCondition ( oldData is null ? Condition . HashNotExists ( baseKey , key ) :
147+ Condition . HashEqual ( baseKey , key , oldData ) ) ;
148+
149+ txn . HashSetAsync ( baseKey , key , newItem . SerializedItem ) ;
150+
151+ try
152+ {
153+ bool success = txn . Execute ( ) ;
154+ if ( ! success )
155+ {
156+ // The watch was triggered, we should retry
157+ _log . Debug ( "Concurrent modification detected, retrying" ) ;
158+ continue ;
159+ }
160+ }
161+ catch ( RedisTimeoutException e )
162+ {
163+ _log . Error ( "Timeout on update of {0} in {1}: {2}" , key , baseKey , e . ToString ( ) ) ;
164+ throw ;
165+ }
166+ return true ;
167+ }
166168 }
167169
168170 public bool IsStoreAvailable ( )
@@ -177,21 +179,21 @@ public bool IsStoreAvailable()
177179 return false ;
178180 }
179181 }
180-
181- public void Dispose ( )
182- {
183- Dispose ( true ) ;
184- GC . SuppressFinalize ( this ) ;
185- }
186-
187- private void Dispose ( bool disposing )
188- {
189- if ( disposing )
190- {
191- _redis . Dispose ( ) ;
192- }
193- }
194-
182+
183+ public void Dispose ( )
184+ {
185+ Dispose ( true ) ;
186+ GC . SuppressFinalize ( this ) ;
187+ }
188+
189+ private void Dispose ( bool disposing )
190+ {
191+ if ( disposing )
192+ {
193+ _redis . Dispose ( ) ;
194+ }
195+ }
196+
195197 private string ItemsKey ( DataKind kind ) => _prefix + ":" + kind . Name ;
196198
197199 private string DescribeEndPoint ( EndPoint e )
@@ -202,5 +204,5 @@ private string DescribeEndPoint(EndPoint e)
202204 string . Format ( "{0}:{1}" , de . Host , de . Port ) :
203205 e . ToString ( ) ;
204206 }
205- }
206- }
207+ }
208+ }
0 commit comments