From 4dfffe8b1273d574baf2fbd8bee944f7b70cfc3e Mon Sep 17 00:00:00 2001 From: Reid Orsten Date: Fri, 18 Apr 2025 08:32:30 -0600 Subject: [PATCH 1/4] CallableChain/CallableRegistry to help serialize parsers, validators, transformers --- pyproject.toml | 2 +- src/cli_wrapper/__init__.py | 4 + src/cli_wrapper/cli_wrapper.py | 166 ++++++++++--------- src/cli_wrapper/parsers.py | 73 ++------ src/cli_wrapper/{util.py => transformers.py} | 11 +- src/cli_wrapper/util/__init__.py | 0 src/cli_wrapper/util/callable_chain.py | 51 ++++++ src/cli_wrapper/util/callable_registry.py | 92 ++++++++++ src/cli_wrapper/validators.py | 54 ++++++ tests/test_cli_wrapper.py | 60 +++++-- tests/test_parsers.py | 82 ++++++++- tests/test_serialization.py | 34 ++++ tests/test_validators.py | 30 ++++ 13 files changed, 500 insertions(+), 159 deletions(-) rename src/cli_wrapper/{util.py => transformers.py} (58%) create mode 100644 src/cli_wrapper/util/__init__.py create mode 100644 src/cli_wrapper/util/callable_chain.py create mode 100644 src/cli_wrapper/util/callable_registry.py create mode 100644 src/cli_wrapper/validators.py create mode 100644 tests/test_serialization.py create mode 100644 tests/test_validators.py diff --git a/pyproject.toml b/pyproject.toml index 29294c3..ecb4394 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ requires = ["setuptools>=61.0", "build", "setuptools-git-versioning"] build-backend = "setuptools.build_meta" [project.optional-dependencies] -test = ["pytest", "pytest-cov", "pytest-asyncio", "pytest-xdist", "ruamel.yaml", "dotted_dict"] +test = ["pytest", "pytest-cov", "pytest-asyncio", "pytest-xdist", "ruamel.yaml", "dotted_dict", "black", "pylint"] [tool.setuptools.packages.find] where = ["src"] diff --git a/src/cli_wrapper/__init__.py b/src/cli_wrapper/__init__.py index 9cd26f9..d032bbc 100644 --- a/src/cli_wrapper/__init__.py +++ b/src/cli_wrapper/__init__.py @@ -1 +1,5 @@ from .cli_wrapper import CLIWrapper +from .transformers import transformers +from .parsers import parsers + +__all__ = ["CLIWrapper", "transformers", "parsers"] diff --git a/src/cli_wrapper/cli_wrapper.py b/src/cli_wrapper/cli_wrapper.py index 486731c..4f8f9a9 100644 --- a/src/cli_wrapper/cli_wrapper.py +++ b/src/cli_wrapper/cli_wrapper.py @@ -50,14 +50,13 @@ def validate_pod_name(name): import logging import os import subprocess -from copy import deepcopy from itertools import chain -from typing import Callable -from attrs import define +from attrs import define, field -from cli_wrapper.parsers import Parser -from cli_wrapper.util import snake2kebab +from .parsers import Parser +from .transformers import transformers +from .validators import validators, Validator logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -71,22 +70,11 @@ class Argument: literal_name: str | None = None default: str = None - validator: Callable[[str], bool | str] = None - transformer: Callable[[str, str], tuple[str, str]] = None - - def __attrs_post_init__(self): - if self.validator is None: - self.validator = lambda x: True - if not callable(self.validator): - raise ValueError("Validator is not callable") - if self.transformer is None: - self.transformer = lambda name, value: (name, value) - # TODO: support parser-style transformers (e.g. turn kubectl.create(dict) -> kubectl.create(filename=tmpfile(dict)) - # if isinstance(self.transformer, str): - # self.transformer = Parser(self.transformer) + validator: Validator | str | dict | list[str | dict] = field(converter=Validator, default=None) + transformer: str = "snake2kebab" @classmethod - def _from_dict(cls, arg_dict): + def from_dict(cls, arg_dict): """ Create an Argument from a dictionary :param arg_dict: the dictionary to be converted @@ -104,10 +92,11 @@ def _to_dict(self): Convert the Argument to a dictionary :return: the dictionary representation of the Argument """ + logger.debug(f"Converting argument {self.literal_name} to dict") return { "literal_name": self.literal_name, "default": self.default, - "validator": self.validator, + "validator": self.validator._to_dict() if self.validator is not None else None, } def is_valid(self, value): @@ -116,7 +105,8 @@ def is_valid(self, value): :param value: the value to be validated :return: True if valid, False otherwise """ - return self.validator(value) if self.validator is not None else True + logger.debug(f"Validating {self.literal_name} with value {value}") + return validators.get(self.validator)(value) if self.validator is not None else True def transform(self, name, value, **kwargs): """ @@ -125,7 +115,27 @@ def transform(self, name, value, **kwargs): :param value: the value to be transformed :return: the transformed value """ - return self.transformer(name, value, **kwargs) + return ( + transformers.get(self.transformer)(name, value, **kwargs) if self.transformer is not None else (name, value) + ) + + +def arg_converter(value: dict): + """ + Convert the value of the argument to a string + :param value: the value to be converted + :return: the converted value + """ + value = value.copy() + for k, v in value.items(): + if isinstance(v, dict): + if "literal_name" not in v: + v["literal_name"] = k + value[k] = Argument.from_dict(v) + if isinstance(v, Argument): + if v.literal_name is None: + v.literal_name = k + return value @define @@ -136,76 +146,75 @@ class Command(object): cli_command: str default_flags: dict = {} - args: dict[str | int, Argument] = {} - parse: Callable[[str], any] = None - default_transformer: Callable[[str, str], tuple[str, str]] = snake2kebab - arg_separator: str = "=" - - def __attrs_post_init__(self): - if not callable(self.parse): - logger.debug("Parse is not callable") - self.parse = Parser(self.parse) + args: dict[str | int, any] = field(factory=dict, converter=arg_converter) + parse: Parser = field(converter=Parser, default=None) + default_transformer: str = "snake2kebab" + short_prefix: str = field(repr=False, default="-") + long_prefix: str = field(repr=False, default="--") + arg_separator: str = field(repr=False, default="=") @classmethod - def _from_dict(cls, command_dict): + def _from_dict(cls, command_dict, **kwargs): """ Create a Command from a dictionary :param command_dict: the dictionary to be converted :return: Command object """ - parse = command_dict.get("parse", None) - if parse is not None: - parse = Parser(parse) + command_dict = command_dict.copy() + if "args" in command_dict: + for k, v in command_dict["args"].items(): + if "literal_name" not in v: + v["literal_name"] = k + if "cli_command" not in command_dict: + command_dict["cli_command"] = kwargs.pop("cli_command", None) return Command( - cli_command=command_dict.get("cli_command", None), - default_flags=command_dict.get("default_flags", {}), - args={k: Argument._from_dict(v) for k, v in command_dict.get("args", {}).items()}, - parse=parse, - default_transformer=snake2kebab, - arg_separator="=", + **command_dict, + **kwargs, ) def _to_dict(self): """ - Convert the Command to a dictionary + Convert the Command to a dictionary. + Excludes prefixes/separators, because they are set in the CLIWrapper :return: the dictionary representation of the Command """ + logger.debug(f"Converting command {self.cli_command} to dict") return { "cli_command": self.cli_command, "default_flags": self.default_flags, "args": {k: v._to_dict() for k, v in self.args.items()}, - "parse": self.parse, - "default_transformer": self.default_transformer, - "arg_separator": self.arg_separator, + "parse": self.parse._to_dict() if self.parse is not None else None, } def validate_args(self, *args, **kwargs): # TODO: validate everything and raise comprehensive exception instead of just the first one for name, arg in chain(enumerate(args), kwargs.items()): + logger.debug(f"Validating arg {name} with value {arg}") if name in self.args: + logger.debug("Argument found in args") v = self.args[name].is_valid(arg) if isinstance(name, int): name += 1 # let's call positional arg 0, "Argument 1" if isinstance(v, str): - raise ValueError(f"Argument {name} is invalid for command {self.cli_command}: {v}") + raise ValueError(f"Value '{arg}' is invalid for command {self.cli_command} arg {name}: {v}") if not v: - raise ValueError(f"Argument {name} is invalid for command {self.cli_command}") + raise ValueError(f"Value '{arg}' is invalid for command {self.cli_command} arg {name}") def build_args(self, *args, **kwargs): positional = [self.cli_command] params = [] for arg, value in chain( - enumerate(args), kwargs.items(), [(k, v) for k, v in self.default_flags.items() if k not in kwargs] + enumerate(args), kwargs.items(), [(k, v) for k, v in self.default_flags.items() if k not in kwargs] ): logger.debug(f"arg: {arg}, value: {value}") if arg in self.args: arg = self.args[arg].literal_name if self.args[arg].literal_name is not None else arg arg, value = self.args[arg].transform(arg, value) else: - arg, value = self.default_transformer(arg, value) + arg, value = transformers.get(self.default_transformer)(arg, value) logger.debug(f"after: arg: {arg}, value: {value}") if isinstance(arg, str): - prefix = "--" if len(arg) > 1 else "-" + prefix = self.long_prefix if len(arg) > 1 else self.short_prefix if value is not None: if self.arg_separator != " ": params.append(f"{prefix}{arg}{self.arg_separator}{value}") @@ -229,7 +238,9 @@ class CLIWrapper: trusting: bool = True async_: bool = False - default_transformer: Callable[[str, str], tuple[str, str]] = snake2kebab + default_transformer: str = "snake2kebab" + short_prefix: str = "-" + long_prefix: str = "--" arg_separator: str = "=" def _get_command(self, command: str): @@ -243,20 +254,22 @@ def _get_command(self, command: str): raise ValueError(f"Command {command} not found in {self.path}") c = Command( cli_command=command, - arg_separator=self.arg_separator, default_transformer=self.default_transformer, + short_prefix=self.short_prefix, + long_prefix=self.long_prefix, + arg_separator=self.arg_separator, ) logger.error(c.parse.__dict__) return c return self.commands[command] def _update_command( - self, - command: str, - cli_command: str = None, - args: dict[str | int, Argument] = None, - default_flags: dict = None, - parse=None, + self, + command: str, + cli_command: str = None, + args: dict[str | int, any] = None, + default_flags: dict = None, + parse=None, ): """ update the command to be run with the cli_wrapper @@ -265,14 +278,14 @@ def _update_command( :param parse: function to parse the output of the command :return: """ - if default_flags is None: - default_flags = {} self.commands[command] = Command( cli_command=command if cli_command is None else cli_command, args=args if args is not None else {}, default_flags=default_flags if default_flags is not None else {}, parse=parse, default_transformer=self.default_transformer, + short_prefix=self.short_prefix, + long_prefix=self.long_prefix, arg_separator=self.arg_separator, ) @@ -293,9 +306,7 @@ def _run(self, command: str, *args, **kwargs): result = subprocess.run(command_args, capture_output=True, text=True, env=env) if result.returncode != 0: raise RuntimeError(f"Command {command} failed with error: {result.stderr}") - if command_obj.parse is not None: - return command_obj.parse(result.stdout) - return result.stdout + return command_obj.parse(result.stdout) async def _run_async(self, command: str, *args, **kwargs): command_obj = self._get_command(command) @@ -313,9 +324,7 @@ async def _run_async(self, command: str, *args, **kwargs): stdout, stderr = await proc.communicate() if proc.returncode != 0: raise RuntimeError(f"Command {command} failed with error: {stderr.decode()}") - if command_obj.parse is not None: - return command_obj.parse(stdout.decode()) - return stdout.decode() + return command_obj.parse(stdout.decode()) def __getattr__(self, item): """ @@ -323,8 +332,6 @@ def __getattr__(self, item): :param item: the command to be run :return: """ - if item not in self.commands and not self.trusting: - raise ValueError(f"Command {item} not found in {self.path}") if self.async_: return lambda *args, **kwargs: self._run_async(item, *args, **kwargs) return lambda *args, **kwargs: self._run(item, *args, **kwargs) @@ -336,8 +343,9 @@ def from_dict(cls, cliwrapper_dict): :param cliwrapper_dict: the dictionary to be converted :return: CLIWrapper object """ + cliwrapper_dict = cliwrapper_dict.copy() commands = {} - for command, config in cliwrapper_dict.get("commands", {}).items(): + for command, config in cliwrapper_dict.pop("commands", {}).items(): if isinstance(config, str): config = {"cli_command": config} else: @@ -346,11 +354,19 @@ def from_dict(cls, cliwrapper_dict): commands[command] = Command._from_dict(config) return CLIWrapper( - path=cliwrapper_dict.get("path"), - env=cliwrapper_dict.get("env", {}), commands=commands, - trusting=cliwrapper_dict.get("trusting", True), - async_=cliwrapper_dict.get("async_", False), - default_transformer=snake2kebab, - arg_separator=cliwrapper_dict.get("arg_separator", "="), + **cliwrapper_dict, ) + + def _to_dict(self): + """ + Convert the CLIWrapper to a dictionary + :return: + """ + return { + "path": self.path, + "env": self.env, + "commands": {k: v._to_dict() for k, v in self.commands.items()}, + "trusting": self.trusting, + "async_": self.async_, + } diff --git a/src/cli_wrapper/parsers.py b/src/cli_wrapper/parsers.py index 1cf6f24..8117344 100644 --- a/src/cli_wrapper/parsers.py +++ b/src/cli_wrapper/parsers.py @@ -1,5 +1,8 @@ import logging +from .util.callable_chain import params_from_kwargs, CallableChain +from .util.callable_registry import CallableRegistry + logger = logging.getLogger(__name__) @@ -17,14 +20,14 @@ def extract(src: dict, *args) -> dict: return src -DEFAULT_PARSERS = { +core_parsers = { "extract": extract, } try: from json import loads - DEFAULT_PARSERS["json"] = loads + core_parsers["json"] = loads except ImportError: # pragma: no cover pass try: @@ -36,15 +39,15 @@ def yaml_loads(src: str) -> dict: # pragma: no cover yaml = YAML(typ="safe") return yaml.load(src) - DEFAULT_PARSERS["yaml"] = yaml_loads + core_parsers["yaml"] = yaml_loads except ImportError: # pragma: no cover pass -if "yaml" not in DEFAULT_PARSERS: +if "yaml" not in core_parsers: try: # pragma: no cover from yaml import safe_load as yaml_loads - DEFAULT_PARSERS["yaml"] = yaml_loads + core_parsers["yaml"] = yaml_loads except ImportError: # pragma: no cover pass @@ -55,75 +58,27 @@ def yaml_loads(src: str) -> dict: # pragma: no cover def dotted_dictify(src, *args, **kwargs): if isinstance(src, list): - return [PreserveKeysDottedDict(x) for x in src] + return [dotted_dictify(x, *args, **kwargs) for x in src] if isinstance(src, dict): return PreserveKeysDottedDict(src) return src - DEFAULT_PARSERS["dotted_dict"] = dotted_dictify + core_parsers["dotted_dict"] = dotted_dictify except ImportError: # pragma: no cover pass +parsers = CallableRegistry({"core": core_parsers}, callable_name="Parser") -def get_parser(parser_name: str, args=None, kwargs=None) -> callable: - """ - Retrieves a parser function based on the specified parser name. - :param parser_name: The name of the parser to retrieve. - :return: The corresponding parser function. - :raises KeyError: If the specified parser name is not found. - """ - if args is None: - args = [] - if kwargs is None: - kwargs = {} - if callable(parser_name): - return lambda x: parser_name(x, *args, **kwargs) - if parser_name not in DEFAULT_PARSERS: - raise KeyError(f"Parser '{parser_name}' not found.") - parser = DEFAULT_PARSERS[parser_name] - return lambda x: parser(x, *args, **kwargs) - - -def params_from_kwargs(src: dict | str) -> tuple[str, list, dict]: - if isinstance(src, str): - return src, [], {} - assert len(src) == 1 - key = list(src.keys())[0] - value = src[key] - if isinstance(value, list): - return key, value, {} - if isinstance(value, dict): - return key, value.pop("args", []), value - - -class Parser: - parsers: list[callable] +class Parser(CallableChain): def __init__(self, config): - self.parsers = [] - self.config = config - if isinstance(config, str): - self.parsers = [get_parser(config)] - if isinstance(config, list): - self.parsers = [] - for x in config: - if callable(x): - self.parsers.append(x) - else: - name, args, kwargs = params_from_kwargs(x) - self.parsers.append(get_parser(name, args, kwargs)) - if isinstance(config, dict): - name, args, kwargs = params_from_kwargs(config) - self.parsers = [get_parser(name, args, kwargs)] + super().__init__(config, parsers) def __call__(self, input): # For now, parser expects to be called with one input. result = input - for parser in self.parsers: + for parser in self.chain: logger.debug(result) result = parser(result) return result - - def _to_dict(self): - return self.config diff --git a/src/cli_wrapper/util.py b/src/cli_wrapper/transformers.py similarity index 58% rename from src/cli_wrapper/util.py rename to src/cli_wrapper/transformers.py index 6918f58..a8c6752 100644 --- a/src/cli_wrapper/util.py +++ b/src/cli_wrapper/transformers.py @@ -1,6 +1,4 @@ -""" -Utility functions for the CLI wrapper. -""" +from .util.callable_registry import CallableRegistry def snake2kebab(arg: str, value: any) -> tuple[str, any]: @@ -11,3 +9,10 @@ def snake2kebab(arg: str, value: any) -> tuple[str, any]: return arg.replace("_", "-"), value # don't do anything if the arg is positional return arg, value + + +core_transformers = { + "snake2kebab": snake2kebab, +} + +transformers = CallableRegistry({"core": core_transformers}) diff --git a/src/cli_wrapper/util/__init__.py b/src/cli_wrapper/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/cli_wrapper/util/callable_chain.py b/src/cli_wrapper/util/callable_chain.py new file mode 100644 index 0000000..b099967 --- /dev/null +++ b/src/cli_wrapper/util/callable_chain.py @@ -0,0 +1,51 @@ +from abc import ABC, abstractmethod + + +class CallableChain(ABC): + chain: list[callable] + config: list + + def __init__(self, config, source): + self.chain = [] + self.config = config + if callable(config): + self.chain = [config] + if isinstance(config, str): + self.chain = [source.get(config)] + if isinstance(config, list): + self.chain = [] + for x in config: + if callable(x): + self.chain.append(x) + else: + name, args, kwargs = params_from_kwargs(x) + self.chain.append(source.get(name, args, kwargs)) + if isinstance(config, dict): + name, args, kwargs = params_from_kwargs(config) + self.chain = [source.get(name, args, kwargs)] + + def _to_dict(self): + return self.config + + @abstractmethod + def __call__(self, value): + """ + Calls the chain of functions with the given value. + """ + raise NotImplementedError() + + +def params_from_kwargs(src: dict | str) -> tuple[str, list, dict]: + if isinstance(src, str): + return src, [], {} + assert len(src) == 1 + key = list(src.keys())[0] + value = src[key] + if isinstance(value, list): + return key, value, {} + if isinstance(value, dict): + args = value.pop("args", []) + if isinstance(args, str): + args = [args] + return key, args, value + return key, [value], {} diff --git a/src/cli_wrapper/util/callable_registry.py b/src/cli_wrapper/util/callable_registry.py new file mode 100644 index 0000000..fe84691 --- /dev/null +++ b/src/cli_wrapper/util/callable_registry.py @@ -0,0 +1,92 @@ +from typing import Callable + +from attrs import define + + +@define +class CallableRegistry: + _all: dict[str, dict[str, Callable]] + callable_name: str = "Callable thing" + + def get(self, name: str | Callable, args=None, kwargs=None) -> Callable: + """ + Retrieves a parser function based on the specified parser name. + + :param name: The name of the parser to retrieve. + :return: The corresponding parser function. + :raises KeyError: If the specified parser name is not found. + """ + if args is None: + args = [] + if kwargs is None: + kwargs = {} + if callable(name): + return lambda *fargs: name(*fargs, *args, **kwargs) + callable_ = None + group, name = self._parse_name(name) + if group is not None: + if group not in self._all: + raise KeyError(f"{self.callable_name} group '{group}' not found.") + parser_group = self._all[group] + if name not in parser_group: + raise KeyError(f"{self.callable_name} '{name}' not found.") + callable_ = parser_group[name] + else: + for _, v in self._all.items(): + if name in v: + callable_ = v[name] + break + if callable_ is None: + raise KeyError(f"{self.callable_name} '{name}' not found.") + return lambda *fargs: callable_(*fargs, *args, **kwargs) + + def register(self, name: str, callable: callable, group="core"): + """ + Registers a new parser function with the specified name. + + :param name: The name to associate with the parser. + :param callable: The callable function to register. + """ + ngroup, name = self._parse_name(name) + if ngroup is not None: + if group != "core": + # approximately, raise an exception if a group is specified in the name and the group arg + raise KeyError(f"'{callable}' already specifies a group.") + group = ngroup + if name in self._all[group]: + raise KeyError(f"{self.callable_name} '{name}' already registered.") + self._all[group][name] = callable + + def register_group(self, name: str, parsers: dict = None): + """ + Registers a new parser group with the specified name. + + :param name: The name to associate with the parser group. + :param parsers: A dictionary of parsers to register in the group. + """ + if name in self._all: + raise KeyError(f"{self.callable_name} group '{name}' already registered.") + if "." in name: + raise KeyError(f"{self.callable_name} group name '{name}' is not valid.") + parsers = {} if parsers is None else parsers + bad_parser_names = [x for x in parsers.keys() if "." in x] + if bad_parser_names: + raise KeyError( + f"{self.callable_name} group '{name}' contains invalid parser names: {', '.join(bad_parser_names)}" + ) + self._all[name] = parsers + + def _parse_name(self, name: str) -> tuple[str, str]: + """ + Parses a name into a group and parser name. + + :param name: The name to parse. + :return: A tuple containing the group and parser name. + """ + if "." not in name: + return None, name + try: + group, name = name.split(".") + except ValueError: + raise KeyError(f"{self.callable_name} name '{name}' is not valid.") + return group, name diff --git a/src/cli_wrapper/validators.py b/src/cli_wrapper/validators.py new file mode 100644 index 0000000..2e31836 --- /dev/null +++ b/src/cli_wrapper/validators.py @@ -0,0 +1,54 @@ +import logging +from uuid import uuid4 + +from .util.callable_chain import CallableChain +from .util.callable_registry import CallableRegistry + +logger = logging.getLogger(__name__) + +core_validators = { + "is_dict": lambda x: isinstance(x, dict), + "is_list": lambda x: isinstance(x, list), + "is_str": lambda x: isinstance(x, str), + "is_int": lambda x: isinstance(x, int), + "is_bool": lambda x: isinstance(x, bool), + "is_float": lambda x: isinstance(x, float), + "is_alnum": lambda x: isinstance(x, str) and x.isalnum(), + "is_alpha": lambda x: isinstance(x, str) and x.isalpha(), + "is_digit": lambda x: isinstance(x, str) and x.isdigit(), + "starts_alpha": lambda x: isinstance(x, str) and x[0].isalpha(), + "startswith": lambda x, prefix: isinstance(x, str) and x.startswith(prefix), +} + +validators = CallableRegistry({"core": core_validators}, callable_name="Validator") + + +class Validator(CallableChain): + """ + A class that provides a validation mechanism for input data. + It uses a list of validators to check if the input data is valid. + """ + + def __init__(self, config): + if callable(config): + id = str(uuid4()) + validators.register(id, config) + config = id + self.config = config + super().__init__(config, validators) + + def __call__(self, value): + result = True + config = [self.config] if not isinstance(self.config, list) else self.config + for x, c in zip(self.chain, config): + validator_result = x(value) + logger.debug(f"Validator {c} result: {validator_result}") + result = result and validator_result + return result + + def _to_dict(self): + """ + Converts the validator configuration to a dictionary. + """ + logger.debug(f"returning validator config: {self.config}") + return self.config diff --git a/tests/test_cli_wrapper.py b/tests/test_cli_wrapper.py index a8cec22..9f49f17 100644 --- a/tests/test_cli_wrapper.py +++ b/tests/test_cli_wrapper.py @@ -5,7 +5,8 @@ import pytest from cli_wrapper.cli_wrapper import CLIWrapper, Argument, Command -from cli_wrapper.util import snake2kebab +from cli_wrapper.transformers import transformers +from cli_wrapper.validators import validators logger = logging.getLogger(__name__) @@ -18,11 +19,11 @@ def test_argument(self): assert arg.is_valid("invalid") is False assert arg.is_valid(None) is False - with pytest.raises(ValueError): + with pytest.raises(KeyError): Argument("test", validator="not callable") def test_argument_from_dict(self): - arg = Argument._from_dict({"literal_name": "test", "default": "default", "validator": lambda x: x == "valid"}) + arg = Argument.from_dict({"literal_name": "test", "default": "default", "validator": lambda x: x == "valid"}) assert arg.literal_name == "test" assert arg.default == "default" @@ -30,31 +31,48 @@ def test_argument_from_dict(self): assert arg.is_valid("invalid") is False assert arg.is_valid(None) is False - arg = Argument._from_dict({}) + arg = Argument.from_dict({}) assert arg.literal_name is None assert arg.default is None assert arg.is_valid("valid") is True - with pytest.raises(ValueError): - Argument._from_dict({"name": "test", "validator": "not callable"}) + with pytest.raises(KeyError): + Argument.from_dict({"name": "test", "validator": "nonexistent_validator"}) class TestCommand: def test_command(self): - def parse_output(output): - return output + def validator(name): + return "You can only get pods because I'm a jerk" if name not in ["pod", "pods"] else True + + validators.register_group( + "test_command_group", + { + "is_pods": validator, + "is_kube_system": lambda x: x == "kube-system", + }, + ) command = Command( cli_command="get", default_flags={"namespace": "default"}, + default_transformer="snake2kebab", args={ - 1: Argument(validator=lambda x: x == "pod-1"), + 0: {"validator": "test_command_group.is_pods"}, + "namespace": {"validator": "is_kube_system"}, }, ) command.validate_args("pod", "pod-1", namespace="kube-system") - with pytest.raises(ValueError): - command.validate_args("pod", "pod-2", namespace="kube-system") + with pytest.raises(ValueError) as err: + command.validate_args("pod", "pod-2", namespace="tube-system") + assert str(err.value) == "Value 'tube-system' is invalid for command get arg namespace" + with pytest.raises(ValueError) as err: + command.validate_args("deployments", "pod-1", namespace="kube-system") + assert ( + str(err.value) + == "Value 'deployments' is invalid for command get arg 1: You can only get pods because I'm a jerk" + ) args = command.build_args("pod", "pod-1", namespace="kube-system") logger.error(args) @@ -71,12 +89,14 @@ def parse_output(output): cli_command="create", default_flags={"namespace": "default"}, args={ - 0: Argument(transformer=lambda x, y: ("filename", "filename")), + 0: {"transformer": lambda x, y: ("filename", "filename")}, }, ) args = command.build_args({"some": "dict"}) assert args == ["create", "--filename=filename", "--namespace=default"] + validators._all.pop("test_command_group") + def test_command_from_dict(self): command = Command._from_dict( { @@ -90,7 +110,6 @@ def test_command_from_dict(self): assert command.default_flags == {"namespace": "default"} assert command.args[1].is_valid("pod-1") is True assert command.args[1].is_valid("pod-2") is False - assert command.default_transformer == snake2kebab with pytest.raises(ValueError): command.validate_args("pod", "pod-2", namespace="kube-system") @@ -107,8 +126,19 @@ def test_command_from_dict(self): class TestCLIWrapper: def test_cliwrapper(self): logger.info("Testing CLIWrapper, trusting with get json/loads") - kubectl = CLIWrapper("kubectl", trusting=True) - kubectl._update_command("get", default_flags={"output": "json"}, parse=loads) + kubectl = CLIWrapper("kubectl", trusting=False) + + with pytest.raises(ValueError): + kubectl.get("pods", namespace="kube-system") + + kubectl.trusting = True + + kubectl._update_command("get") + r = kubectl.get("pods", namespace="kube-system") + + assert isinstance(r, str) + kubectl.commands["get"].default_flags = {"output": "json"} + kubectl.commands["get"].parse = ["json"] r = kubectl.get("pods", "-A") assert r["kind"] == "List" diff --git a/tests/test_parsers.py b/tests/test_parsers.py index e2649fa..3793865 100644 --- a/tests/test_parsers.py +++ b/tests/test_parsers.py @@ -1,4 +1,6 @@ -from cli_wrapper.parsers import Parser +import pytest + +from cli_wrapper.parsers import Parser, parsers class TestParsers: @@ -16,13 +18,11 @@ def test_parser(self): # list of parsers with custom function def custom_extract(src): return src["foo"]["bar"] + parser = Parser(["json", custom_extract]) assert parser(testdata) == "baz" - testdata = ( - "foo:\n" - " bar: baz\n" - ) + testdata = "foo:\n" " bar: baz\n" parser = Parser(["yaml", "dotted_dict"]) assert parser(testdata).foo.bar == "baz" @@ -34,5 +34,75 @@ def custom_extract(src): # custom function with params def extract_with_params(src, *args, **kwargs): return src[args[0]][kwargs["key"]] + parser = Parser({extract_with_params: {"args": ["foo"], "key": "bar"}}) - assert parser(testdata) == "baz" \ No newline at end of file + assert parser(testdata) == "baz" + + with pytest.raises(KeyError): + parser = Parser("non_existing_parser") + parser(testdata) + + def test_weird_cases(self): + testdata = '"1"' + parser = Parser(["json", "dotted_dict"]) + assert parser(testdata) == "1" + + testdata = "[1, 2, 3]" + assert parser(testdata) == [1, 2, 3] + testdata = '[1, {"foo": "bar"}]' + p = parser(testdata) + assert p[0] == 1 + assert p[1].foo == "bar" + + def test_parsers_register(self): + def custom_parser(src): + return src["foo"]["bar"] + + parsers.register("custom_parser", custom_parser) + parser = Parser(["json", "custom_parser"]) + testdata = '{"foo": {"bar": "baz"}}' + assert parser(testdata) == "baz" + + # Test for duplicate parser registration + with pytest.raises(KeyError): + parsers.register("custom_parser", custom_parser) + + def test_parsers_register_group(self): + def custom_parser(src): + return src["foo"]["bar"] + + parsers.register_group("custom_group", {"custom_parser": custom_parser}) + parser = Parser(["json", "custom_group.custom_parser"]) + testdata = '{"foo": {"bar": "baz"}}' + assert parser(testdata) == "baz" + + # Test for duplicate parser group registration + with pytest.raises(KeyError): + parsers.register_group("custom_group", {"custom_parser": custom_parser}) + + def test_parser_registration_errors(self): + def custom_parser(src): + return src["foo"]["bar"] + + with pytest.raises(KeyError): + parsers.register("custom.parser", custom_parser) + + with pytest.raises(KeyError): + parsers.register_group("custom.group", {"custom_parser": custom_parser}) + + with pytest.raises(KeyError): + parsers.register_group("custom_group", {"custom.parser": custom_parser}) + + def test_get_parser_errors(self): + with pytest.raises(KeyError): + parsers.get("non_existing_parser") + with pytest.raises(KeyError): + parsers.get("non_existing_group.non_existing_parser") + try: + parsers.register_group("custom_group", {"custom_parser": lambda x: x}) + except KeyError: + pass + with pytest.raises(KeyError): + parsers.get("custom_group.non_existing_parser") + with pytest.raises(KeyError): + parsers.get("too.many.dots.in.name") diff --git a/tests/test_serialization.py b/tests/test_serialization.py new file mode 100644 index 0000000..16a351a --- /dev/null +++ b/tests/test_serialization.py @@ -0,0 +1,34 @@ +from cli_wrapper import CLIWrapper +from cli_wrapper.cli_wrapper import Argument + + +class TestSerialization: + def test_wrapper_to_dict(self): + kubectl = CLIWrapper("kubectl", trusting=True) + kubectl._update_command( + "get", + default_flags={"output": "json"}, + parse="json", + args={ + "namespace": {"validator": ["is_alnum", "starts_alpha"]}, + }, + ) + + config = kubectl._to_dict() + + kubectl2 = CLIWrapper.from_dict(config) + + assert kubectl2._to_dict() == config + + def test_argument_to_dict(self): + arg = Argument( + literal_name="namespace", + default="default", + validator=["is_alnum", "starts_alpha"], + ) + + config = arg._to_dict() + + arg2 = Argument.from_dict(config) + + assert arg2._to_dict() == config diff --git a/tests/test_validators.py b/tests/test_validators.py new file mode 100644 index 0000000..82ffff5 --- /dev/null +++ b/tests/test_validators.py @@ -0,0 +1,30 @@ +from cli_wrapper.validators import Validator, validators + + +class TestValidators: + def test_validator(self): + v = Validator("is_alnum") + assert v("abc123") is True + assert v("abc123!") is False + + validators.register_group("test_validators_group") + + validators.register("is_pod1", lambda x: x == "pod1", group="test_validators_group") + v = Validator("test_validators_group.is_pod1") + assert v("pod1") is True + assert v("pod2") is False + + validators.register("is_equal", lambda x, y: x == y, group="test_validators_group") + v = Validator({"test_validators_group.is_equal": "pod1"}) + assert v("pod1") is True + assert v("pod2") is False + + def is_equal(x, y, case_sensitive=True): + return x == y if case_sensitive else x.lower() == y.lower() + + validators._all["test_validators_group"]["is_equal"] = is_equal + v = Validator({"test_validators_group.is_equal": {"args": "pod1", "case_sensitive": False}}) + assert v("pod1") is True + assert v("POD1") is True + + validators._all.pop("test_validators_group") From 2e064b6e26e0b4315b496db3d237a76a3f188c62 Mon Sep 17 00:00:00 2001 From: Reid Orsten Date: Fri, 18 Apr 2025 09:56:44 -0600 Subject: [PATCH 2/4] pr checks --- .github/workflows/checks.yaml | 29 ++++++++++++++++ pyproject.toml | 11 +++++- src/cli_wrapper/cli_wrapper.py | 37 ++++++++++---------- src/cli_wrapper/parsers.py | 9 +++-- src/cli_wrapper/util/callable_chain.py | 2 +- src/cli_wrapper/util/callable_registry.py | 12 +++---- src/cli_wrapper/validators.py | 8 ++--- tests/test_cli_wrapper.py | 6 ++-- tests/test_parsers.py | 2 +- tests/test_serialization.py | 8 ++--- tests/test_validators.py | 41 +++++++++++------------ 11 files changed, 103 insertions(+), 62 deletions(-) create mode 100644 .github/workflows/checks.yaml diff --git a/.github/workflows/checks.yaml b/.github/workflows/checks.yaml new file mode 100644 index 0000000..5695cbd --- /dev/null +++ b/.github/workflows/checks.yaml @@ -0,0 +1,29 @@ +--- +name: static-analysis +on: push +jobs: + static-analysis: + name: Static Analysis + runs-on: ubuntu-latest + container: python:3.12 + steps: + - uses: actions/checkout@v4 + - name: install prerequisites + run: | + pip install .[check] .[test] black pylint + curl -L https://codeclimate.com/downloads/test-reporter/test-reporter-latest-linux-amd64 --output cc-test-reporter + chmod +x cc-test-reporter + - name: run black + run: | + black --check --diff . + - name: run pylint + run: | + pylint --fail-under 10 src + pylint --fail-under 10 --disable=protected-access tests + - name: run tests + env: + CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }} + run: | + ./cc-test-reporter before-build + pytest . --cov-report lcov + ./cc-test-reporter after-build --exit-code $? diff --git a/pyproject.toml b/pyproject.toml index ecb4394..076fa0c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,4 +40,13 @@ log_cli = "true" testpaths = ["tests"] pythonpath = "src" addopts = "-n 4 -Walways --cov=cli_wrapper --cov-branch --cov-report html:coverage" -asyncio_default_fixture_loop_scope = "function" \ No newline at end of file +asyncio_default_fixture_loop_scope = "function" + +[tool.pylint."messages control"] +disable = [ + "missing-module-docstring", + "missing-class-docstring", + "missing-function-docstring", + "logging-fstring-interpolation", + "fixme" +] diff --git a/src/cli_wrapper/cli_wrapper.py b/src/cli_wrapper/cli_wrapper.py index 4f8f9a9..365846d 100644 --- a/src/cli_wrapper/cli_wrapper.py +++ b/src/cli_wrapper/cli_wrapper.py @@ -46,7 +46,7 @@ def validate_pod_name(name): to `command --arg=val`. If you want to use spaces instead, set this to ' ' """ -import asyncio +import asyncio.subprocess import logging import os import subprocess @@ -87,7 +87,7 @@ def from_dict(cls, arg_dict): transformer=arg_dict.get("transformer", None), ) - def _to_dict(self): + def to_dict(self): """ Convert the Argument to a dictionary :return: the dictionary representation of the Argument @@ -96,7 +96,7 @@ def _to_dict(self): return { "literal_name": self.literal_name, "default": self.default, - "validator": self.validator._to_dict() if self.validator is not None else None, + "validator": self.validator.to_dict() if self.validator is not None else None, } def is_valid(self, value): @@ -139,7 +139,7 @@ def arg_converter(value: dict): @define -class Command(object): +class Command: # pylint: disable=too-many-instance-attributes """ Command represents a command to be run with the cli_wrapper """ @@ -154,7 +154,7 @@ class Command(object): arg_separator: str = field(repr=False, default="=") @classmethod - def _from_dict(cls, command_dict, **kwargs): + def from_dict(cls, command_dict, **kwargs): """ Create a Command from a dictionary :param command_dict: the dictionary to be converted @@ -172,7 +172,7 @@ def _from_dict(cls, command_dict, **kwargs): **kwargs, ) - def _to_dict(self): + def to_dict(self): """ Convert the Command to a dictionary. Excludes prefixes/separators, because they are set in the CLIWrapper @@ -182,8 +182,8 @@ def _to_dict(self): return { "cli_command": self.cli_command, "default_flags": self.default_flags, - "args": {k: v._to_dict() for k, v in self.args.items()}, - "parse": self.parse._to_dict() if self.parse is not None else None, + "args": {k: v.to_dict() for k, v in self.args.items()}, + "parse": self.parse.to_dict() if self.parse is not None else None, } def validate_args(self, *args, **kwargs): @@ -231,12 +231,13 @@ def build_args(self, *args, **kwargs): @define -class CLIWrapper: +class CLIWrapper: # pylint: disable=too-many-instance-attributes path: str env: dict[str, str] = None commands: dict[str, Command] = {} trusting: bool = True + raise_exc: bool = False async_: bool = False default_transformer: str = "snake2kebab" short_prefix: str = "-" @@ -263,17 +264,19 @@ def _get_command(self, command: str): return c return self.commands[command] - def _update_command( + def _update_command( # pylint: disable=too-many-arguments self, command: str, - cli_command: str = None, + *, + cli_command: str | list[str] = None, args: dict[str | int, any] = None, default_flags: dict = None, parse=None, ): """ update the command to be run with the cli_wrapper - :param command: the subcommand for the cli tool + :param command: the command name for the wrapper + :param cli_command: the command to be run, if different from the command name :param default_flags: default flags to be used with the command :param parse: function to parse the output of the command :return: @@ -303,7 +306,7 @@ def _run(self, command: str, *args, **kwargs): env = os.environ.copy().update(self.env if self.env is not None else {}) logger.debug(f"Running command: {' '.join(command_args)}") # run the command - result = subprocess.run(command_args, capture_output=True, text=True, env=env) + result = subprocess.run(command_args, capture_output=True, text=True, env=env, check=self.raise_exc) if result.returncode != 0: raise RuntimeError(f"Command {command} failed with error: {result.stderr}") return command_obj.parse(result.stdout) @@ -314,7 +317,7 @@ async def _run_async(self, command: str, *args, **kwargs): command_args = [self.path] + list(command_obj.build_args(*args, **kwargs)) env = os.environ.copy().update(self.env if self.env is not None else {}) logger.error(f"Running command: {', '.join(command_args)}") - proc = await asyncio.subprocess.create_subprocess_exec( + proc = await asyncio.subprocess.create_subprocess_exec( # pylint: disable=no-member *command_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, @@ -351,14 +354,14 @@ def from_dict(cls, cliwrapper_dict): else: if "cli_command" not in config: config["cli_command"] = command - commands[command] = Command._from_dict(config) + commands[command] = Command.from_dict(config) return CLIWrapper( commands=commands, **cliwrapper_dict, ) - def _to_dict(self): + def to_dict(self): """ Convert the CLIWrapper to a dictionary :return: @@ -366,7 +369,7 @@ def _to_dict(self): return { "path": self.path, "env": self.env, - "commands": {k: v._to_dict() for k, v in self.commands.items()}, + "commands": {k: v.to_dict() for k, v in self.commands.items()}, "trusting": self.trusting, "async_": self.async_, } diff --git a/src/cli_wrapper/parsers.py b/src/cli_wrapper/parsers.py index 8117344..90c55cc 100644 --- a/src/cli_wrapper/parsers.py +++ b/src/cli_wrapper/parsers.py @@ -1,6 +1,6 @@ import logging -from .util.callable_chain import params_from_kwargs, CallableChain +from .util.callable_chain import CallableChain from .util.callable_registry import CallableRegistry logger = logging.getLogger(__name__) @@ -71,13 +71,16 @@ def dotted_dictify(src, *args, **kwargs): class Parser(CallableChain): + """ + Parser class that allows for the chaining of multiple parsers. + """ def __init__(self, config): super().__init__(config, parsers) - def __call__(self, input): + def __call__(self, src): # For now, parser expects to be called with one input. - result = input + result = src for parser in self.chain: logger.debug(result) result = parser(result) diff --git a/src/cli_wrapper/util/callable_chain.py b/src/cli_wrapper/util/callable_chain.py index b099967..57df3a2 100644 --- a/src/cli_wrapper/util/callable_chain.py +++ b/src/cli_wrapper/util/callable_chain.py @@ -24,7 +24,7 @@ def __init__(self, config, source): name, args, kwargs = params_from_kwargs(config) self.chain = [source.get(name, args, kwargs)] - def _to_dict(self): + def to_dict(self): return self.config @abstractmethod diff --git a/src/cli_wrapper/util/callable_registry.py b/src/cli_wrapper/util/callable_registry.py index fe84691..96612ac 100644 --- a/src/cli_wrapper/util/callable_registry.py +++ b/src/cli_wrapper/util/callable_registry.py @@ -40,22 +40,22 @@ def get(self, name: str | Callable, args=None, kwargs=None) -> Callable: raise KeyError(f"{self.callable_name} '{name}' not found.") return lambda *fargs: callable_(*fargs, *args, **kwargs) - def register(self, name: str, callable: callable, group="core"): + def register(self, name: str, callable_: callable, group="core"): """ Registers a new parser function with the specified name. :param name: The name to associate with the parser. - :param callable: The callable function to register. + :param callable_: The callable function to register. """ ngroup, name = self._parse_name(name) if ngroup is not None: if group != "core": # approximately, raise an exception if a group is specified in the name and the group arg - raise KeyError(f"'{callable}' already specifies a group.") + raise KeyError(f"'{callable_}' already specifies a group.") group = ngroup if name in self._all[group]: raise KeyError(f"{self.callable_name} '{name}' already registered.") - self._all[group][name] = callable + self._all[group][name] = callable_ def register_group(self, name: str, parsers: dict = None): """ @@ -87,6 +87,6 @@ def _parse_name(self, name: str) -> tuple[str, str]: return None, name try: group, name = name.split(".") - except ValueError: - raise KeyError(f"{self.callable_name} name '{name}' is not valid.") + except ValueError as err: + raise KeyError(f"{self.callable_name} name '{name}' is not valid.") from err return group, name diff --git a/src/cli_wrapper/validators.py b/src/cli_wrapper/validators.py index 2e31836..79a7de1 100644 --- a/src/cli_wrapper/validators.py +++ b/src/cli_wrapper/validators.py @@ -31,9 +31,9 @@ class Validator(CallableChain): def __init__(self, config): if callable(config): - id = str(uuid4()) - validators.register(id, config) - config = id + id_ = str(uuid4()) + validators.register(id_, config) + config = id_ self.config = config super().__init__(config, validators) @@ -46,7 +46,7 @@ def __call__(self, value): result = result and validator_result return result - def _to_dict(self): + def to_dict(self): """ Converts the validator configuration to a dictionary. """ diff --git a/tests/test_cli_wrapper.py b/tests/test_cli_wrapper.py index 9f49f17..c132494 100644 --- a/tests/test_cli_wrapper.py +++ b/tests/test_cli_wrapper.py @@ -1,11 +1,9 @@ -import json import logging from json import loads import pytest from cli_wrapper.cli_wrapper import CLIWrapper, Argument, Command -from cli_wrapper.transformers import transformers from cli_wrapper.validators import validators logger = logging.getLogger(__name__) @@ -98,7 +96,7 @@ def validator(name): validators._all.pop("test_command_group") def test_command_from_dict(self): - command = Command._from_dict( + command = Command.from_dict( { "cli_command": "get", "default_flags": {"namespace": "default"}, @@ -120,7 +118,7 @@ def test_command_from_dict(self): "--namespace=kube-system", ] - command = Command._from_dict({"cli_command": "get", "args": {}}) + command = Command.from_dict({"cli_command": "get", "args": {}}) class TestCLIWrapper: diff --git a/tests/test_parsers.py b/tests/test_parsers.py index 3793865..e9b7f3e 100644 --- a/tests/test_parsers.py +++ b/tests/test_parsers.py @@ -22,7 +22,7 @@ def custom_extract(src): parser = Parser(["json", custom_extract]) assert parser(testdata) == "baz" - testdata = "foo:\n" " bar: baz\n" + testdata = "foo:\n bar: baz\n" parser = Parser(["yaml", "dotted_dict"]) assert parser(testdata).foo.bar == "baz" diff --git a/tests/test_serialization.py b/tests/test_serialization.py index 16a351a..7956216 100644 --- a/tests/test_serialization.py +++ b/tests/test_serialization.py @@ -14,11 +14,11 @@ def test_wrapper_to_dict(self): }, ) - config = kubectl._to_dict() + config = kubectl.to_dict() kubectl2 = CLIWrapper.from_dict(config) - assert kubectl2._to_dict() == config + assert kubectl2.to_dict() == config def test_argument_to_dict(self): arg = Argument( @@ -27,8 +27,8 @@ def test_argument_to_dict(self): validator=["is_alnum", "starts_alpha"], ) - config = arg._to_dict() + config = arg.to_dict() arg2 = Argument.from_dict(config) - assert arg2._to_dict() == config + assert arg2.to_dict() == config diff --git a/tests/test_validators.py b/tests/test_validators.py index 82ffff5..04be823 100644 --- a/tests/test_validators.py +++ b/tests/test_validators.py @@ -1,30 +1,29 @@ from cli_wrapper.validators import Validator, validators -class TestValidators: - def test_validator(self): - v = Validator("is_alnum") - assert v("abc123") is True - assert v("abc123!") is False +def test_validator(): + v = Validator("is_alnum") + assert v("abc123") is True + assert v("abc123!") is False - validators.register_group("test_validators_group") + validators.register_group("test_validators_group") - validators.register("is_pod1", lambda x: x == "pod1", group="test_validators_group") - v = Validator("test_validators_group.is_pod1") - assert v("pod1") is True - assert v("pod2") is False + validators.register("is_pod1", lambda x: x == "pod1", group="test_validators_group") + v = Validator("test_validators_group.is_pod1") + assert v("pod1") is True + assert v("pod2") is False - validators.register("is_equal", lambda x, y: x == y, group="test_validators_group") - v = Validator({"test_validators_group.is_equal": "pod1"}) - assert v("pod1") is True - assert v("pod2") is False + validators.register("is_equal", lambda x, y: x == y, group="test_validators_group") + v = Validator({"test_validators_group.is_equal": "pod1"}) + assert v("pod1") is True + assert v("pod2") is False - def is_equal(x, y, case_sensitive=True): - return x == y if case_sensitive else x.lower() == y.lower() + def is_equal(x, y, case_sensitive=True): + return x == y if case_sensitive else x.lower() == y.lower() - validators._all["test_validators_group"]["is_equal"] = is_equal - v = Validator({"test_validators_group.is_equal": {"args": "pod1", "case_sensitive": False}}) - assert v("pod1") is True - assert v("POD1") is True + validators._all["test_validators_group"]["is_equal"] = is_equal + v = Validator({"test_validators_group.is_equal": {"args": "pod1", "case_sensitive": False}}) + assert v("pod1") is True + assert v("POD1") is True - validators._all.pop("test_validators_group") + validators._all.pop("test_validators_group") From c0b9b955f2122bb4cc3b88b35d2a4904fe80d493 Mon Sep 17 00:00:00 2001 From: Reid Orsten Date: Fri, 18 Apr 2025 10:26:24 -0600 Subject: [PATCH 3/4] fake kubectl for tests --- tests/data/fake_kubectl | 53 +++++++++++++++++++++++++++++++++++++++ tests/test_cli_wrapper.py | 9 ++++--- 2 files changed, 59 insertions(+), 3 deletions(-) create mode 100755 tests/data/fake_kubectl diff --git a/tests/data/fake_kubectl b/tests/data/fake_kubectl new file mode 100755 index 0000000..22b526a --- /dev/null +++ b/tests/data/fake_kubectl @@ -0,0 +1,53 @@ +#!/bin/bash + +if [ "$1" == "get" ]; then + if [ "$2" == "pods" ]; then + echo " + { + \"apiVersion\": \"v1\", + \"kind\": \"List\", + \"items\": [ + { + \"metadata\": { + \"name\": \"example-pod\", + \"namespace\": \"default\" + }, + \"spec\": { + \"containers\": [ + { + \"name\": \"example-container\", + \"image\": \"example-image:latest\" + } + ] + } + } + ] + }" + fi + if [ "$2" == "pod" ]; then + echo " + { + \"apiVersion\": \"v1\", + \"kind\": \"Pod\", + \"metadata\": { + \"name\": \"$3\", + \"namespace\": \"default\" + }, + \"spec\": { + \"containers\": [ + { + \"name\": \"example-container\", + \"image\": \"example-image:latest\" + } + ] + } + }" + fi +fi +if [ $1 == "describe" ]; then + echo "some output or other" +fi +if [ $1 == "fake" ]; then + echo "no! this is wrong!" + exit 1 +fi diff --git a/tests/test_cli_wrapper.py b/tests/test_cli_wrapper.py index c132494..b59ebc6 100644 --- a/tests/test_cli_wrapper.py +++ b/tests/test_cli_wrapper.py @@ -1,5 +1,6 @@ import logging from json import loads +from pathlib import Path import pytest @@ -123,8 +124,9 @@ def test_command_from_dict(self): class TestCLIWrapper: def test_cliwrapper(self): - logger.info("Testing CLIWrapper, trusting with get json/loads") - kubectl = CLIWrapper("kubectl", trusting=False) + # fake kubectl script for github actions + fake_kubectl = Path(__file__).parent / "data/fake_kubectl" + kubectl = CLIWrapper(fake_kubectl.as_posix(), trusting=False) with pytest.raises(ValueError): kubectl.get("pods", namespace="kube-system") @@ -161,7 +163,8 @@ def test_cliwrapper(self): @pytest.mark.asyncio async def test_subprocessor_async(self): - kubectl = CLIWrapper("kubectl", trusting=True, async_=True) + fake_kubectl = Path(__file__).parent / "data/fake_kubectl" + kubectl = CLIWrapper(fake_kubectl.as_posix(), trusting=True, async_=True) kubectl._update_command("get", default_flags={"output": "json"}, parse=loads) r = await kubectl.get("pods", namespace="kube-system") assert r["kind"] == "List" From ce215dd97de75548fd3fb3ea3e8212f8f7ac4602 Mon Sep 17 00:00:00 2001 From: Reid Orsten Date: Fri, 18 Apr 2025 10:42:33 -0600 Subject: [PATCH 4/4] use qlty I guess --- .github/workflows/checks.yaml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/checks.yaml b/.github/workflows/checks.yaml index 5695cbd..7b8601a 100644 --- a/.github/workflows/checks.yaml +++ b/.github/workflows/checks.yaml @@ -8,11 +8,11 @@ jobs: container: python:3.12 steps: - uses: actions/checkout@v4 + with: + fetch-depth: 2 - name: install prerequisites run: | pip install .[check] .[test] black pylint - curl -L https://codeclimate.com/downloads/test-reporter/test-reporter-latest-linux-amd64 --output cc-test-reporter - chmod +x cc-test-reporter - name: run black run: | black --check --diff . @@ -21,9 +21,9 @@ jobs: pylint --fail-under 10 src pylint --fail-under 10 --disable=protected-access tests - name: run tests - env: - CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }} run: | - ./cc-test-reporter before-build pytest . --cov-report lcov - ./cc-test-reporter after-build --exit-code $? + - uses: qltysh/qlty-action/coverage@main + with: + token: ${{secrets.QLTY_COVERAGE_TOKEN}} + files: coverage.lcov