3030
3131class Delta (ScanossBase ):
3232 """
33+ Handle delta scan operations by copying files into a dedicated delta directory.
3334
35+ This class manages the creation of delta directories and copying of specified files
36+ while preserving directory structure. Files are read from an input file where each
37+ line contains a file path to copy.
3438 """
3539 def __init__ (
3640 self ,
@@ -42,7 +46,14 @@ def __init__(
4246 output : str = None ,
4347 ):
4448 """
45-
49+ Initialise the Delta instance.
50+
51+ :param debug: Enable debug logging
52+ :param trace: Enable trace logging
53+ :param quiet: Enable quiet mode (suppress non-essential output)
54+ :param filepath: Path to input file containing list of files to copy
55+ :param folder: Target delta directory path (auto-generated if not provided)
56+ :param output: Output file path for delta directory location (stdout if not provided)
4657 """
4758 super ().__init__ (debug , trace , quiet )
4859 self .filepath = filepath
@@ -78,18 +89,39 @@ def copy(self):
7889 # Skip empty lines
7990 if not source_file :
8091 continue
81- # Resolve to absolute path for validation
82- abs_source = os .path .abspath (source_file )
83- # Check if source file exists and is a file
92+
93+ # Normalise the source path to handle '..' and redundant separators
94+ normalised_source = os .path .normpath (source_file )
95+
96+ # Resolve to the absolute path for source validation
97+ abs_source = os .path .abspath (normalised_source )
98+
99+ # Check if the source file exists and is a file
84100 if not os .path .exists (abs_source ):
85101 self .print_stderr (f'WARNING: File { source_file } does not exist, skipping' )
86102 continue
87103 if not os .path .isfile (abs_source ):
88104 self .print_stderr (f'WARNING: { source_file } is not a file, skipping' )
89105 continue
106+
90107 # Copy files into delta dir
91108 try :
92- dest_path = os .path .join (folder , source_file )
109+ # Use a normalised source for destination to prevent traversal
110+ # Remove leading path separators and '..' components from destination
111+ safe_dest_path = normalised_source .lstrip (os .sep ).lstrip ('/' )
112+ while safe_dest_path .startswith ('..' ):
113+ safe_dest_path = safe_dest_path [2 :].lstrip (os .sep ).lstrip ('/' )
114+
115+ dest_path = os .path .join (folder , safe_dest_path )
116+
117+ # Final safety check: ensure destination is within delta folder
118+ abs_dest = os .path .abspath (dest_path )
119+ abs_folder = os .path .abspath (folder )
120+ if not abs_dest .startswith (abs_folder + os .sep ):
121+ self .print_stderr (f'ERROR: Destination path escapes delta directory for { source_file } ,'
122+ f' skipping' )
123+ continue
124+
93125 dest_dir = os .path .dirname (dest_path )
94126 if dest_dir :
95127 os .makedirs (dest_dir , exist_ok = True )
@@ -111,14 +143,22 @@ def delta_dir(self, folder):
111143 validates that it doesn't already exist before creating it.
112144
113145 :param folder: Optional target directory path
114- :return: Path to the delta directory, or empty string if folder already exists
146+ :return: Path to the delta directory, or empty string if folder already exists or creation fails
115147 """
116148 if folder and os .path .exists (folder ):
117149 self .print_stderr (f'Folder { folder } already exists' )
118150 return ''
119151 elif folder :
120- os .makedirs (folder , exist_ok = True )
152+ try :
153+ os .makedirs (folder )
154+ except (OSError , IOError ) as e :
155+ self .print_stderr (f'ERROR: Failed to create directory { folder } : { e } ' )
156+ return ''
121157 else :
122- folder = tempfile .mkdtemp (prefix = "delta-" , dir = '.' )
158+ try :
159+ folder = tempfile .mkdtemp (prefix = "delta-" , dir = '.' )
160+ except (OSError , IOError ) as e :
161+ self .print_stderr (f'ERROR: Failed to create temporary directory: { e } ' )
162+ return ''
123163 return folder
124164
0 commit comments