@@ -44,6 +44,7 @@ def __init__(
4444 filepath : str = None ,
4545 folder : str = None ,
4646 output : str = None ,
47+ root_dir : str = '.' ,
4748 ):
4849 """
4950 Initialise the Delta instance.
@@ -59,8 +60,9 @@ def __init__(
5960 self .filepath = filepath
6061 self .folder = folder
6162 self .output = output
63+ self .root_dir = root_dir if root_dir else '.'
6264
63- def copy (self ):
65+ def copy (self , input_file : str = None ):
6466 """
6567 Copy files listed in the input file to the delta directory.
6668
@@ -71,94 +73,119 @@ def copy(self):
7173 :return: Tuple of (status_code, folder_path) where status_code is 0 for success,
7274 1 for error, and folder_path is the delta directory path
7375 """
74- # Validate that input file exists
75- if not os . path . exists ( self . filepath ) :
76- self .print_stderr (f 'ERROR: Input file { self . filepath } does not exist ' )
76+ input_file = input_file if input_file else self . filepath
77+ if not input_file :
78+ self .print_stderr ('ERROR: No input file specified ' )
7779 return 1 , ''
78- # Create delta dir (folder)
79- folder = self .delta_dir (self .folder )
80- if not folder :
81- self .print_stderr (f'ERROR: Input folder { self .folder } already exists' )
80+ # Validate that an input file exists
81+ if not os .path .exists (input_file ):
82+ self .print_stderr (f'ERROR: Input file { input_file } does not exist' )
8283 return 1 , ''
83- self .print_to_file_or_stdout (folder , self .output )
84- # Read files from filepath
85- try :
86- with open (self .filepath , 'r' , encoding = 'utf-8' ) as f :
87- for line in f :
88- source_file = line .rstrip ('\n \r ' )
89- # Skip empty lines
90- if not source_file :
91- continue
92-
93- # Normalise the source path to handle '..' and redundant separators
94- normalised_source = os .path .normpath (source_file )
95-
96- # Resolve to the absolute path for source validation
97- abs_source = os .path .abspath (normalised_source )
98-
99- # Check if the source file exists and is a file
100- if not os .path .exists (abs_source ):
101- self .print_stderr (f'WARNING: File { source_file } does not exist, skipping' )
102- continue
103- if not os .path .isfile (abs_source ):
104- self .print_stderr (f'WARNING: { source_file } is not a file, skipping' )
105- continue
106-
107- # Copy files into delta dir
108- try :
109- # Use a normalised source for destination to prevent traversal
110- # Remove leading path separators and '..' components from destination
111- safe_dest_path = normalised_source .lstrip (os .sep ).lstrip ('/' )
112- while safe_dest_path .startswith ('..' ):
113- safe_dest_path = safe_dest_path [2 :].lstrip (os .sep ).lstrip ('/' )
114-
115- dest_path = os .path .join (folder , safe_dest_path )
116-
117- # Final safety check: ensure destination is within delta folder
118- abs_dest = os .path .abspath (dest_path )
119- abs_folder = os .path .abspath (folder )
120- if not abs_dest .startswith (abs_folder + os .sep ):
121- self .print_stderr (f'ERROR: Destination path escapes delta directory for { source_file } ,'
122- f' skipping' )
123- continue
124-
125- dest_dir = os .path .dirname (dest_path )
126- if dest_dir :
127- os .makedirs (dest_dir , exist_ok = True )
128- shutil .copy (abs_source , dest_path )
129- except (OSError , shutil .Error ) as copy_err :
130- self .print_stderr (f'ERROR: Failed to copy { source_file } : { copy_err } ' )
131- continue
132- except (OSError , IOError ) as read_err :
133- self .print_stderr (f'ERROR: Failed to read input file: { read_err } ' )
84+ # Load the input file and validate it contains valid file paths
85+ files = self .load_input_file (input_file )
86+ if files is None :
13487 return 1 , ''
135- return 0 , folder
136-
137- def delta_dir (self , folder ):
88+ # Create delta dir (folder)
89+ delta_folder = self .create_delta_dir (self .folder , self .root_dir )
90+ if not delta_folder :
91+ return 1 , ''
92+ # Print delta folder location to output
93+ self .print_to_file_or_stdout (delta_folder , self .output )
94+ # Process each file and copy it to the delta dir
95+ for source_file in files :
96+ # Normalise the source path to handle ".." and redundant separators
97+ normalised_source = os .path .normpath (source_file )
98+ if normalised_source .startswith ('..' ):
99+ self .print_stderr (f'WARNING: Source path escapes root directory for { source_file } skipping' )
100+ continue
101+ # Resolve to the absolute path for source validation
102+ abs_source = os .path .abspath (os .path .join (self .root_dir , normalised_source ))
103+ # Check if the source file exists and is a file
104+ if not os .path .exists (abs_source ) or not os .path .isfile (abs_source ):
105+ self .print_stderr (f'WARNING: File { source_file } does not exist or is not a file, skipping' )
106+ continue
107+ # Use a normalised source for destination to prevent traversal
108+ dest_path = os .path .normpath (os .path .join (self .root_dir , delta_folder , normalised_source .lstrip (os .sep )))
109+ # Final safety check: ensure destination is within the delta folder
110+ abs_dest = os .path .abspath (dest_path )
111+ abs_folder = os .path .abspath (delta_folder )
112+ if not abs_dest .startswith (abs_folder + os .sep ):
113+ self .print_stderr (
114+ f'WARNING: Destination path ({ abs_dest } ) escapes delta directory for { source_file } skipping' )
115+ continue
116+ # Create the destination directory if it doesn't exist and copy the file
117+ try :
118+ dest_dir = os .path .dirname (dest_path )
119+ if dest_dir :
120+ self .print_trace (f'Creating directory { dest_dir } ...' )
121+ os .makedirs (dest_dir , exist_ok = True )
122+ self .print_debug (f'Copying { source_file } to { dest_path } ...' )
123+ shutil .copy (abs_source , dest_path )
124+ except (OSError , shutil .Error ) as e :
125+ self .print_stderr (f'ERROR: Failed to copy { source_file } to { dest_path } : { e } ' )
126+ return 1 , ''
127+ return 0 , delta_folder
128+
129+ def create_delta_dir (self , folder : str , root_dir : str = '.' ) -> str or None :
138130 """
139- Create or validate the delta directory.
131+ Create the delta directory.
140132
141133 If no folder is specified, creates a unique temporary directory with
142134 a 'delta-' prefix in the current directory. If a folder is specified,
143135 validates that it doesn't already exist before creating it.
144136
145- :param folder: Optional target directory path
146- :return: Path to the delta directory, or empty string if folder already exists or creation fails
137+ :param root_dir: Root directory to create the delta directory in (default: current directory)
138+ :param folder: Optional target directory
139+ :return: Path to the delta directory, or None if it already exists or creation fails
147140 """
148- if folder and os .path .exists (folder ):
149- self .print_stderr (f'Folder { folder } already exists' )
150- return ''
151- elif folder :
141+ if folder :
142+ # Validate the target directory doesn't already exist and create it
143+ if os .path .exists (folder ):
144+ self .print_stderr (f'Error: Folder { folder } already exists.' )
145+ return None
146+ else :
147+ try :
148+ self .print_debug (f'Creating delta directory { folder } ...' )
149+ os .makedirs (folder )
150+ except (OSError , IOError ) as e :
151+ self .print_stderr (f'ERROR: Failed to create directory { folder } : { e } ' )
152+ return None
153+ else :
154+ # Create a unique temporary directory in the given root directory
152155 try :
153- os .makedirs (folder )
156+ self .print_debug (f'Creating temporary delta directory in { root_dir } ...' )
157+ folder = tempfile .mkdtemp (prefix = "delta-" , dir = root_dir )
158+ self .print_debug (f'Created temporary delta directory: { folder } ' )
154159 except (OSError , IOError ) as e :
155- self .print_stderr (f'ERROR: Failed to create directory { folder } : { e } ' )
156- return ''
157- else :
160+ self .print_stderr (f'ERROR: Failed to create temporary directory in { root_dir } : { e } ' )
161+ return None
162+ return os .path .normpath (folder )
163+
164+ def load_input_file (self , input_file : str ) -> list [str ] or None :
165+ """
166+ Loads and parses the input file line by line. Each line in the input
167+ file represents a source file path, which will be stripped of trailing
168+ whitespace and appended to the resulting list if it is not empty.
169+
170+ :param input_file: The path to the input file to be read.
171+ :type input_file: String
172+ :return: A list of source file paths extracted from the input file,
173+ or None if an error occurs or the file path is invalid.
174+ :rtype: An array list[str] or None
175+ """
176+ files = []
177+ if input_file :
158178 try :
159- folder = tempfile .mkdtemp (prefix = "delta-" , dir = '.' )
179+ with open (input_file , 'r' , encoding = 'utf-8' ) as f :
180+ for line in f :
181+ source_file = line .rstrip ()
182+ if source_file :
183+ files .append (source_file .lstrip (os .sep )) # Save the file path without any leading separators
184+ # End of for loop
160185 except (OSError , IOError ) as e :
161- self .print_stderr (f'ERROR: Failed to create temporary directory: { e } ' )
162- return ''
163- return folder
186+ self .print_stderr (f'ERROR: Failed to read input file; { input_file } : { e } ' )
187+ return None
188+ self .print_debug (f'Loaded { len (files )} files from input file.' )
189+ return files
190+
164191
0 commit comments