@@ -63,6 +63,7 @@ type Datastore interface {
6363 // InferenceModelRewrite operations
6464 RewriteSet (infModelRewrite * v1alpha2.InferenceModelRewrite )
6565 RewriteDelete (namespacedName types.NamespacedName )
66+ RewriteGet (modelName string ) * v1alpha2.InferenceModelRewriteRule
6667 RewriteGetAll () []* v1alpha2.InferenceModelRewrite
6768
6869 // PodList lists pods matching the given predicate.
@@ -77,9 +78,9 @@ type Datastore interface {
7778func NewDatastore (parentCtx context.Context , epFactory datalayer.EndpointFactory , modelServerMetricsPort int32 ) Datastore {
7879 store := & datastore {
7980 parentCtx : parentCtx ,
80- poolAndObjectivesMu : sync.RWMutex {},
81+ mu : sync.RWMutex {},
8182 objectives : make (map [string ]* v1alpha2.InferenceObjective ),
82- rewrites : make ( map [types. NamespacedName ] * v1alpha2. InferenceModelRewrite ),
83+ rewrites : NewModelRewriteStore ( ),
8384 pods : & sync.Map {},
8485 modelServerMetricsPort : modelServerMetricsPort ,
8586 epf : epFactory ,
@@ -90,13 +91,13 @@ func NewDatastore(parentCtx context.Context, epFactory datalayer.EndpointFactory
9091type datastore struct {
9192 // parentCtx controls the lifecycle of the background metrics goroutines that spawn up by the datastore.
9293 parentCtx context.Context
93- // poolAndObjectivesMu is used to synchronize access to pool and the objectives map .
94- poolAndObjectivesMu sync.RWMutex
95- pool * v1.InferencePool
94+ // mu is used to synchronize access to pool, objectives, and rewrites .
95+ mu sync.RWMutex
96+ pool * v1.InferencePool
9697 // key: InferenceObjective name, value: *InferenceObjective
9798 objectives map [string ]* v1alpha2.InferenceObjective
98- // key: types.NamespacedName, value: *v1alpha2. InferenceModelRewrite
99- rewrites map [types. NamespacedName ] * v1alpha2. InferenceModelRewrite
99+ // rewrites store for InferenceModelRewrite objects.
100+ rewrites * ModelRewriteStore
100101 // key: types.NamespacedName, value: backendmetrics.PodMetrics
101102 pods * sync.Map
102103 // modelServerMetricsPort metrics port from EPP command line argument
@@ -106,11 +107,11 @@ type datastore struct {
106107}
107108
108109func (ds * datastore ) Clear () {
109- ds .poolAndObjectivesMu .Lock ()
110- defer ds .poolAndObjectivesMu .Unlock ()
110+ ds .mu .Lock ()
111+ defer ds .mu .Unlock ()
111112 ds .pool = nil
112113 ds .objectives = make (map [string ]* v1alpha2.InferenceObjective )
113- ds .rewrites = make ( map [types. NamespacedName ] * v1alpha2. InferenceModelRewrite )
114+ ds .rewrites = NewModelRewriteStore ( )
114115 // stop all pods go routines before clearing the pods map.
115116 ds .pods .Range (func (_ , v any ) bool {
116117 ds .epf .ReleaseEndpoint (v .(backendmetrics.PodMetrics ))
@@ -126,8 +127,8 @@ func (ds *datastore) PoolSet(ctx context.Context, reader client.Reader, pool *v1
126127 return nil
127128 }
128129 logger := log .FromContext (ctx )
129- ds .poolAndObjectivesMu .Lock ()
130- defer ds .poolAndObjectivesMu .Unlock ()
130+ ds .mu .Lock ()
131+ defer ds .mu .Unlock ()
131132
132133 oldPool := ds .pool
133134 ds .pool = pool
@@ -148,23 +149,23 @@ func (ds *datastore) PoolSet(ctx context.Context, reader client.Reader, pool *v1
148149}
149150
150151func (ds * datastore ) PoolGet () (* v1.InferencePool , error ) {
151- ds .poolAndObjectivesMu .RLock ()
152- defer ds .poolAndObjectivesMu .RUnlock ()
152+ ds .mu .RLock ()
153+ defer ds .mu .RUnlock ()
153154 if ! ds .PoolHasSynced () {
154155 return nil , errPoolNotSynced
155156 }
156157 return ds .pool , nil
157158}
158159
159160func (ds * datastore ) PoolHasSynced () bool {
160- ds .poolAndObjectivesMu .RLock ()
161- defer ds .poolAndObjectivesMu .RUnlock ()
161+ ds .mu .RLock ()
162+ defer ds .mu .RUnlock ()
162163 return ds .pool != nil
163164}
164165
165166func (ds * datastore ) PoolLabelsMatch (podLabels map [string ]string ) bool {
166- ds .poolAndObjectivesMu .RLock ()
167- defer ds .poolAndObjectivesMu .RUnlock ()
167+ ds .mu .RLock ()
168+ defer ds .mu .RUnlock ()
168169 if ds .pool == nil {
169170 return false
170171 }
@@ -173,59 +174,57 @@ func (ds *datastore) PoolLabelsMatch(podLabels map[string]string) bool {
173174 return poolSelector .Matches (podSet )
174175}
175176
177+ // /// InferenceObjective APIs ///
176178func (ds * datastore ) ObjectiveSet (infObjective * v1alpha2.InferenceObjective ) {
177- ds .poolAndObjectivesMu .Lock ()
178- defer ds .poolAndObjectivesMu .Unlock ()
179- // Set the objective.
179+ ds .mu .Lock ()
180+ defer ds .mu .Unlock ()
180181 ds .objectives [infObjective .Name ] = infObjective
181182}
182183
183184func (ds * datastore ) ObjectiveGet (objectiveName string ) * v1alpha2.InferenceObjective {
184- ds .poolAndObjectivesMu .RLock ()
185- defer ds .poolAndObjectivesMu .RUnlock ()
186- iObj , ok := ds .objectives [objectiveName ]
187- if ! ok {
188- return nil
189- }
190- return iObj
185+ ds .mu .RLock ()
186+ defer ds .mu .RUnlock ()
187+ return ds .objectives [objectiveName ]
191188}
192189
193190func (ds * datastore ) ObjectiveDelete (namespacedName types.NamespacedName ) {
194- ds .poolAndObjectivesMu .Lock ()
195- defer ds .poolAndObjectivesMu .Unlock ()
191+ ds .mu .Lock ()
192+ defer ds .mu .Unlock ()
196193 delete (ds .objectives , namespacedName .Name )
197194}
198195
199196func (ds * datastore ) ObjectiveGetAll () []* v1alpha2.InferenceObjective {
200- ds .poolAndObjectivesMu .RLock ()
201- defer ds .poolAndObjectivesMu .RUnlock ()
202- res := []* v1alpha2.InferenceObjective {}
197+ ds .mu .RLock ()
198+ defer ds .mu .RUnlock ()
199+ res := make ( []* v1alpha2.InferenceObjective , 0 , len ( ds . objectives ))
203200 for _ , v := range ds .objectives {
204201 res = append (res , v )
205202 }
206203 return res
207204}
208205
209206func (ds * datastore ) RewriteSet (infModelRewrite * v1alpha2.InferenceModelRewrite ) {
210- ds .poolAndObjectivesMu .Lock ()
211- defer ds .poolAndObjectivesMu .Unlock ()
212- ds .rewrites [types. NamespacedName { Name : infModelRewrite . Name , Namespace : infModelRewrite . Namespace }] = infModelRewrite
207+ ds .mu .Lock ()
208+ defer ds .mu .Unlock ()
209+ ds .rewrites . Set ( infModelRewrite )
213210}
214211
215212func (ds * datastore ) RewriteDelete (namespacedName types.NamespacedName ) {
216- ds .poolAndObjectivesMu .Lock ()
217- defer ds .poolAndObjectivesMu .Unlock ()
218- delete (ds .rewrites , namespacedName )
213+ ds .mu .Lock ()
214+ defer ds .mu .Unlock ()
215+ ds .rewrites .Delete (namespacedName )
216+ }
217+
218+ func (ds * datastore ) RewriteGet (modelName string ) * v1alpha2.InferenceModelRewriteRule {
219+ ds .mu .RLock ()
220+ defer ds .mu .RUnlock ()
221+ return ds .rewrites .GetRule (modelName )
219222}
220223
221224func (ds * datastore ) RewriteGetAll () []* v1alpha2.InferenceModelRewrite {
222- ds .poolAndObjectivesMu .RLock ()
223- defer ds .poolAndObjectivesMu .RUnlock ()
224- res := []* v1alpha2.InferenceModelRewrite {}
225- for _ , v := range ds .rewrites {
226- res = append (res , v )
227- }
228- return res
225+ ds .mu .RLock ()
226+ defer ds .mu .RUnlock ()
227+ return ds .rewrites .GetAll ()
229228}
230229
231230// /// Pods/endpoints APIs ///
0 commit comments