@@ -10,25 +10,26 @@ import (
1010 v1 "k8s.io/api/core/v1"
1111 "k8s.io/klog/v2"
1212 "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
13+ "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
1314 "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/k8sclient"
1415)
1516
1617const byIdDir = "/dev/disk/by-id"
1718
18- func NewDeviceCacheForNode (ctx context.Context , period time.Duration , nodeName string ) (* DeviceCache , error ) {
19+ func NewDeviceCacheForNode (ctx context.Context , period time.Duration , nodeName string , driverName string , deviceUtils deviceutils. DeviceUtils ) (* DeviceCache , error ) {
1920 node , err := k8sclient .GetNodeWithRetry (ctx , nodeName )
2021 if err != nil {
2122 return nil , fmt .Errorf ("failed to get node %s: %w" , nodeName , err )
2223 }
2324
24- return newDeviceCacheForNode (period , node ), nil
25+ return newDeviceCacheForNode (period , node , driverName , deviceUtils ), nil
2526}
2627
27- func TestDeviceCache (period time.Duration , node * v1.Node ) * DeviceCache {
28- return newDeviceCacheForNode (period , node )
28+ func NewTestDeviceCache (period time.Duration , node * v1.Node ) * DeviceCache {
29+ return newDeviceCacheForNode (period , node , "pd.csi.storage.gke.io" , deviceutils . NewDeviceUtils () )
2930}
3031
31- func TestNodeWithVolumes (volumes []string ) * v1.Node {
32+ func NewTestNodeWithVolumes (volumes []string ) * v1.Node {
3233 volumesInUse := make ([]v1.UniqueVolumeName , len (volumes ))
3334 for i , volume := range volumes {
3435 volumesInUse [i ] = v1 .UniqueVolumeName ("kubernetes.io/csi/pd.csi.storage.gke.io^" + volume )
@@ -41,36 +42,37 @@ func TestNodeWithVolumes(volumes []string) *v1.Node {
4142 }
4243}
4344
44- func newDeviceCacheForNode (period time.Duration , node * v1.Node ) * DeviceCache {
45+ func newDeviceCacheForNode (period time.Duration , node * v1.Node , driverName string , deviceUtils deviceutils. DeviceUtils ) * DeviceCache {
4546 deviceCache := & DeviceCache {
46- volumes : make (map [string ]deviceMapping ),
47- period : period ,
48- dir : byIdDir ,
47+ symlinks : make (map [string ]deviceMapping ),
48+ period : period ,
49+ deviceUtils : deviceUtils ,
50+ dir : byIdDir ,
4951 }
5052
5153 // Look at the status.volumesInUse field. For each, take the last section
52- // of the string (after the last "/") and call AddVolume for that
54+ // of the string (after the last "/") and call AddVolume for that.
55+ // The expected format of the volume name is "kubernetes.io/csi/pd.csi.storage.gke.io^<volume-id>"
5356 for _ , volume := range node .Status .VolumesInUse {
54- klog .Infof ("Adding volume %s to cache" , string (volume ))
55- vID , err := pvNameFromVolumeID (string (volume ))
56- if err != nil {
57- klog .Warningf ("failure to retrieve name, skipping volume %q: %v" , string (volume ), err )
57+ volumeName := string (volume )
58+ tokens := strings .Split (volumeName , "^" )
59+ if len (tokens ) != 2 {
60+ klog .V (5 ).Infof ("Skipping volume %q because splitting volumeName on `^` returns %d tokens, expected 2" , volumeName , len (tokens ))
61+ continue
62+ }
63+
64+ // The first token is of the form "kubernetes.io/csi/<driver-name>" or just "<driver-name>".
65+ // We should check if it contains the driver name we are interested in.
66+ if ! strings .Contains (tokens [0 ], driverName ) {
5867 continue
5968 }
60- deviceCache .AddVolume (vID )
69+ klog .Infof ("Adding volume %s to cache" , string (volume ))
70+ deviceCache .AddVolume (tokens [1 ])
6171 }
6272
6373 return deviceCache
6474}
6575
66- func pvNameFromVolumeID (volumeID string ) (string , error ) {
67- tokens := strings .Split (volumeID , "^" )
68- if len (tokens ) != 2 {
69- return "" , fmt .Errorf ("invalid volume ID, split on `^` returns %d tokens, expected 2" , len (tokens ))
70- }
71- return tokens [1 ], nil
72- }
73-
7476// Run since it needs an infinite loop to keep itself up to date
7577func (d * DeviceCache ) Run (ctx context.Context ) {
7678 klog .Infof ("Starting device cache watcher for directory %s with period %s" , d .dir , d .period )
@@ -87,7 +89,7 @@ func (d *DeviceCache) Run(ctx context.Context) {
8789 case <- ticker .C :
8890 d .listAndUpdate ()
8991
90- klog .Infof ("Cache contents: %+v" , d .volumes )
92+ klog .Infof ("Cache contents: %+v" , d .symlinks )
9193 }
9294 }
9395}
@@ -106,21 +108,32 @@ func (d *DeviceCache) AddVolume(volumeID string) error {
106108 return fmt .Errorf ("error getting device name: %w" , err )
107109 }
108110
109- // Look at the dir for a symlink that matches the pvName
110- symlink := filepath .Join (d .dir , "google-" + deviceName )
111- klog .Infof ("Looking for symlink %s" , symlink )
112-
113- realPath , err := filepath .EvalSymlinks (symlink )
114- if err != nil {
115- klog .Warningf ("Error evaluating symlink for volume %s: %v" , volumeID , err )
116- return nil
111+ symlinks := d .deviceUtils .GetDiskByIdPaths (deviceName , "" )
112+ if len (symlinks ) == 0 {
113+ return fmt .Errorf ("no symlink paths found for volume %s" , volumeID )
117114 }
118115
119- klog .Infof ("Found real path %s for volume %s" , realPath , volumeID )
116+ d .mutex .Lock ()
117+ defer d .mutex .Unlock ()
120118
121- d .volumes [volumeID ] = deviceMapping {
122- symlink : symlink ,
123- realPath : realPath ,
119+ // We may have multiple symlinks for a given device, we should add all of them.
120+ for _ , symlink := range symlinks {
121+ realPath , err := filepath .EvalSymlinks (symlink )
122+ if err != nil {
123+ // This is not an error, as the symlink may not have been created yet.
124+ // Leave real_path empty; the periodic check will update it.
125+ klog .V (5 ).Infof ("Could not evaluate symlink %s, will retry: %v" , symlink , err )
126+ realPath = ""
127+ } else {
128+ klog .Infof ("Found real path %s for volume %s" , realPath , volumeID )
129+ }
130+ // The key is the symlink path. The value contains the evaluated
131+ // real path and the original volumeID for better logging.
132+ d .symlinks [symlink ] = deviceMapping {
133+ volumeID : volumeID ,
134+ realPath : realPath ,
135+ }
136+ klog .V (4 ).Infof ("Added volume %s to cache with symlink %s" , volumeID , symlink )
124137 }
125138
126139 return nil
@@ -129,25 +142,31 @@ func (d *DeviceCache) AddVolume(volumeID string) error {
129142// Remove the volume from the cache.
130143func (d * DeviceCache ) RemoveVolume (volumeID string ) {
131144 klog .Infof ("Removing volume %s from cache" , volumeID )
132- delete (d .volumes , volumeID )
145+ d .mutex .Lock ()
146+ defer d .mutex .Unlock ()
147+ for symlink , device := range d .symlinks {
148+ if device .volumeID == volumeID {
149+ delete (d .symlinks , symlink )
150+ }
151+ }
133152}
134153
135154func (d * DeviceCache ) listAndUpdate () {
136- for volumeID , device := range d .volumes {
155+ for symlink , device := range d .symlinks {
137156 // Evaluate the symlink
138- realPath , err := filepath .EvalSymlinks (device . symlink )
157+ realPath , err := filepath .EvalSymlinks (symlink )
139158 if err != nil {
140- klog .Warningf ("Error evaluating symlink for volume %s: %v" , volumeID , err )
159+ klog .Warningf ("Error evaluating symlink for volume %s: %v" , device . volumeID , err )
141160 continue
142161 }
143162
144163 // Check if the realPath has changed
145164 if realPath != device .realPath {
146- klog .Warningf ("Change in device path for volume %s (symlink: %s), previous path: %s, new path: %s" , volumeID , device . symlink , device .realPath , realPath )
165+ klog .Warningf ("Change in device path for volume %s (symlink: %s), previous path: %s, new path: %s" , device . volumeID , symlink , device .realPath , realPath )
147166
148167 // Update the cache with the new realPath
149168 device .realPath = realPath
150- d .volumes [ volumeID ] = device
169+ d .symlinks [ symlink ] = device
151170 }
152171 }
153172}
0 commit comments