44package utils
55
66import (
7+ "context"
78 "errors"
89 "fmt"
910 "os"
@@ -19,7 +20,7 @@ import (
1920)
2021
2122var unmount = syscall .Unmount
22- var command = exec .Command
23+ var commandWithCtx = exec .CommandContext
2324
2425var ErrTimeoutWaitProcess = errors .New ("timeout waiting for process to end" )
2526
@@ -33,28 +34,70 @@ type MounterOptsUtils struct {
3334
3435func (su * MounterOptsUtils ) FuseMount (path string , comm string , args []string ) error {
3536 klog .Info ("-FuseMount-" )
36- klog .Infof ("FuseMount params:\n \t path: <%s>\n \t command: <%s>\n \t args: <%v>" , path , comm , args )
37- out , err := command (comm , args ... ).CombinedOutput ()
38- if err != nil {
39- klog .Warningf ("FuseMount: mount command failed: mounter=%s, args=%v, error=%v, output=%s" , comm , args , err , string (out ))
40- klog .Infof ("FuseMount: checking if path already exists and is a mountpoint: path=%s" , path )
41- if mounted , err1 := isMountpoint (path ); err1 == nil && mounted { // check if bucket already got mounted
42- klog .Infof ("bucket is already mounted using '%s' mounter" , comm )
43- return nil
37+ klog .Infof ("FuseMount: params:\n \t path: <%s>\n \t command: <%s>\n \t args: <%v>" , path , comm , args )
38+
39+ ctx , cancel := context .WithCancel (context .Background ())
40+ var mounted bool
41+ defer func () {
42+ if ! mounted {
43+ cancel ()
4444 }
45- klog .Errorf ("FuseMount: path is not mountpoint, mount failed: path=%s" , path )
46- return fmt .Errorf ("'%s' mount failed: <%v>" , comm , string (out ))
45+ }()
46+
47+ cmd := commandWithCtx (ctx , comm , args ... )
48+ err := cmd .Start ()
49+ if err != nil {
50+ klog .Errorf ("FuseMount: command start failed: mounter=%s, args=%v, error=%v" , comm , args , err )
51+ return fmt .Errorf ("FuseMount: '%s' command start failed: %v" , comm , err )
4752 }
48- klog .Infof ("mount command succeeded: mounter=%s, output=%s" , comm , string (out ))
49- if err := waitForMount (path , 10 * time .Second ); err != nil {
50- return err
53+ klog .Infof ("FuseMount: command 'start' succeeded for '%s' mounter" , comm )
54+
55+ waitCh := make (chan error , 1 )
56+ mountCh := make (chan error , 1 )
57+
58+ go func () {
59+ klog .Infof ("FuseMount: cmd.Wait() goroutine start for mounter=%s, path=%v" , comm , path )
60+ waitCh <- cmd .Wait ()
61+ klog .Infof ("FuseMount: cmd.Wait() goroutine end for mounter=%s, path=%v" , comm , path )
62+ }()
63+
64+ go func () {
65+ klog .Infof ("FuseMount: waitForMount() goroutine start for mounter=%s, path=%v" , comm , path )
66+ mountCh <- waitForMount (ctx , path , 2 * time .Second , 30 * time .Second ) // kubelet retries NodePublishVolume after 120 seconds
67+ klog .Infof ("FuseMount: waitForMount() goroutine end for mounter=%s, path=%v" , comm , path )
68+ }()
69+
70+ select {
71+ case err := <- waitCh :
72+ if err != nil {
73+ klog .Warningf ("FuseMount: command 'wait' failed: mounter=%s, args=%v, error=%v" , comm , args , err )
74+ klog .Infof ("FuseMount: checking if path already exists and is a mountpoint: path=%s" , path )
75+ if isMount , err1 := isMountpoint (path ); err1 == nil && isMount { // check if bucket already got mounted
76+ klog .Infof ("bucket is already mounted using '%s' mounter" , comm )
77+ mounted = true
78+ return nil
79+ }
80+ return fmt .Errorf ("'%s' mount failed: %v" , comm , err )
81+ }
82+ klog .Infof ("FuseMount: command 'wait' succeeded for '%s' mounter" , comm )
83+ if err := <- mountCh ; err != nil {
84+ return err
85+ }
86+
87+ case err := <- mountCh :
88+ if err != nil {
89+ klog .Errorf ("FuseMount: path is not mountpoint. Mount failed: mounter=%s, path=%s" , comm , path )
90+ return fmt .Errorf ("'%s' mount failed: %v" , comm , err )
91+ }
5192 }
93+
5294 klog .Infof ("bucket mounted successfully using '%s' mounter" , comm )
95+ mounted = true
5396 return nil
5497}
5598
5699func (su * MounterOptsUtils ) FuseUnmount (path string ) error {
57- klog .Info ("-fuseUnmount -" )
100+ klog .Info ("-FuseUnmount -" )
58101 // check if mountpoint exists
59102 isMount , checkMountErr := isMountpoint (path )
60103 if isMount || checkMountErr != nil {
@@ -127,23 +170,33 @@ func isMountpoint(pathname string) (bool, error) {
127170 return false , nil
128171}
129172
130- func waitForMount (path string , timeout time.Duration ) error {
173+ func waitForMount (ctx context.Context , path string , initialDelay , timeout time.Duration ) error {
174+ if initialDelay > 0 {
175+ time .Sleep (initialDelay )
176+ }
131177 var elapsed time.Duration
132178 attempt := 1
133179 for {
134- isMount , err := k8sMountUtils .New ("" ).IsMountPoint (path )
135- if err == nil && isMount {
136- klog .Infof ("Path is a mountpoint: pathname: %s" , path )
137- return nil
138- }
180+ select {
181+ case <- ctx .Done ():
182+ err := ctx .Err ()
183+ klog .Infof ("waitForMount: context is done, error: %v" , err )
184+ return err
185+ default :
186+ isMount , err := k8sMountUtils .New ("" ).IsMountPoint (path )
187+ if err == nil && isMount {
188+ klog .Infof ("Path is a mountpoint: pathname: %s" , path )
189+ return nil
190+ }
139191
140- klog .Infof ("Mountpoint check in progress: attempt=%d, path=%s, isMount=%v, err=%v" , attempt , path , isMount , err )
141- time .Sleep (constants .Interval )
142- elapsed += constants .Interval
143- if elapsed >= timeout {
144- return fmt .Errorf ("timeout waiting for mount. Last check response: isMount=%v, err=%v" , isMount , err )
192+ klog .Infof ("Mountpoint check in progress: attempt=%d, path=%s, isMount=%v, err=%v, timeout=%v" , attempt , path , isMount , err , timeout )
193+ time .Sleep (constants .Interval )
194+ elapsed += constants .Interval
195+ if elapsed >= timeout {
196+ return fmt .Errorf ("timeout waiting for mount. Last check response: isMount=%v, err=%v, timeout=%v" , isMount , err , constants .Timeout )
197+ }
198+ attempt ++
145199 }
146- attempt ++
147200 }
148201}
149202
0 commit comments