diff --git a/CHANGELOG.md b/CHANGELOG.md index 9932e5b6..a4047228 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added - Upcoming changes... + +## [1.17.4] - 2024-11-08 +### Fixed +- Fix backslashes in file paths on Windows ## [1.17.3] - 2024-11-05 ### Fixed diff --git a/src/scanoss/__init__.py b/src/scanoss/__init__.py index bfe37a3a..88a5b159 100644 --- a/src/scanoss/__init__.py +++ b/src/scanoss/__init__.py @@ -22,4 +22,4 @@ THE SOFTWARE. """ -__version__ = "1.17.3" +__version__ = "1.17.4" diff --git a/src/scanoss/scanner.py b/src/scanoss/scanner.py index f052ee07..ea9b08d1 100644 --- a/src/scanoss/scanner.py +++ b/src/scanoss/scanner.py @@ -25,6 +25,7 @@ import os import sys import datetime +from typing import Any, Dict, List, Optional import importlib_resources from progress.bar import Bar @@ -490,66 +491,41 @@ def __run_scan_threaded(self, scan_started: bool, file_count: int) -> bool: success = False return success - def __finish_scan_threaded(self, file_map: dict = None) -> bool: - """ - Wait for the threaded scans to complete - :param file_map: mapping of obfuscated files back into originals - :return: True if successful, False otherwise + def __finish_scan_threaded(self, file_map: Optional[Dict[Any, Any]] = None) -> bool: + """Wait for the threaded scan to complete and process the results + + Args: + file_map: Mapping of obfuscated files back to originals + + Returns: + bool: True if successful, False otherwise + + Raises: + ValueError: If output format is invalid """ - success = True - responses = None + success: bool = True + scan_responses = None dep_responses = None if self.is_file_or_snippet_scan(): if not self.threaded_scan.complete(): # Wait for the scans to complete self.print_stderr(f'Warning: Scanning analysis ran into some trouble.') success = False self.threaded_scan.complete_bar() - responses = self.threaded_scan.responses + scan_responses = self.threaded_scan.responses if self.is_dependency_scan(): self.print_msg('Retrieving dependency data...') if not self.threaded_deps.complete(): - self.print_stderr(f'Warning: Dependency analysis ran into some trouble.') + self.print_stderr( + f'Warning: Dependency analysis ran into some trouble.' + ) success = False dep_responses = self.threaded_deps.responses - # self.print_stderr(f'Dep Data: {dep_responses}') - # TODO change to dictionary - raw_output = "{\n" - # TODO look into merging the two dictionaries. See https://favtutor.com/blogs/merge-dictionaries-python - if responses or dep_responses: - first = True - if responses: - for scan_resp in responses: - if scan_resp is not None: - for key, value in scan_resp.items(): - if file_map: # We have a map for obfuscated files. Check if we can revert it - fm = file_map.get(key) - if fm: - key = fm # Replace the obfuscated filename - if first: - raw_output += " \"%s\":%s" % (key, json.dumps(value, indent=2)) - first = False - else: - raw_output += ",\n \"%s\":%s" % (key, json.dumps(value, indent=2)) - # End for loop - if dep_responses: - dep_files = dep_responses.get("files") - if dep_files and len(dep_files) > 0: - for dep_file in dep_files: - file = dep_file.pop("file", None) - if file is not None: - if first: - raw_output += " \"%s\":[%s]" % (file, json.dumps(dep_file, indent=2)) - first = False - else: - raw_output += ",\n \"%s\":[%s]" % (file, json.dumps(dep_file, indent=2)) - # End for loop - raw_output += "\n}" - try: - raw_results = json.loads(raw_output) - except Exception as e: - raise Exception(f'ERROR: Problem decoding parsed json: {e}') - results = self.post_processor.load_results(raw_results).post_process() + raw_scan_results = self._merge_scan_results( + scan_responses, dep_responses, file_map + ) + + results = self.post_processor.load_results(raw_scan_results).post_process() if self.output_format == 'plain': self.__log_result(json.dumps(results, indent=2, sort_keys=True)) @@ -567,6 +543,42 @@ def __finish_scan_threaded(self, file_map: dict = None) -> bool: success = False return success + def _merge_scan_results( + self, + scan_responses: Optional[List], + dep_responses: Optional[Dict[str,Any]], + file_map: Optional[Dict[str, Any]], + ) -> Dict[str, Any]: + """Merge scan and dependency responses into a single dictionary""" + results: Dict[str, Any] = {} + + if scan_responses: + for response in scan_responses: + if response is not None: + if file_map: + response = self._deobfuscate_filenames(response, file_map) + results.update(response) + + dep_files = dep_responses.get("files", None) if dep_responses else None + if dep_files: + for dep_file in dep_files: + file = dep_file.pop("file", None) + if file: + results[file] = dep_file + + return results + + def _deobfuscate_filenames(self, response: dict, file_map: dict) -> dict: + """Convert obfuscated filenames back to original names""" + deobfuscated = {} + for key, value in response.items(): + deobfuscated_name = file_map.get(key, None) + if deobfuscated_name: + deobfuscated[deobfuscated_name] = value + else: + deobfuscated[key] = value + return deobfuscated + def scan_file_with_options(self, file: str, deps_file: str = None, file_map: dict = None, dep_scope: SCOPE = None, dep_scope_include: str = None, dep_scope_exclude: str = None) -> bool: """ diff --git a/src/scanoss/winnowing.py b/src/scanoss/winnowing.py index 9c21930d..4026ada6 100644 --- a/src/scanoss/winnowing.py +++ b/src/scanoss/winnowing.py @@ -29,6 +29,7 @@ """ import hashlib import pathlib +import platform import re from crc32c import crc32c @@ -307,11 +308,15 @@ def wfp_for_contents(self, file: str, bin_file: bool, contents: bytes) -> str: return '' # Print file line content_length = len(contents) - wfp_filename = repr(file).strip("'") # return a utf-8 compatible version of the filename + original_filename = file + + if platform.system() == 'Windows': + original_filename = file.replace('\\', '/') + wfp_filename = repr(original_filename).strip("'") # return a utf-8 compatible version of the filename if self.obfuscate: # hide the real size of the file and its name, but keep the suffix - wfp_filename = f'{self.ob_count}{pathlib.Path(file).suffix}' + wfp_filename = f'{self.ob_count}{pathlib.Path(original_filename).suffix}' self.ob_count = self.ob_count + 1 - self.file_map[wfp_filename] = file # Save the file name map for later (reverse lookup) + self.file_map[wfp_filename] = original_filename # Save the file name map for later (reverse lookup) wfp = 'file={0},{1},{2}\n'.format(file_md5, content_length, wfp_filename) # We don't process snippets for binaries, or other uninteresting files, or if we're requested to skip @@ -464,7 +469,7 @@ def crc8_buffer(self, buffer): crc = self.crc8_byte(crc, buffer[index]) crc ^= CRC8_MAXIM_DOW_FINAL # Bitwise OR (XOR) of crc in Maxim Dow Final return crc - + # # End of Winnowing Class #