@@ -3,6 +3,7 @@ package resource
33import (
44 "context"
55 "fmt"
6+ "k8s.io/apimachinery/pkg/labels"
67 "math"
78 "strconv"
89 "time"
@@ -55,6 +56,9 @@ type NodeResourceManager struct {
5556 nodeLister corelisters.NodeLister
5657 nodeSynced cache.InformerSynced
5758
59+ podLister corelisters.PodLister
60+ podSynced cache.InformerSynced
61+
5862 tspLister predictionlisters.TimeSeriesPredictionLister
5963 tspSynced cache.InformerSynced
6064
@@ -71,7 +75,7 @@ type NodeResourceManager struct {
7175 tspName string
7276}
7377
74- func NewNodeResourceManager (client clientset.Interface , nodeName string , nodeResourceReserved map [string ]string , tspName string , nodeInformer coreinformers.NodeInformer ,
78+ func NewNodeResourceManager (client clientset.Interface , nodeName string , nodeResourceReserved map [string ]string , tspName string , nodeInformer coreinformers.NodeInformer , podInformer coreinformers. PodInformer ,
7579 tspInformer predictionv1.TimeSeriesPredictionInformer , stateChann chan map [string ][]common.TimeSeries ) (* NodeResourceManager , error ) {
7680 reserveCpuPercent , err := utils .ParsePercentage (nodeResourceReserved [v1 .ResourceCPU .String ()])
7781 if err != nil {
@@ -92,6 +96,8 @@ func NewNodeResourceManager(client clientset.Interface, nodeName string, nodeRes
9296 client : client ,
9397 nodeLister : nodeInformer .Lister (),
9498 nodeSynced : nodeInformer .Informer ().HasSynced ,
99+ podLister : podInformer .Lister (),
100+ podSynced : podInformer .Informer ().HasSynced ,
95101 tspLister : tspInformer .Lister (),
96102 tspSynced : tspInformer .Informer ().HasSynced ,
97103 recorder : recorder ,
@@ -117,6 +123,7 @@ func (o *NodeResourceManager) Run(stop <-chan struct{}) {
117123 stop ,
118124 o .tspSynced ,
119125 o .nodeSynced ,
126+ o .podSynced ,
120127 ) {
121128 return
122129 }
@@ -144,7 +151,11 @@ func (o *NodeResourceManager) Run(stop <-chan struct{}) {
144151}
145152
146153func (o * NodeResourceManager ) UpdateNodeResource () {
147- node := o .getNode ()
154+ node , err := o .getNode ()
155+ if err != nil {
156+ klog .ErrorS (err , "Get node failed" )
157+ return
158+ }
148159 if len (node .Status .Addresses ) == 0 {
149160 klog .Error ("Node addresses is empty" )
150161 return
@@ -168,13 +179,33 @@ func (o *NodeResourceManager) UpdateNodeResource() {
168179 }
169180}
170181
171- func (o * NodeResourceManager ) getNode () * v1.Node {
172- node , err := o .nodeLister .Get (o .nodeName )
182+ func (o * NodeResourceManager ) getNode () (* v1.Node , error ) {
183+ return o .nodeLister .Get (o .nodeName )
184+ }
185+
186+ func (o * NodeResourceManager ) getExtResourceAllocated (extResource string ) (float64 , error ) {
187+ pods , err := o .podLister .List (labels .Everything ())
173188 if err != nil {
174- klog .Errorf ("Failed to get node: %v" , err )
175- return nil
189+ return 0 , err
190+ }
191+ allocated := 0.0
192+ allocatedFromContainer := func (container * v1.Container ) float64 {
193+ return float64 (container .Resources .Requests .Name (v1 .ResourceName (extResource ), resource .BinarySI ).Value ())
194+ }
195+ for _ , pod := range pods {
196+ if pod .Status .Phase != v1 .PodRunning {
197+ continue
198+ }
199+ var one = 0.0
200+ for _ , container := range pod .Spec .Containers {
201+ one += allocatedFromContainer (& container )
202+ }
203+ for _ , container := range pod .Spec .Containers {
204+ one = math .Max (one , allocatedFromContainer (& container ))
205+ }
206+ allocated += one
176207 }
177- return node
208+ return allocated , nil
178209}
179210
180211func (o * NodeResourceManager ) FindTargetNode (tsp * predictionapi.TimeSeriesPrediction , addresses []v1.NodeAddress ) (bool , error ) {
@@ -238,11 +269,15 @@ func (o *NodeResourceManager) BuildNodeStatus(node *v1.Node) map[string]int64 {
238269 default :
239270 continue
240271 }
241- if nextRecommendation < 0 {
242- nextRecommendation = 0
272+ extResourceName := fmt .Sprintf (utils .ExtResourcePrefixFormat , string (resourceName ))
273+ extResourceAllocated , err := o .getExtResourceAllocated (extResourceName )
274+ if err != nil {
275+ klog .Warningf ("Get allocated ext resources %s failed: %s" , extResourceName , err .Error ())
276+ }
277+ if nextRecommendation < extResourceAllocated {
278+ nextRecommendation = extResourceAllocated
243279 }
244280 metrics .UpdateNodeResourceRecommendedValue (metrics .SubComponentNodeResource , metrics .StepGetExtResourceRecommended , string (resourceName ), resourceFrom , nextRecommendation )
245- extResourceName := fmt .Sprintf (utils .ExtResourcePrefixFormat , string (resourceName ))
246281 resValue , exists := node .Status .Capacity [v1 .ResourceName (extResourceName )]
247282 if exists && resValue .Value () != 0 &&
248283 math .Abs (float64 (resValue .Value ())-
0 commit comments