diff --git a/.gitignore b/.gitignore index a3e86f32..db03d7e3 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ __pycache__/ *.py[cod] *$py.class +.idea/ # C extensions *.so diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..238042c1 --- /dev/null +++ b/Makefile @@ -0,0 +1,3 @@ +l lint: + @echo "Executing lint in backend code (pre-commit)" + pre-commit run --show-diff-on-failure --color=always --all-files diff --git a/openhexa/sdk/pipelines/parameter/__init__.py b/openhexa/sdk/pipelines/parameter/__init__.py new file mode 100644 index 00000000..774fd1ec --- /dev/null +++ b/openhexa/sdk/pipelines/parameter/__init__.py @@ -0,0 +1,68 @@ +"""Pipeline parameters classes and functions. + +See https://github.com/BLSQ/openhexa/wiki/Writing-OpenHEXA-pipelines#pipeline-parameters for more information. +""" + +from openhexa.sdk.pipelines.exceptions import InvalidParameterError, ParameterValueError + +from .choices import ChoicesFromFile +from .decorator import FunctionWithParameter, Parameter, parameter, validate_parameters +from .types import ( + TYPES_BY_PYTHON_TYPE, + Boolean, + ConnectionParameterType, + CustomConnectionType, + DatasetType, + DHIS2ConnectionType, + FileType, + Float, + GCSConnectionType, + IASOConnectionType, + Integer, + ParameterType, + PostgreSQLConnectionType, + S3ConnectionType, + Secret, + SecretType, + StringType, +) +from .widgets import DHIS2Widget, IASOWidget + +__all__ = [ + # Decorator and core classes + "parameter", + "Parameter", + "FunctionWithParameter", + "validate_parameters", + # Type base classes + "ParameterType", + "ConnectionParameterType", + # Primitive types + "StringType", + "Boolean", + "Integer", + "Float", + # Connection types + "PostgreSQLConnectionType", + "S3ConnectionType", + "GCSConnectionType", + "DHIS2ConnectionType", + "IASOConnectionType", + "CustomConnectionType", + # Resource types + "DatasetType", + "FileType", + # Secret + "Secret", + "SecretType", + # Registry + "TYPES_BY_PYTHON_TYPE", + # Dynamic choices + "ChoicesFromFile", + # Widgets + "DHIS2Widget", + "IASOWidget", + # Exceptions (re-exported for backward compat) + "InvalidParameterError", + "ParameterValueError", +] diff --git a/openhexa/sdk/pipelines/parameter/choices.py b/openhexa/sdk/pipelines/parameter/choices.py new file mode 100644 index 00000000..4f5e2ef6 --- /dev/null +++ b/openhexa/sdk/pipelines/parameter/choices.py @@ -0,0 +1,55 @@ +"""Dynamic choices classes for pipeline parameters.""" + +from openhexa.sdk.pipelines.exceptions import InvalidParameterError + +_SUPPORTED_FORMATS = {"csv", "json", "yaml", "yml"} + + +class ChoicesFromFile: + """Descriptor for choices loaded dynamically from a file in the workspace file system. + + The file format is inferred from the path extension (.csv, .json, .yaml, .yml). + For CSV files with a single column, that column is used automatically. + For CSV/JSON/YAML files with multiple columns/keys, `column` must be specified. + + Parameters + ---------- + path : str + Path to the file in the workspace file system (e.g. "data/districts.csv"). + column : str, optional + Column name (CSV) or key (JSON/YAML) to use as choice values. + Required when the file has more than one column/key. + """ + + def __init__(self, path: str, column: str | None = None): + self.path = path + self.column = column + self.format = self._detect_format(path) + self.validate_spec() + + @staticmethod + def _detect_format(path: str) -> str: + if not path or not isinstance(path, str): + raise InvalidParameterError("ChoicesFromFile path must be a non-empty string.") + ext = path.rsplit(".", 1)[-1].lower() if "." in path else "" + if ext not in _SUPPORTED_FORMATS: + raise InvalidParameterError( + f"Cannot determine file format from path '{path}'. " + f"Supported extensions: {', '.join(sorted(_SUPPORTED_FORMATS))}." + ) + return "yaml" if ext == "yml" else ext + + def validate_spec(self): + """Validate the path and column specification.""" + if not self.path or not isinstance(self.path, str): + raise InvalidParameterError("ChoicesFromFile path must be a non-empty string.") + if self.column is not None and not isinstance(self.column, str): + raise InvalidParameterError("ChoicesFromFile column must be a string.") + + def to_dict(self) -> dict: + """Return a dictionary representation of the choices spec.""" + return { + "format": self.format, + "path": self.path, + "column": self.column, + } diff --git a/openhexa/sdk/pipelines/parameter/decorator.py b/openhexa/sdk/pipelines/parameter/decorator.py new file mode 100644 index 00000000..f5a77b83 --- /dev/null +++ b/openhexa/sdk/pipelines/parameter/decorator.py @@ -0,0 +1,331 @@ +"""Parameter class, decorator, and validation logic for pipeline parameters.""" + +import typing + +from openhexa.sdk.datasets import Dataset +from openhexa.sdk.files import File +from openhexa.sdk.pipelines.exceptions import InvalidParameterError, ParameterValueError +from openhexa.sdk.pipelines.utils import validate_pipeline_parameter_code +from openhexa.sdk.workspaces.connection import ( + CustomConnection, + DHIS2Connection, + GCSConnection, + IASOConnection, + PostgreSQLConnection, + S3Connection, +) + +from .choices import ChoicesFromFile +from .types import TYPES_BY_PYTHON_TYPE, Boolean, DHIS2ConnectionType, IASOConnectionType, Secret +from .widgets import DHIS2Widget, IASOWidget + + +class Parameter: + """Pipeline parameter class. Contains validation logic specs generation logic.""" + + def __init__( + self, + code: str, + *, + type: type[ + str + | int + | bool + | float + | Secret + | DHIS2Connection + | IASOConnection + | PostgreSQLConnection + | GCSConnection + | S3Connection + | CustomConnection + | Dataset + | File + ], + name: str | None = None, + choices: typing.Sequence | ChoicesFromFile | None = None, + help: str | None = None, + default: typing.Any | None = None, + widget: DHIS2Widget | IASOWidget | None = None, + connection: str | None = None, + required: bool = True, + multiple: bool = False, + directory: str | None = None, + ): + validate_pipeline_parameter_code(code) + self.code = code + + try: + self.type = TYPES_BY_PYTHON_TYPE[type.__name__]() + except (KeyError, AttributeError): + valid_parameter_types = [k for k in TYPES_BY_PYTHON_TYPE.keys()] + raise InvalidParameterError( + f"Invalid parameter type provided ({type}). " + f"Valid parameter types are {', '.join(valid_parameter_types)}" + ) + + if choices is not None: + if not self.type.accepts_choices: + raise InvalidParameterError(f"Parameters of type {self.type} don't accept choices.") + if isinstance(choices, ChoicesFromFile): + # validate_spec() already ran in ChoicesFromFile.__init__; nothing more to check here + pass + else: + if len(choices) == 0: + raise InvalidParameterError("Choices, if provided, cannot be empty.") + try: + for choice in choices: + self.type.validate(choice) + except ParameterValueError: + raise InvalidParameterError( + f"The provided choices are not valid for the {self.type} parameter type." + ) + self.choices = choices + + self.name = name + self.help = help + self.required = required + + if multiple is True and not self.type.accepts_multiple: + raise InvalidParameterError(f"Parameters of type {self.type} can't have multiple values.") + self.multiple = multiple + + self.widget = widget + self.connection = connection + self.directory = directory + + self._validate_default(default, multiple) + self.default = default + + def validate(self, value: typing.Any) -> typing.Any: + """Validate the provided value against the parameter, taking required / default options into account.""" + if self.multiple: + return self._validate_multiple(value) + else: + return self._validate_single(value) + + def to_dict(self) -> dict[str, typing.Any]: + """Return a dictionary representation of the Parameter instance.""" + d = { + "code": self.code, + "type": self.type.spec_type, + "name": self.name, + "choices": None if isinstance(self.choices, ChoicesFromFile) else self.choices, + "help": self.help, + "default": self.default, + "widget": self.widget.value if self.widget else None, + "connection": self.connection, + "required": self.required, + "multiple": self.multiple, + "directory": self.directory, + } + if isinstance(self.choices, ChoicesFromFile): + d["choices_from_file"] = self.choices.to_dict() + return d + + def _validate_single(self, value: typing.Any): + # Normalize empty values to None and handles default + normalized_value = self.type.normalize(value) + if normalized_value is None and self.default is not None: + normalized_value = self.default + + if normalized_value is None: + if isinstance(self.type, Boolean): + normalized_value = False + elif self.required: + raise ParameterValueError(f"{self.code} is required") + else: + return None + + pre_validated = self.type.validate(normalized_value) + if ( + self.choices is not None + and not isinstance(self.choices, ChoicesFromFile) + and pre_validated not in self.choices + ): + raise ParameterValueError(f"The provided value for {self.code} is not included in the provided choices.") + + return pre_validated + + def _validate_multiple(self, value: typing.Any): + # Reject values that are not lists + if value is not None and not isinstance(value, list): + raise InvalidParameterError("If provided, value should be a list when parameter is multiple.") + + # Normalize empty values to an empty list + if value is None: + normalized_value = [] + else: + normalized_value = [self.type.normalize(v) for v in value] + normalized_value = list(filter(lambda v: v is not None, normalized_value)) + if len(normalized_value) == 0 and self.default is not None: + normalized_value = self.default + + if len(normalized_value) == 0 and self.required: + raise ParameterValueError(f"{self.code} is required") + + pre_validated = [self.type.validate(single_value) for single_value in normalized_value] + if ( + self.choices is not None + and not isinstance(self.choices, ChoicesFromFile) + and any(v not in self.choices for v in pre_validated) + ): + raise ParameterValueError( + f"One of the provided values for {self.code} is not included in the provided choices." + ) + + return pre_validated + + def _validate_default(self, default: typing.Any, multiple: bool): + if default is None: + return + + try: + if multiple: + if not isinstance(default, list): + raise InvalidParameterError("Default values should be lists when using multiple=True") + for default_value in default: + self.type.validate_default(default_value) + else: + self.type.validate_default(default) + except ParameterValueError: + raise InvalidParameterError(f"The default value for {self.code} is not valid.") + + if self.choices is not None and not isinstance(self.choices, ChoicesFromFile): + if isinstance(default, list): + if not all(d in self.choices for d in default): + raise InvalidParameterError( + f"The default list of values for {self.code} is not included in the provided choices." + ) + elif default not in self.choices: + raise InvalidParameterError( + f"The default value for {self.code} is not included in the provided choices." + ) + + +def validate_parameters(parameters: list[Parameter]): + """Validate the provided connection parameters if they relate to existing connection parameter.""" + supported_connection_types = {DHIS2ConnectionType, IASOConnectionType} + connection_parameters = {p.code for p in parameters if type(p.type) in supported_connection_types} + + for parameter in parameters: + if parameter.connection and parameter.connection not in connection_parameters: + raise InvalidParameterError( + f"Connection field '{parameter.code}' references a non-existing connection parameter '{parameter.connection}'" + ) + if ( + parameter.widget + and (parameter.widget in DHIS2Widget or parameter.widget in IASOWidget) + and not parameter.connection + ): + raise InvalidParameterError( + f"Widgets require a connection parameter. Please provide a connection parameter for {parameter.code}. " + f"Example: @parameter('my_connection', ...)" + f"Example: @parameter('{parameter.code}', widget = ..., connection='my_connection')" + ) + + +def parameter( + code: str, + *, + type: type[ + str + | int + | bool + | float + | Secret + | DHIS2Connection + | IASOConnection + | PostgreSQLConnection + | GCSConnection + | S3Connection + | CustomConnection + | Dataset + | File + ], + name: str | None = None, + choices: typing.Sequence | ChoicesFromFile | None = None, + help: str | None = None, + widget: DHIS2Widget | IASOWidget | None = None, + connection: str | None = None, + default: typing.Any | None = None, + required: bool = True, + multiple: bool = False, + directory: str | None = None, +): + """Decorate a pipeline function by attaching a parameter to it.. + + This decorator must be used on a function decorated by the @pipeline decorator. + + Parameters + ---------- + code : str + The parameter identifier (must be unique for a given pipeline) + type : {str, int, bool, float, DHIS2Connection, IASOConnection, PostgreSQLConnection, GCSConnection, S3Connection, CustomConnection, Dataset, File} + The parameter Python type + name : str, optional + A name for the parameter (will be used instead of the code in the web interface) + choices : list, optional + An optional list or tuple of choices for the parameter (will be used to build a choice widget in the web + interface) + help : str, optional + An optional help text to be displayed in the web interface + widget : DHIS2Widget|IASOWidget, optional + An optional widget type for the parameter (only used if the parameter type is DHIS2Connection, IASOConnection) + connection : str, optional + An optional connection parameter that will be used to link widget to the connection. + default : any, optional + An optional default value for the parameter (should be of the type defined by the type parameter) + required : bool, default=True + Whether the parameter is mandatory + multiple : bool, default=True + Whether this parameter should be provided multiple values (if True, the value must be provided as a list of + values of the chosen type) + directory : str, optional + An optional parameter to force file selection to specific directory (only used for parameter type File). If the directory does not exist, it will be ignored. + + Returns + ------- + typing.Callable + A decorator that returns the Pipeline with the parameter attached + + """ + + def decorator(fun): + return FunctionWithParameter( + fun, + Parameter( + code, + type=type, + name=name, + choices=choices, + help=help, + default=default, + required=required, + widget=widget, + connection=connection, + multiple=multiple, + directory=directory, + ), + ) + + return decorator + + +class FunctionWithParameter: + """Wrapper class for pipeline functions decorated with the @parameter decorator.""" + + def __init__(self, function, added_parameter: Parameter): + self.function = function + self.parameter = added_parameter + + def get_all_parameters(self) -> list[Parameter]: + """Go through the decorators chain to find all pipeline parameters.""" + if isinstance(self.function, FunctionWithParameter): + return [self.parameter, *self.function.get_all_parameters()] + + return [self.parameter] + + def __call__(self, *args, **kwargs): + """Call the decorated pipeline function.""" + return self.function(*args, **kwargs) diff --git a/openhexa/sdk/pipelines/parameter.py b/openhexa/sdk/pipelines/parameter/types.py similarity index 55% rename from openhexa/sdk/pipelines/parameter.py rename to openhexa/sdk/pipelines/parameter/types.py index 4e6938ed..ff06e39c 100644 --- a/openhexa/sdk/pipelines/parameter.py +++ b/openhexa/sdk/pipelines/parameter/types.py @@ -1,15 +1,10 @@ -"""Pipeline parameters classes and functions. - -See https://github.com/BLSQ/openhexa/wiki/Writing-OpenHEXA-pipelines#pipeline-parameters for more information. -""" +"""Parameter type classes for pipeline parameters.""" import typing -from enum import StrEnum from openhexa.sdk.datasets import Dataset from openhexa.sdk.files import File from openhexa.sdk.pipelines.exceptions import InvalidParameterError, ParameterValueError -from openhexa.sdk.pipelines.utils import validate_pipeline_parameter_code from openhexa.sdk.workspaces import workspace from openhexa.sdk.workspaces.connection import ( Connection, @@ -469,319 +464,3 @@ def validate_default(self, value: typing.Any | None): "Dataset": DatasetType, "File": FileType, } - - -class IASOWidget(StrEnum): - """Enum for IASO widgets.""" - - IASO_FORMS = "IASO_FORMS" - IASO_ORG_UNITS = "IASO_ORG_UNITS" - IASO_PROJECTS = "IASO_PROJECTS" - - -class DHIS2Widget(StrEnum): - """Enum for DHIS2 widgets.""" - - ORG_UNITS = "DHIS2_ORG_UNITS" - ORG_UNIT_GROUPS = "DHIS2_ORG_UNIT_GROUPS" - ORG_UNIT_LEVELS = "DHIS2_ORG_UNIT_LEVELS" - DATASETS = "DHIS2_DATASETS" - DATA_ELEMENTS = "DHIS2_DATA_ELEMENTS" - DATA_ELEMENT_GROUPS = "DHIS2_DATA_ELEMENT_GROUPS" - INDICATORS = "DHIS2_INDICATORS" - INDICATOR_GROUPS = "DHIS2_INDICATOR_GROUPS" - - -class Parameter: - """Pipeline parameter class. Contains validation logic specs generation logic.""" - - def __init__( - self, - code: str, - *, - type: type[ - str - | int - | bool - | float - | Secret - | DHIS2Connection - | IASOConnection - | PostgreSQLConnection - | GCSConnection - | S3Connection - | CustomConnection - | Dataset - | File - ], - name: str | None = None, - choices: typing.Sequence | None = None, - help: str | None = None, - default: typing.Any | None = None, - widget: DHIS2Widget | IASOWidget | None = None, - connection: str | None = None, - required: bool = True, - multiple: bool = False, - directory: str | None = None, - ): - validate_pipeline_parameter_code(code) - self.code = code - - try: - self.type = TYPES_BY_PYTHON_TYPE[type.__name__]() - except (KeyError, AttributeError): - valid_parameter_types = [k for k in TYPES_BY_PYTHON_TYPE.keys()] - raise InvalidParameterError( - f"Invalid parameter type provided ({type}). " - f"Valid parameter types are {', '.join(valid_parameter_types)}" - ) - - if choices is not None: - if not self.type.accepts_choices: - raise InvalidParameterError(f"Parameters of type {self.type} don't accept choices.") - elif len(choices) == 0: - raise InvalidParameterError("Choices, if provided, cannot be empty.") - - try: - for choice in choices: - self.type.validate(choice) - except ParameterValueError: - raise InvalidParameterError(f"The provided choices are not valid for the {self.type} parameter type.") - self.choices = choices - - self.name = name - self.help = help - self.required = required - - if multiple is True and not self.type.accepts_multiple: - raise InvalidParameterError(f"Parameters of type {self.type} can't have multiple values.") - self.multiple = multiple - - self.widget = widget - self.connection = connection - self.directory = directory - - self._validate_default(default, multiple) - self.default = default - - def validate(self, value: typing.Any) -> typing.Any: - """Validate the provided value against the parameter, taking required / default options into account.""" - if self.multiple: - return self._validate_multiple(value) - else: - return self._validate_single(value) - - def to_dict(self) -> dict[str, typing.Any]: - """Return a dictionary representation of the Parameter instance.""" - return { - "code": self.code, - "type": self.type.spec_type, - "name": self.name, - "choices": self.choices, - "help": self.help, - "default": self.default, - "widget": self.widget.value if self.widget else None, - "connection": self.connection, - "required": self.required, - "multiple": self.multiple, - "directory": self.directory, - } - - def _validate_single(self, value: typing.Any): - # Normalize empty values to None and handles default - normalized_value = self.type.normalize(value) - if normalized_value is None and self.default is not None: - normalized_value = self.default - - if normalized_value is None: - if isinstance(self.type, Boolean): - normalized_value = False - elif self.required: - raise ParameterValueError(f"{self.code} is required") - else: - return None - - pre_validated = self.type.validate(normalized_value) - if self.choices is not None and pre_validated not in self.choices: - raise ParameterValueError(f"The provided value for {self.code} is not included in the provided choices.") - - return pre_validated - - def _validate_multiple(self, value: typing.Any): - # Reject values that are not lists - if value is not None and not isinstance(value, list): - raise InvalidParameterError("If provided, value should be a list when parameter is multiple.") - - # Normalize empty values to an empty list - if value is None: - normalized_value = [] - else: - normalized_value = [self.type.normalize(v) for v in value] - normalized_value = list(filter(lambda v: v is not None, normalized_value)) - if len(normalized_value) == 0 and self.default is not None: - normalized_value = self.default - - if len(normalized_value) == 0 and self.required: - raise ParameterValueError(f"{self.code} is required") - - pre_validated = [self.type.validate(single_value) for single_value in normalized_value] - if self.choices is not None and any(v not in self.choices for v in pre_validated): - raise ParameterValueError( - f"One of the provided values for {self.code} is not included in the provided choices." - ) - - return pre_validated - - def _validate_default(self, default: typing.Any, multiple: bool): - if default is None: - return - - try: - if multiple: - if not isinstance(default, list): - raise InvalidParameterError("Default values should be lists when using multiple=True") - for default_value in default: - self.type.validate_default(default_value) - else: - self.type.validate_default(default) - except ParameterValueError: - raise InvalidParameterError(f"The default value for {self.code} is not valid.") - - if self.choices is not None: - if isinstance(default, list): - if not all(d in self.choices for d in default): - raise InvalidParameterError( - f"The default list of values for {self.code} is not included in the provided choices." - ) - elif default not in self.choices: - raise InvalidParameterError( - f"The default value for {self.code} is not included in the provided choices." - ) - - -def validate_parameters(parameters: list[Parameter]): - """Validate the provided connection parameters if they relate to existing connection parameter.""" - supported_connection_types = {DHIS2ConnectionType, IASOConnectionType} - connection_parameters = {p.code for p in parameters if type(p.type) in supported_connection_types} - - for parameter in parameters: - if parameter.connection and parameter.connection not in connection_parameters: - raise InvalidParameterError( - f"Connection field '{parameter.code}' references a non-existing connection parameter '{parameter.connection}'" - ) - if ( - parameter.widget - and (parameter.widget in DHIS2Widget or parameter.widget in IASOWidget) - and not parameter.connection - ): - raise InvalidParameterError( - f"Widgets require a connection parameter. Please provide a connection parameter for {parameter.code}. " - f"Example: @parameter('my_connection', ...)" - f"Example: @parameter('{parameter.code}', widget = ..., connection='my_connection')" - ) - - -def parameter( - code: str, - *, - type: type[ - str - | int - | bool - | float - | Secret - | DHIS2Connection - | IASOConnection - | PostgreSQLConnection - | GCSConnection - | S3Connection - | CustomConnection - | Dataset - | File - ], - name: str | None = None, - choices: typing.Sequence | None = None, - help: str | None = None, - widget: DHIS2Widget | IASOWidget | None = None, - connection: str | None = None, - default: typing.Any | None = None, - required: bool = True, - multiple: bool = False, - directory: str | None = None, -): - """Decorate a pipeline function by attaching a parameter to it.. - - This decorator must be used on a function decorated by the @pipeline decorator. - - Parameters - ---------- - code : str - The parameter identifier (must be unique for a given pipeline) - type : {str, int, bool, float, DHIS2Connection, IASOConnection, PostgreSQLConnection, GCSConnection, S3Connection, CustomConnection, Dataset, File} - The parameter Python type - name : str, optional - A name for the parameter (will be used instead of the code in the web interface) - choices : list, optional - An optional list or tuple of choices for the parameter (will be used to build a choice widget in the web - interface) - help : str, optional - An optional help text to be displayed in the web interface - widget : DHIS2Widget|IASOWidget, optional - An optional widget type for the parameter (only used if the parameter type is DHIS2Connection, IASOConnection) - connection : str, optional - An optional connection parameter that will be used to link widget to the connection. - default : any, optional - An optional default value for the parameter (should be of the type defined by the type parameter) - required : bool, default=True - Whether the parameter is mandatory - multiple : bool, default=True - Whether this parameter should be provided multiple values (if True, the value must be provided as a list of - values of the chosen type) - directory : str, optional - An optional parameter to force file selection to specific directory (only used for parameter type File). If the directory does not exist, it will be ignored. - - Returns - ------- - typing.Callable - A decorator that returns the Pipeline with the parameter attached - - """ - - def decorator(fun): - return FunctionWithParameter( - fun, - Parameter( - code, - type=type, - name=name, - choices=choices, - help=help, - default=default, - required=required, - widget=widget, - connection=connection, - multiple=multiple, - directory=directory, - ), - ) - - return decorator - - -class FunctionWithParameter: - """Wrapper class for pipeline functions decorated with the @parameter decorator.""" - - def __init__(self, function, added_parameter: Parameter): - self.function = function - self.parameter = added_parameter - - def get_all_parameters(self) -> list[Parameter]: - """Go through the decorators chain to find all pipeline parameters.""" - if isinstance(self.function, FunctionWithParameter): - return [self.parameter, *self.function.get_all_parameters()] - - return [self.parameter] - - def __call__(self, *args, **kwargs): - """Call the decorated pipeline function.""" - return self.function(*args, **kwargs) diff --git a/openhexa/sdk/pipelines/parameter/widgets.py b/openhexa/sdk/pipelines/parameter/widgets.py new file mode 100644 index 00000000..8ff6f1bf --- /dev/null +++ b/openhexa/sdk/pipelines/parameter/widgets.py @@ -0,0 +1,24 @@ +"""Widget enums for DHIS2 and IASO pipeline parameters.""" + +from enum import StrEnum + + +class IASOWidget(StrEnum): + """Enum for IASO widgets.""" + + IASO_FORMS = "IASO_FORMS" + IASO_ORG_UNITS = "IASO_ORG_UNITS" + IASO_PROJECTS = "IASO_PROJECTS" + + +class DHIS2Widget(StrEnum): + """Enum for DHIS2 widgets.""" + + ORG_UNITS = "DHIS2_ORG_UNITS" + ORG_UNIT_GROUPS = "DHIS2_ORG_UNIT_GROUPS" + ORG_UNIT_LEVELS = "DHIS2_ORG_UNIT_LEVELS" + DATASETS = "DHIS2_DATASETS" + DATA_ELEMENTS = "DHIS2_DATA_ELEMENTS" + DATA_ELEMENT_GROUPS = "DHIS2_DATA_ELEMENT_GROUPS" + INDICATORS = "DHIS2_INDICATORS" + INDICATOR_GROUPS = "DHIS2_INDICATOR_GROUPS" diff --git a/openhexa/sdk/pipelines/runtime.py b/openhexa/sdk/pipelines/runtime.py index c1e3195e..a1c89335 100644 --- a/openhexa/sdk/pipelines/runtime.py +++ b/openhexa/sdk/pipelines/runtime.py @@ -16,6 +16,7 @@ from openhexa.sdk.pipelines.exceptions import InvalidParameterError, PipelineNotFound from openhexa.sdk.pipelines.parameter import ( TYPES_BY_PYTHON_TYPE, + ChoicesFromFile, DHIS2Widget, IASOWidget, Parameter, @@ -172,6 +173,19 @@ def _get_decorator_arg_value(decorator: ast.Call, arg: Argument, index: int) -> return (keyword.value.id, True) elif isinstance(keyword.value, ast.List): return ([el.value for el in keyword.value.elts], True) + elif isinstance(keyword.value, ast.Call): + func = keyword.value.func + func_name = func.id if isinstance(func, ast.Name) else None + if func_name != "ChoicesFromFile": + raise ValueError(f"Unsupported call in choices argument: {func_name}") + # Extract positional arg (path) and keyword args (column, format override) + pos_args = [a.value for a in keyword.value.args if isinstance(a, ast.Constant)] + kw_args = { + kw.arg: kw.value.value for kw in keyword.value.keywords if isinstance(kw.value, ast.Constant) + } + if pos_args: + kw_args.setdefault("path", pos_args[0]) + return (ChoicesFromFile(**kw_args), True) elif isinstance(keyword.value, ast.Attribute): if keyword.value.attr in DHIS2Widget.__members__: return getattr(DHIS2Widget, keyword.value.attr), True @@ -287,7 +301,7 @@ def get_pipeline(pipeline_path: Path) -> Pipeline: Argument("code", [ast.Constant]), Argument("type", [ast.Name]), Argument("name", [ast.Constant]), - Argument("choices", [ast.List]), + Argument("choices", [ast.List, ast.Call]), Argument("help", [ast.Constant]), Argument("default", [ast.Constant, ast.List]), Argument("widget", [ast.Attribute]), diff --git a/tests/test_choices.py b/tests/test_choices.py new file mode 100644 index 00000000..e050371f --- /dev/null +++ b/tests/test_choices.py @@ -0,0 +1,189 @@ +"""Tests for ChoicesFromFile dynamic parameter choices.""" + +import tempfile +from unittest import TestCase + +import pytest + +from openhexa.sdk.pipelines.exceptions import InvalidParameterError +from openhexa.sdk.pipelines.parameter import ChoicesFromFile, Parameter, parameter +from openhexa.sdk.pipelines.runtime import get_pipeline + +# --------------------------------------------------------------------------- +# ChoicesFromFile construction +# --------------------------------------------------------------------------- + + +class TestChoicesFromFileConstruction: + def test_csv_auto_detected(self): + fc = ChoicesFromFile("districts.csv") + assert fc.format == "csv" + assert fc.path == "districts.csv" + assert fc.column is None + + def test_json_auto_detected(self): + fc = ChoicesFromFile("data/regions.json", column="code") + assert fc.format == "json" + assert fc.column == "code" + + def test_yaml_auto_detected(self): + assert ChoicesFromFile("list.yaml").format == "yaml" + + def test_yml_normalised_to_yaml(self): + assert ChoicesFromFile("list.yml").format == "yaml" + + def test_unsupported_extension_raises(self): + with pytest.raises(InvalidParameterError, match="Supported extensions"): + ChoicesFromFile("districts.xlsx") + + def test_no_extension_raises(self): + with pytest.raises(InvalidParameterError, match="Supported extensions"): + ChoicesFromFile("districts") + + def test_empty_path_raises(self): + with pytest.raises(InvalidParameterError): + ChoicesFromFile("") + + def test_non_string_column_raises(self): + with pytest.raises(InvalidParameterError): + ChoicesFromFile("districts.csv", column=42) + + def test_to_dict(self): + fc = ChoicesFromFile("data/districts.csv", column="code") + assert fc.to_dict() == {"format": "csv", "path": "data/districts.csv", "column": "code"} + + def test_to_dict_no_column(self): + fc = ChoicesFromFile("districts.csv") + assert fc.to_dict() == {"format": "csv", "path": "districts.csv", "column": None} + + +# --------------------------------------------------------------------------- +# Parameter integration +# --------------------------------------------------------------------------- + + +class TestParameterWithChoicesFromFile: + def test_accepts_file_choices(self): + p = Parameter(code="district", type=str, choices=ChoicesFromFile("districts.csv")) + assert isinstance(p.choices, ChoicesFromFile) + + def test_to_dict_emits_file_choices_key(self): + p = Parameter(code="district", type=str, choices=ChoicesFromFile("districts.csv", column="code")) + d = p.to_dict() + assert d["choices"] is None + assert d["choices_from_file"] == {"format": "csv", "path": "districts.csv", "column": "code"} + + def test_to_dict_no_file_choices_key_for_static_choices(self): + p = Parameter(code="country", type=str, choices=["UG", "KE"]) + d = p.to_dict() + assert d["choices"] == ["UG", "KE"] + assert "choices_from_file" not in d + + def test_rejects_file_choices_on_bool_type(self): + with pytest.raises(InvalidParameterError, match="don't accept choices"): + Parameter(code="flag", type=bool, choices=ChoicesFromFile("flags.csv")) + + def test_validate_single_skips_choices_check(self): + p = Parameter(code="district", type=str, choices=ChoicesFromFile("districts.csv")) + # Any string value passes — the platform validates against the resolved list + assert p.validate("any_value") == "any_value" + + def test_validate_multiple_skips_choices_check(self): + p = Parameter(code="district", type=str, choices=ChoicesFromFile("districts.csv"), multiple=True) + assert p.validate(["A", "B", "C"]) == ["A", "B", "C"] + + def test_default_not_validated_against_file_choices(self): + # Should not raise even though default isn't in any resolved list + p = Parameter(code="district", type=str, choices=ChoicesFromFile("districts.csv"), default="UNKNOWN") + assert p.default == "UNKNOWN" + + def test_decorator_with_file_choices(self): + @parameter(code="district", type=str, choices=ChoicesFromFile("districts.csv")) + def my_pipeline(district): + pass + + params = my_pipeline.get_all_parameters() + assert len(params) == 1 + assert isinstance(params[0].choices, ChoicesFromFile) + + +# --------------------------------------------------------------------------- +# AST round-trip +# --------------------------------------------------------------------------- + + +class TestAstChoicesFromFile(TestCase): + def _write_pipeline(self, tmpdir, param_line): + with open(f"{tmpdir}/pipeline.py", "w") as f: + f.write( + "\n".join( + [ + "from openhexa.sdk.pipelines import pipeline, parameter", + "from openhexa.sdk.pipelines.parameter import ChoicesFromFile", + "", + param_line, + "@pipeline(name='Test pipeline')", + "def test_pipeline(district):", + " pass", + ] + ) + ) + + def test_file_choices_csv_positional_path(self): + with tempfile.TemporaryDirectory() as tmpdir: + self._write_pipeline( + tmpdir, + "@parameter('district', type=str, choices=ChoicesFromFile('districts.csv'))", + ) + p = get_pipeline(tmpdir) + param_dict = p.to_dict()["parameters"][0] + assert param_dict["choices"] is None + assert param_dict["choices_from_file"] == {"format": "csv", "path": "districts.csv", "column": None} + + def test_file_choices_csv_with_column(self): + with tempfile.TemporaryDirectory() as tmpdir: + self._write_pipeline( + tmpdir, + "@parameter('district', type=str, choices=ChoicesFromFile('data/districts.csv', column='code'))", + ) + p = get_pipeline(tmpdir) + param_dict = p.to_dict()["parameters"][0] + assert param_dict["choices_from_file"] == {"format": "csv", "path": "data/districts.csv", "column": "code"} + + def test_file_choices_json(self): + with tempfile.TemporaryDirectory() as tmpdir: + self._write_pipeline( + tmpdir, + "@parameter('district', type=str, choices=ChoicesFromFile('regions.json', column='id'))", + ) + p = get_pipeline(tmpdir) + param_dict = p.to_dict()["parameters"][0] + assert param_dict["choices_from_file"]["format"] == "json" + + def test_file_choices_yaml(self): + with tempfile.TemporaryDirectory() as tmpdir: + self._write_pipeline( + tmpdir, + "@parameter('district', type=str, choices=ChoicesFromFile('list.yml'))", + ) + p = get_pipeline(tmpdir) + param_dict = p.to_dict()["parameters"][0] + assert param_dict["choices_from_file"]["format"] == "yaml" + + def test_unsupported_call_in_choices_raises(self): + with tempfile.TemporaryDirectory() as tmpdir: + with open(f"{tmpdir}/pipeline.py", "w") as f: + f.write( + "\n".join( + [ + "from openhexa.sdk.pipelines import pipeline, parameter", + "", + "@parameter('district', type=str, choices=dict(a=1))", + "@pipeline(name='Test pipeline')", + "def test_pipeline(district):", + " pass", + ] + ) + ) + with self.assertRaises(ValueError, msg="Unsupported call"): + get_pipeline(tmpdir)