Skip to content

Commit 53349d0

Browse files
committed
Add download all and all namespaces flags
1 parent 18da0e9 commit 53349d0

File tree

2 files changed

+450
-25
lines changed

2 files changed

+450
-25
lines changed

cli/cmd/capture/download.go

Lines changed: 250 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
package capture
55

66
import (
7+
"archive/tar"
78
"bytes"
9+
"compress/gzip"
810
"context"
911
"errors"
1012
"fmt"
@@ -25,6 +27,7 @@ import (
2527
"github.com/spf13/cobra"
2628
"github.com/spf13/viper"
2729
"go.uber.org/zap"
30+
batchv1 "k8s.io/api/batch/v1"
2831
corev1 "k8s.io/api/core/v1"
2932
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3033
"k8s.io/apimachinery/pkg/util/rand"
@@ -69,6 +72,8 @@ var (
6972
ErrNoBlobsFound = errors.New("no blobs found with prefix")
7073
captureName string
7174
outputPath string
75+
downloadAll bool
76+
downloadAllNamespaces bool
7277
)
7378

7479
var (
@@ -198,6 +203,12 @@ var downloadExample = templates.Examples(i18n.T(`
198203
# Download the capture file(s) created using the capture name and define output location
199204
kubectl retina capture download --name <capture-name> -o <output-location>
200205
206+
# Download all available captures
207+
kubectl retina capture download --all
208+
209+
# Download all available captures from all namespaces
210+
kubectl retina capture download --all --all-namespaces
211+
201212
# Download capture file(s) from Blob Storage via Blob URL (Blob URL requires Read/List permissions)
202213
kubectl retina capture download --blob-url "<blob-url>"
203214
`))
@@ -248,49 +259,61 @@ func downloadFromCluster(ctx context.Context, config *rest.Config, namespace str
248259

249260
// DownloadFile downloads a capture file from a specific node
250261
func (ds *DownloadService) DownloadFile(ctx context.Context, nodeName, hostPath, fileName, captureName string) error {
262+
content, err := ds.DownloadFileContent(ctx, nodeName, hostPath, fileName, captureName)
263+
if err != nil {
264+
return err
265+
}
266+
267+
outputFile := filepath.Join(outputPath, captureName, fileName+".tar.gz")
268+
fmt.Printf("Bytes retrieved: %d\n", len(content))
269+
270+
err = os.WriteFile(outputFile, content, 0o600)
271+
if err != nil {
272+
return errors.Join(ErrWriteFileToHost, err)
273+
}
274+
275+
fmt.Printf("File written to: %s\n", outputFile)
276+
return nil
277+
}
278+
279+
// DownloadFileContent downloads a capture file from a specific node and returns the content
280+
func (ds *DownloadService) DownloadFileContent(ctx context.Context, nodeName, hostPath, fileName, captureName string) ([]byte, error) {
251281
node, err := ds.kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
252282
if err != nil {
253-
return errors.Join(ErrGetNodeInfo, err)
283+
return nil, errors.Join(ErrGetNodeInfo, err)
254284
}
255285

256286
downloadCmd, err := getDownloadCmd(node, hostPath, fileName)
257287
if err != nil {
258-
return err
288+
return nil, err
259289
}
260290

261291
fmt.Println("File to be downloaded: ", downloadCmd.SrcFilePath)
262292
downloadPod, err := ds.createDownloadPod(ctx, nodeName, hostPath, captureName, downloadCmd)
263293
if err != nil {
264-
return err
294+
return nil, err
265295
}
266296

297+
// Ensure cleanup
298+
defer func() {
299+
err := ds.kubeClient.CoreV1().Pods(ds.namespace).Delete(ctx, downloadPod.Name, metav1.DeleteOptions{})
300+
if err != nil {
301+
retinacmd.Logger.Warn("Failed to clean up debug pod", zap.String("name", downloadPod.Name), zap.Error(err))
302+
}
303+
}()
304+
267305
fileExists, err := ds.verifyFileExists(ctx, downloadPod, downloadCmd)
268306
if err != nil || !fileExists {
269-
return err
307+
return nil, err
270308
}
271309

272310
fmt.Println("Obtaining file...")
273311
fileContent, err := ds.executeFileDownload(ctx, downloadPod, downloadCmd)
274312
if err != nil {
275-
return err
276-
}
277-
278-
outputFile := filepath.Join(outputPath, captureName, fileName+".tar.gz")
279-
fmt.Printf("Bytes retrieved: %d\n", len(fileContent))
280-
281-
err = os.WriteFile(outputFile, fileContent, 0o600)
282-
if err != nil {
283-
return errors.Join(ErrWriteFileToHost, err)
313+
return nil, err
284314
}
285315

286-
fmt.Printf("File written to: %s\n", outputFile)
287-
288-
// Ensure cleanup
289-
err = ds.kubeClient.CoreV1().Pods(ds.namespace).Delete(ctx, downloadPod.Name, metav1.DeleteOptions{})
290-
if err != nil {
291-
retinacmd.Logger.Warn("Failed to clean up debug pod", zap.String("name", downloadPod.Name), zap.Error(err))
292-
}
293-
return nil
316+
return fileContent, nil
294317
}
295318

296319
func getCapturePods(ctx context.Context, kubeClient kubernetes.Interface, captureName, namespace string) (*corev1.PodList, error) {
@@ -513,6 +536,196 @@ func downloadFromBlob() error {
513536
return nil
514537
}
515538

539+
func createTarGzArchive(files map[string][]byte, outputPath string) error {
540+
// Create the output file
541+
outFile, err := os.Create(outputPath)
542+
if err != nil {
543+
return fmt.Errorf("failed to create archive file: %w", err)
544+
}
545+
defer outFile.Close()
546+
547+
// Create gzip writer
548+
gzipWriter := gzip.NewWriter(outFile)
549+
defer gzipWriter.Close()
550+
551+
// Create tar writer
552+
tarWriter := tar.NewWriter(gzipWriter)
553+
defer tarWriter.Close()
554+
555+
// Add each file to the archive
556+
for filePath, content := range files {
557+
header := &tar.Header{
558+
Name: filePath,
559+
Mode: 0600,
560+
Size: int64(len(content)),
561+
}
562+
563+
// Write header
564+
if err := tarWriter.WriteHeader(header); err != nil {
565+
return fmt.Errorf("failed to write header for %s: %w", filePath, err)
566+
}
567+
568+
// Write content
569+
if _, err := tarWriter.Write(content); err != nil {
570+
return fmt.Errorf("failed to write content for %s: %w", filePath, err)
571+
}
572+
}
573+
574+
return nil
575+
}
576+
577+
func downloadAllCaptures(ctx context.Context, config *rest.Config, namespace string) error {
578+
if downloadAllNamespaces {
579+
fmt.Println("Downloading all captures from all namespaces...")
580+
} else {
581+
fmt.Println("Downloading all captures...")
582+
}
583+
kubeClient, err := kubernetes.NewForConfig(config)
584+
if err != nil {
585+
return fmt.Errorf("failed to initialize k8s client: %w", err)
586+
}
587+
588+
// List all capture jobs with the capture app label
589+
captureJobSelector := &metav1.LabelSelector{
590+
MatchLabels: map[string]string{
591+
captureLabels.AppLabel: captureConstants.CaptureAppname,
592+
},
593+
}
594+
labelSelector, err := metav1.LabelSelectorAsSelector(captureJobSelector)
595+
if err != nil {
596+
return fmt.Errorf("failed to parse label selector: %w", err)
597+
}
598+
599+
var jobList *batchv1.JobList
600+
if downloadAllNamespaces {
601+
// Search across all namespaces
602+
jobList, err = kubeClient.BatchV1().Jobs("").List(ctx, metav1.ListOptions{
603+
LabelSelector: labelSelector.String(),
604+
})
605+
if err != nil {
606+
return fmt.Errorf("failed to list capture jobs across all namespaces: %w", err)
607+
}
608+
} else {
609+
// Search in specified namespace only
610+
jobList, err = kubeClient.BatchV1().Jobs(namespace).List(ctx, metav1.ListOptions{
611+
LabelSelector: labelSelector.String(),
612+
})
613+
if err != nil {
614+
return fmt.Errorf("failed to list capture jobs: %w", err)
615+
}
616+
}
617+
618+
if len(jobList.Items) == 0 {
619+
if downloadAllNamespaces {
620+
fmt.Printf("No captures found across all namespaces\n")
621+
} else {
622+
fmt.Printf("No captures found in namespace %s\n", namespace)
623+
}
624+
return nil
625+
}
626+
627+
// Group jobs by capture name and namespace
628+
type CaptureKey struct {
629+
Name string
630+
Namespace string
631+
}
632+
captureToJobs := make(map[CaptureKey][]batchv1.Job)
633+
for _, job := range jobList.Items {
634+
captureNameFromLabel, ok := job.Labels[captureLabels.CaptureNameLabel]
635+
if !ok {
636+
continue
637+
}
638+
key := CaptureKey{Name: captureNameFromLabel, Namespace: job.Namespace}
639+
captureToJobs[key] = append(captureToJobs[key], job)
640+
}
641+
642+
fmt.Printf("Found %d capture(s) to download\n", len(captureToJobs))
643+
644+
// We'll create download services per namespace as needed
645+
downloadServices := make(map[string]*DownloadService)
646+
647+
// Collect all files in memory
648+
allFiles := make(map[string][]byte)
649+
650+
// Download each capture
651+
for captureKey := range captureToJobs {
652+
currentCaptureName := captureKey.Name
653+
currentNamespace := captureKey.Namespace
654+
fmt.Printf("Processing capture: %s in namespace: %s\n", currentCaptureName, currentNamespace)
655+
656+
// Get or create download service for this namespace
657+
downloadService, exists := downloadServices[currentNamespace]
658+
if !exists {
659+
downloadService = NewDownloadService(kubeClient, config, currentNamespace)
660+
downloadServices[currentNamespace] = downloadService
661+
}
662+
663+
// Get pods for this capture and download files
664+
pods, err := getCapturePods(ctx, kubeClient, currentCaptureName, currentNamespace)
665+
if err != nil {
666+
fmt.Printf("Warning: Failed to get pods for capture %s in namespace %s: %v\n", currentCaptureName, currentNamespace, err)
667+
continue
668+
}
669+
670+
for i := range pods.Items {
671+
pod := pods.Items[i]
672+
if pod.Status.Phase != corev1.PodSucceeded {
673+
fmt.Printf("Warning: Pod %s is not in Succeeded phase (status: %s), skipping\n", pod.Name, pod.Status.Phase)
674+
continue
675+
}
676+
677+
nodeName := pod.Spec.NodeName
678+
hostPath, ok := pod.Annotations[captureConstants.CaptureHostPathAnnotationKey]
679+
if !ok {
680+
fmt.Printf("Warning: Cannot obtain host path from pod annotations for %s\n", pod.Name)
681+
continue
682+
}
683+
fileName, ok := pod.Annotations[captureConstants.CaptureFilenameAnnotationKey]
684+
if !ok {
685+
fmt.Printf("Warning: Cannot obtain capture file name from pod annotations for %s\n", pod.Name)
686+
continue
687+
}
688+
689+
// Download file content to memory
690+
content, err := downloadService.DownloadFileContent(ctx, nodeName, hostPath, fileName, currentCaptureName)
691+
if err != nil {
692+
fmt.Printf("Warning: Failed to download file from pod %s: %v\n", pod.Name, err)
693+
continue
694+
}
695+
696+
// Add to the archive with the path based on whether we're using all namespaces
697+
var archivePath string
698+
if downloadAllNamespaces {
699+
// Include namespace in path: namespace/captureName/fileName.tar.gz
700+
archivePath = filepath.Join(currentNamespace, currentCaptureName, fileName+".tar.gz")
701+
} else {
702+
// Original path: captureName/fileName.tar.gz
703+
archivePath = filepath.Join(currentCaptureName, fileName+".tar.gz")
704+
}
705+
allFiles[archivePath] = content
706+
fmt.Printf("Added %s (%d bytes) to archive\n", archivePath, len(content))
707+
}
708+
}
709+
710+
if len(allFiles) == 0 {
711+
fmt.Println("No capture files were successfully downloaded")
712+
return nil
713+
}
714+
715+
// Create the final archive
716+
timestamp := time.Now().Format("20060102150405")
717+
finalArchivePath := filepath.Join(outputPath, fmt.Sprintf("all-captures-%s.tar.gz", timestamp))
718+
719+
fmt.Printf("Creating final archive: %s\n", finalArchivePath)
720+
err = createTarGzArchive(allFiles, finalArchivePath)
721+
if err != nil {
722+
return fmt.Errorf("failed to create final archive: %w", err)
723+
}
724+
725+
fmt.Printf("Successfully created archive with %d files: %s\n", len(allFiles), finalArchivePath)
726+
return nil
727+
}
728+
516729
func NewDownloadSubCommand() *cobra.Command {
517730
downloadCapture := &cobra.Command{
518731
Use: "download",
@@ -534,8 +747,13 @@ func NewDownloadSubCommand() *cobra.Command {
534747
captureNamespace = "default"
535748
}
536749

537-
if captureName == "" && blobURL == "" {
538-
return errors.New("either --name or --blob-url must be specified")
750+
if captureName == "" && blobURL == "" && !downloadAll {
751+
return errors.New("either --name, --blob-url, or --all must be specified")
752+
}
753+
754+
// Validate all-namespaces flag usage
755+
if downloadAllNamespaces && !downloadAll {
756+
return errors.New("--all-namespaces flag can only be used with --all flag")
539757
}
540758

541759
if captureName != "" {
@@ -552,12 +770,21 @@ func NewDownloadSubCommand() *cobra.Command {
552770
}
553771
}
554772

773+
if downloadAll {
774+
err = downloadAllCaptures(ctx, kubeConfig, captureNamespace)
775+
if err != nil {
776+
return err
777+
}
778+
}
779+
555780
return nil
556781
},
557782
}
558783

559784
downloadCapture.Flags().StringVar(&blobURL, "blob-url", "", "Blob URL from which to download")
560785
downloadCapture.Flags().StringVar(&captureName, "name", "", "The name of a the capture")
786+
downloadCapture.Flags().BoolVar(&downloadAll, "all", false, "Download all available captures")
787+
downloadCapture.Flags().BoolVar(&downloadAllNamespaces, "all-namespaces", false, "Download captures from all namespaces (only works with --all flag)")
561788
downloadCapture.Flags().StringVarP(&outputPath, "output", "o", DefaultOutputPath, "Path to save the downloaded capture")
562789

563790
return downloadCapture

0 commit comments

Comments
 (0)