diff --git a/.github/.platforms/generate_platforms.py b/.github/.platforms/generate_platforms.py index 98034044..fdcc05d5 100755 --- a/.github/.platforms/generate_platforms.py +++ b/.github/.platforms/generate_platforms.py @@ -9,9 +9,10 @@ import sys import tempfile import time +from collections.abc import Callable from dataclasses import dataclass from pathlib import Path -from typing import Callable, TypeVar +from typing import TypeVar # Configuration PYTHON_VERSIONS = ("3.10", "3.11", "3.12", "3.13", "3.14") diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9821c3fb..26d261af 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -58,10 +58,12 @@ repos: hooks: - id: isort -- repo: https://github.com/asottile/pyupgrade - rev: v3.19.1 - hooks: - - id: pyupgrade +- repo: https://github.com/asottile/pyupgrade + rev: v3.21.2 + hooks: + - id: pyupgrade + args: [--py310-plus] + - repo: https://github.com/zizmorcore/zizmor-pre-commit rev: v1.20.0 diff --git a/cloudsmith_cli/cli/commands/mcp.py b/cloudsmith_cli/cli/commands/mcp.py index 42053ff9..1cfd7e2b 100644 --- a/cloudsmith_cli/cli/commands/mcp.py +++ b/cloudsmith_cli/cli/commands/mcp.py @@ -5,7 +5,6 @@ import shutil import sys from pathlib import Path -from typing import Dict, List import click import json5 @@ -110,7 +109,7 @@ def list_groups(ctx, opts, mcp_server: server.DynamicMCPServer): print_groups(groups) -def print_tools(tool_list: Dict[str, OpenAPITool]): +def print_tools(tool_list: dict[str, OpenAPITool]): """Print tools as a table or output in another format.""" headers = [ @@ -138,7 +137,7 @@ def print_tools(tool_list: Dict[str, OpenAPITool]): utils.pretty_print_list_info(num_results=num_results, suffix=list_suffix) -def print_groups(group_list: Dict[str, List[str]]): +def print_groups(group_list: dict[str, list[str]]): """Print tool groups as a table or output in another format.""" headers = [ diff --git a/cloudsmith_cli/cli/commands/metadata.py b/cloudsmith_cli/cli/commands/metadata.py index ff0a01ce..93deb653 100644 --- a/cloudsmith_cli/cli/commands/metadata.py +++ b/cloudsmith_cli/cli/commands/metadata.py @@ -15,9 +15,13 @@ ) from ...core.api.packages import get_package_slug_perm as api_get_package_slug_perm from ...core.pagination import paginate_results -from ...core.version import get_version as get_cli_version from .. import command, decorators, utils, validators from ..exceptions import handle_api_exceptions +from ..metadata_common import ( + attach_metadata_options, + require_metadata_content_type, + resolve_metadata_content, +) from ..utils import maybe_spinner from .main import main @@ -30,11 +34,6 @@ ] -def _default_source_identity(): - """Return the default value for --source-identity.""" - return f"cloudsmith-cli@{get_cli_version()}" - - def _format_metadata_row(entry): return [ click.style(entry.get("slug_perm") or "", fg="cyan"), @@ -93,35 +92,6 @@ def _print_metadata_entry(opts, entry): click.echo(json.dumps(content, indent=2, sort_keys=True)) -def _load_content(content_file, inline_content, *, required): - """Resolve --file / --content into a parsed object. - - Enforces the XOR between the two sources. When `required` is True (used - by `add`), at least one source must be provided. When False (used by - `update`), a missing source means "do not change content". - """ - if content_file is not None and inline_content is not None: - raise click.UsageError("--file and --content are mutually exclusive.") - - if content_file is not None: - if content_file == "-": - raw, source = click.get_text_stream("stdin").read(), "stdin" - else: - with open(content_file, encoding="utf-8") as fh: - raw, source = fh.read(), "--file" - elif inline_content is not None: - raw, source = inline_content, "--content" - elif required: - raise click.UsageError("One of --file or --content is required.") - else: - return None - - try: - return json.loads(raw) - except ValueError as exc: - raise click.UsageError(f"Invalid JSON in {source}: {exc}") from exc - - @main.group(name="metadata", cls=command.AliasGroup) @decorators.common_cli_config_options @decorators.common_cli_output_options @@ -130,7 +100,7 @@ def _load_content(content_file, inline_content, *, required): @click.pass_context def metadata_(ctx, opts): # pylint: disable=unused-argument """ - Manage metadata attached to packages in a repository. + Manage package metadata. """ @@ -151,8 +121,9 @@ def metadata_(ctx, opts): # pylint: disable=unused-argument "source_kind", default=None, help=( - "Filter by metadata source kind. Accepts an integer or a name " - "(e.g. 'customer', 'third_party'). Ignored when METADATA_SLUG_PERM is given." + "Filter by source kind. Accepts an integer or name " + "(for example, 'customer' or 'third_party'). Ignored when " + "METADATA_SLUG_PERM is given." ), ) @click.option( @@ -160,8 +131,9 @@ def metadata_(ctx, opts): # pylint: disable=unused-argument "classification", default=None, help=( - "Filter by metadata classification. Accepts an integer or a name " - "(e.g. 'provenance', 'sbom'). Ignored when METADATA_SLUG_PERM is given." + "Filter by classification. Accepts an integer or name " + "(for example, 'provenance' or 'sbom'). Ignored when " + "METADATA_SLUG_PERM is given." ), ) @click.pass_context @@ -177,25 +149,25 @@ def list_metadata( classification, ): """ - List metadata entries attached to a package. + List package metadata. - OWNER/REPO/PACKAGE: identifies the target package. + OWNER/REPO/PACKAGE: target package. - METADATA_SLUG_PERM (optional): if given, fetch and display only that single - metadata entry. Pagination and filter flags are ignored. + METADATA_SLUG_PERM (optional): fetch one metadata entry. Pagination and + filters are ignored. \b Examples: - $ cloudsmith metadata list your-org/awesome-repo/better-pkg - $ cloudsmith metadata list your-org/awesome-repo/better-pkg --classification provenance - $ cloudsmith metadata list your-org/awesome-repo/better-pkg meta-slug-perm + $ cloudsmith metadata list example-org/example-repo/example-pkg + $ cloudsmith metadata list example-org/example-repo/example-pkg --classification provenance + $ cloudsmith metadata list example-org/example-repo/example-pkg meta-slug-perm """ owner, repo, package = owner_repo_package use_stderr = utils.should_use_stderr(opts) if metadata_slug_perm: _echo_action( - "Fetching metadata entry %(metadata)s for the '%(package)s' package ... " + "Fetching metadata %(metadata)s for %(package)s ... " % { "metadata": click.style(metadata_slug_perm, bold=True), "package": click.style(package, bold=True), @@ -203,7 +175,7 @@ def list_metadata( use_stderr, ) - context_msg = "Failed to fetch metadata for the package." + context_msg = "Could not fetch package metadata." with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): with maybe_spinner(opts): slug_perm = api_get_package_slug_perm( @@ -224,12 +196,12 @@ def list_metadata( raise click.UsageError(str(exc)) from exc _echo_action( - "Listing metadata for the '%(package)s' package ... " + "Listing metadata for %(package)s ... " % {"package": click.style(package, bold=True)}, use_stderr, ) - context_msg = "Failed to list metadata for the package." + context_msg = "Could not list package metadata." with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): with maybe_spinner(opts): slug_perm = api_get_package_slug_perm( @@ -264,7 +236,7 @@ def list_metadata( "content_type", required=True, help=( - "The content type of the metadata payload (e.g. 'application/json'). " + "Content type for metadata content (for example, 'application/json'). " "Content type is immutable after creation." ), ) @@ -279,21 +251,20 @@ def list_metadata( allow_dash=True, ), default=None, - help="Path to a JSON file containing the metadata content. Use '-' for stdin.", + help="Read metadata content from a JSON file. Use '-' for stdin.", ) @click.option( "--content", "inline_content", default=None, - help=("Inline JSON content for the metadata. Mutually exclusive with --file."), + help="Set metadata content from inline JSON. Cannot be used with --file.", ) @click.option( "--source-identity", "source_identity", default=None, help=( - "Identifier indicating where the metadata originated. " - "Defaults to 'cloudsmith-cli@'." + "Identifier for the metadata source. " "Defaults to 'cloudsmith-cli@'." ), ) @click.pass_context @@ -307,38 +278,53 @@ def add_metadata( source_identity, ): """ - Attach a new metadata entry to a package. + Attach metadata to a package. - OWNER/REPO/PACKAGE: identifies the target package. + OWNER/REPO/PACKAGE: target package. Exactly one of --file or --content must be provided. Content type is set on creation and cannot be changed. \b Examples: - $ cloudsmith metadata add your-org/awesome-repo/better-pkg \\ + $ cloudsmith metadata add example-org/example-repo/example-pkg \\ --content-type application/json \\ --content '{"foo": "bar"}' - $ cat payload.json | cloudsmith metadata add your-org/awesome-repo/better-pkg \\ + $ cat metadata.json | cloudsmith metadata add example-org/example-repo/example-pkg \\ --content-type application/json \\ --file - - $ cloudsmith metadata add your-org/awesome-repo/better-pkg \\ + $ cloudsmith metadata add example-org/example-repo/example-pkg \\ --content-type application/vnd.jfrog.buildinfo+json \\ --file buildinfo.json """ owner, repo, package = owner_repo_package use_stderr = utils.should_use_stderr(opts) - content = _load_content(content_file, inline_content, required=True) - source_identity = source_identity or _default_source_identity() + metadata = resolve_metadata_content( + content_file=content_file, + inline_content=inline_content, + required=True, + file_option_name="--file", + content_option_name="--content", + ) + require_metadata_content_type( + content_type=content_type, + content_provided=metadata.provided, + option_name="--content-type", + ) + metadata = attach_metadata_options( + metadata, + content_type=content_type, + source_identity=source_identity, + ) _echo_action( - "Attaching metadata to the '%(package)s' package ... " + "Attaching metadata to %(package)s ... " % {"package": click.style(package, bold=True)}, use_stderr, ) - context_msg = "Failed to attach metadata to the package." + context_msg = "Could not attach metadata." with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): with maybe_spinner(opts): slug_perm = api_get_package_slug_perm( @@ -346,9 +332,9 @@ def add_metadata( ) entry = api_create_metadata( slug_perm, - content=content, - content_type=content_type, - source_identity=source_identity, + content=metadata.content, + content_type=metadata.content_type, + source_identity=metadata.source_identity, ) click.secho("OK", fg="green", err=use_stderr) @@ -377,25 +363,22 @@ def add_metadata( allow_dash=True, ), default=None, - help=( - "Path to a JSON file containing replacement metadata content. " - "Use '-' for stdin." - ), + help="Read replacement metadata content from a JSON file. Use '-' for stdin.", ) @click.option( "--content", "inline_content", default=None, help=( - "Inline JSON replacement content for the metadata. Mutually exclusive " - "with --file." + "Set replacement metadata content from inline JSON. Cannot be used with " + "--file." ), ) @click.option( "--source-identity", "source_identity", default=None, - help="Update the source identity for the metadata entry.", + help="Update the metadata source identity.", ) @click.pass_context def update_metadata( @@ -408,40 +391,43 @@ def update_metadata( source_identity, ): """ - Update an existing metadata entry on a package. + Update package metadata. - OWNER/REPO/PACKAGE: identifies the target package. - METADATA_SLUG_PERM: the permanent slug of the metadata entry to update. + OWNER/REPO/PACKAGE: target package. + METADATA_SLUG_PERM: permanent slug for the metadata entry. Content type cannot be changed after creation. \b Examples: - $ cloudsmith metadata update your-org/awesome-repo/better-pkg meta-slug \\ + $ cloudsmith metadata update example-org/example-repo/example-pkg meta-slug \\ --content '{"foo": "baz"}' - $ cat payload.json | cloudsmith metadata update your-org/awesome-repo/better-pkg meta-slug \\ + $ cat metadata.json | cloudsmith metadata update example-org/example-repo/example-pkg meta-slug \\ --file - """ owner, repo, package = owner_repo_package use_stderr = utils.should_use_stderr(opts) - content = _load_content(content_file, inline_content, required=False) + metadata = resolve_metadata_content( + content_file=content_file, + inline_content=inline_content, + required=False, + file_option_name="--file", + content_option_name="--content", + ) - patch_kwargs = { - key: value - for key, value in ( - ("content", content), - ("source_identity", source_identity), - ) - if value is not None - } + patch_kwargs = {} + if metadata.provided: + patch_kwargs["content"] = metadata.content + if source_identity is not None: + patch_kwargs["source_identity"] = source_identity if not patch_kwargs: raise click.UsageError( "Nothing to update. Provide --file, --content, or --source-identity." ) _echo_action( - "Updating metadata entry %(metadata)s on the '%(package)s' package ... " + "Updating metadata %(metadata)s for %(package)s ... " % { "metadata": click.style(metadata_slug_perm, bold=True), "package": click.style(package, bold=True), @@ -449,7 +435,7 @@ def update_metadata( use_stderr, ) - context_msg = "Failed to update metadata on the package." + context_msg = "Could not update package metadata." with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): with maybe_spinner(opts): slug_perm = api_get_package_slug_perm( @@ -477,19 +463,19 @@ def update_metadata( "--yes", default=False, is_flag=True, - help="Skip confirmation prompts. Use with care.", + help="Skip confirmation prompt.", ) @click.pass_context def remove_metadata(ctx, opts, owner_repo_package, metadata_slug_perm, yes): """ - Remove a metadata entry from a package. + Remove package metadata. - OWNER/REPO/PACKAGE: identifies the target package. - METADATA_SLUG_PERM: the permanent slug of the metadata entry to delete. + OWNER/REPO/PACKAGE: target package. + METADATA_SLUG_PERM: permanent slug for the metadata entry. \b Example: - $ cloudsmith metadata remove your-org/awesome-repo/better-pkg meta-slug + $ cloudsmith metadata remove example-org/example-repo/example-pkg meta-slug """ owner, repo, package = owner_repo_package use_stderr = utils.should_use_stderr(opts) @@ -499,20 +485,16 @@ def remove_metadata(ctx, opts, owner_repo_package, metadata_slug_perm, yes): "package": click.style(package, bold=True), } - prompt = ( - "remove the %(metadata)s metadata entry from the %(package)s package" - % remove_args - ) + prompt = "remove metadata %(metadata)s from package %(package)s" % remove_args if not utils.confirm_operation(prompt, assume_yes=yes, err=use_stderr): return _echo_action( - "Removing metadata entry %(metadata)s from the '%(package)s' package ... " - % remove_args, + "Removing metadata %(metadata)s from %(package)s ... " % remove_args, use_stderr, ) - context_msg = "Failed to remove metadata from the package." + context_msg = "Could not remove package metadata." with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): with maybe_spinner(opts): slug_perm = api_get_package_slug_perm( @@ -528,6 +510,6 @@ def remove_metadata(ctx, opts, owner_repo_package, metadata_slug_perm, yes): click.echo() click.secho( - "Removed metadata entry %(slug)s." + "Metadata removed: %(slug)s." % {"slug": click.style(metadata_slug_perm, bold=True)} ) diff --git a/cloudsmith_cli/cli/commands/push.py b/cloudsmith_cli/cli/commands/push.py index 2cc3d663..ab7d0af1 100644 --- a/cloudsmith_cli/cli/commands/push.py +++ b/cloudsmith_cli/cli/commands/push.py @@ -1,7 +1,10 @@ """CLI/Commands - Push packages.""" +# pylint: disable=too-many-lines + import math import os +import shlex import time from datetime import datetime @@ -16,6 +19,10 @@ upload_file as api_upload_file, validate_request_file_upload, ) +from ...core.api.metadata import ( + create_metadata as api_create_metadata, + validate_metadata as api_validate_metadata, +) from ...core.api.packages import ( create_package as api_create_package, get_package_formats, @@ -24,10 +31,399 @@ ) from .. import command, decorators, utils, validators from ..exceptions import handle_api_exceptions +from ..metadata_common import ( + MetadataContentError, + ResolvedMetadata, + attach_metadata_options, + default_metadata_source_identity, + require_metadata_content_type, + resolve_metadata_content, + source_label_for, +) from ..types import ExpandPath from ..utils import maybe_spinner from .main import main +#: Env var that lets CI/CD wrappers (e.g. GHA) opt out of hard-failing the +#: push when push-time metadata attachment fails. Defaults to ``error`` so an +#: invalid metadata content aborts the upload (design requirement: metadata +#: pushes must surface failures by default). Set to ``0`` or ``warn`` to +#: downgrade failures to a warning and let the package upload regardless. +METADATA_FAILURE_MODE_ENV = "CLOUDSMITH_METADATA_FAILURE_MODE" +METADATA_FAILURE_MODE_WARN = {"0", "warn"} +#: Click option dest names for the push-time metadata flags. Used by the +#: push handler to split metadata flags off from the package-create payload +#: kwargs (the API client would otherwise reject the unknown keys). +METADATA_KWARG_NAMES = ( + "metadata_content_file", + "metadata_content", + "metadata_content_type", + "metadata_source_identity", +) + + +def _metadata_failure_is_warn(): + """Return True iff ``$CLOUDSMITH_METADATA_FAILURE_MODE`` downgrades failures. + + Single source of truth for the env-var parsing so the validation and + attach paths cannot drift (e.g. one accepting ``"false"`` and the + other not). + """ + mode = os.environ.get(METADATA_FAILURE_MODE_ENV, "error").strip().lower() + return mode in METADATA_FAILURE_MODE_WARN + + +def _metadata_content_failure_info(exc): + info = { + "status": "content_invalid", + "error": str(exc), + } + if getattr(exc, "source_label", None): + info["source"] = exc.source_label + return info + + +def _warn_metadata_failure(failure_info): + click.secho( + "Metadata content is invalid: %(error)s" % failure_info, + fg="yellow", + err=True, + ) + click.secho( + "Package upload will continue without metadata. " + f"Unset ${METADATA_FAILURE_MODE_ENV} (or set it to ``error``) to " + "fail the push instead.", + fg="yellow", + err=True, + ) + + +def resolve_push_metadata_options( + *, + metadata_content_file=None, + metadata_content=None, + metadata_content_type=None, + metadata_source_identity=None, +): + """Resolve push-time metadata flags once before package upload loops.""" + if metadata_content_file is not None and metadata_content is not None: + raise click.UsageError( + "--metadata-content-file and --metadata-content are mutually exclusive." + ) + + metadata_provided = ( + metadata_content_file is not None or metadata_content is not None + ) + if not metadata_provided: + if metadata_content_type or metadata_source_identity: + raise click.UsageError( + "Add --metadata-content-file or --metadata-content when using " + "--metadata-content-type or --metadata-source-identity." + ) + return ResolvedMetadata(provided=False, content=None), None + + require_metadata_content_type( + content_type=metadata_content_type, + content_provided=True, + option_name="--metadata-content-type", + ) + + try: + metadata = resolve_metadata_content( + content_file=metadata_content_file, + inline_content=metadata_content, + required=True, + file_option_name="--metadata-content-file", + content_option_name="--metadata-content", + ) + except MetadataContentError as exc: + if not _metadata_failure_is_warn(): + raise + + source_label = exc.source_label or source_label_for(metadata_content_file) + metadata = ResolvedMetadata( + provided=True, + content=None, + content_type=metadata_content_type, + source_identity=( + metadata_source_identity or default_metadata_source_identity() + ), + content_file=metadata_content_file, + source_label=source_label, + ) + return metadata, _metadata_content_failure_info(exc) + + return ( + attach_metadata_options( + metadata, + content_type=metadata_content_type, + source_identity=metadata_source_identity, + ), + None, + ) + + +def _handle_metadata_api_exception(ctx, opts, exc, context_msg, skip_errors=False): + """Route metadata API failures through the standard API exception handler.""" + with handle_api_exceptions( + ctx, + opts=opts, + context_msg=context_msg, + reraise_on_error=skip_errors, + ): + raise exc + + +def _print_metadata_retry_hint( + opts, + owner, + repo, + slug, + metadata_content_file, + cli_content_type, + cli_source_identity, + reason="attach_failed", +): + """Print a copy-paste ``cloudsmith metadata add`` line for failed attaches. + + Skipped in JSON output mode — the envelope already carries slugs and + failure context, so CI can reconstruct the command without text parsing. + Skipped for inline ``--metadata-content`` payloads, since they are not + safely reproducible as a single shell line (multi-line / quoting / size). + + ``reason`` distinguishes a transient/policy attach failure (``"attach_failed"``, + where retrying the same payload may succeed) from a pre-validation + failure (``"validation_failed"``, where the payload itself is broken and + must be fixed first). Wording changes accordingly. + """ + if utils.should_use_stderr(opts): + return + # Skip when no file path (inline ``--metadata-content``) or stdin ("-"), + # since neither is reproducible as a single shell line. + if not metadata_content_file or metadata_content_file == "-": + return + + parts = [ + f"cloudsmith metadata add {shlex.quote(f'{owner}/{repo}/{slug}')}", + f" --file {shlex.quote(metadata_content_file)}", + ] + if cli_source_identity: + parts.append(f" --source-identity {shlex.quote(cli_source_identity)}") + if cli_content_type: + parts.append(f" --content-type {shlex.quote(cli_content_type)}") + + if reason == "validation_failed": + heading = "Fix the metadata content, then run:" + else: + heading = "Run this command to attach metadata:" + + click.echo(err=True) + click.secho(heading, fg="yellow", err=True) + click.secho(" \\\n".join(parts), fg="yellow", err=True) + + +def validate_metadata_payload( + ctx, + opts, + content, + content_type, + source=None, + skip_errors=False, +): + """Validate metadata against ``POST /v2/metadata/validate/`` pre-upload. + + Runs before any file upload so a malformed payload does not produce an + orphan package. Returns ``None`` on success. Routes validation failure + through ``handle_api_exceptions`` by default so the push aborts before + any S3 traffic. + When ``$CLOUDSMITH_METADATA_FAILURE_MODE`` is ``warn``/``0`` it returns a + metadata-info dict instead so the caller can skip attachment but continue + the push. + + ``source`` is a human-readable label for the payload origin (file + basename, ``"stdin"``, ``"inline"``) — surfaced in the progress line so + users know which source is being validated. + """ + # pylint: disable=too-many-arguments + use_stderr = utils.should_use_stderr(opts) + + if source: + message = "Validating metadata content from {source} ... ".format( + source=click.style(source, bold=True), + ) + else: + message = "Validating metadata content ... " + + click.echo( + message, + nl=False, + err=use_stderr, + ) + + try: + with maybe_spinner(opts): + api_validate_metadata(content=content, content_type=content_type) + except ApiException as exc: + http_status = getattr(exc, "status", None) + detail = ( + getattr(exc, "detail", None) + or getattr(exc, "status_description", None) + or str(exc) + or "unknown error" + ) + + click.secho("FAILED", fg="red", err=use_stderr) + + message = ( + "Metadata content failed validation " + f"(HTTP {http_status if http_status is not None else '???'}): {detail}" + ) + failure_info = { + "status": "validation_failed", + "http_status": http_status, + "error": detail, + } + + if not _metadata_failure_is_warn(): + opts.push_metadata_info = failure_info + _handle_metadata_api_exception( + ctx, + opts, + exc, + context_msg=message, + skip_errors=skip_errors, + ) + + click.secho(message, fg="yellow", err=True) + click.secho( + "Package upload will continue without metadata. " + f"Unset ${METADATA_FAILURE_MODE_ENV} (or set it to ``error``) to " + "fail the push instead.", + fg="yellow", + err=True, + ) + return failure_info + + click.secho("OK", fg="green", err=use_stderr) + return None + + +def attach_metadata_to_package( + ctx, + opts, + owner, + repo, + slug, + slug_perm, + content, + content_type, + source_identity, + skip_errors=False, + metadata_content_file=None, + cli_content_type=None, + cli_source_identity=None, +): + """Attach a metadata entry to a freshly-created package. + + Failure is fatal by default: the API error is reported and the push + exits non-zero so CI/CD pipelines surface broken SBOM/BuildInfo uploads + instead of silently shipping a package without metadata. Wrappers that + explicitly want the legacy non-fatal behaviour can set + ``$CLOUDSMITH_METADATA_FAILURE_MODE`` to ``warn`` (or ``0``). + """ + # pylint: disable=too-many-arguments + use_stderr = utils.should_use_stderr(opts) + + click.echo( + "Attaching metadata to package %(slug)s ... " + % {"slug": click.style(slug_perm, bold=True)}, + nl=False, + err=use_stderr, + ) + + try: + with maybe_spinner(opts): + entry = api_create_metadata( + slug_perm, + content=content, + content_type=content_type, + source_identity=source_identity, + ) + except ApiException as exc: + click.secho("FAILED", fg="red", err=use_stderr) + + http_status = getattr(exc, "status", None) + detail = ( + getattr(exc, "detail", None) + or getattr(exc, "status_description", None) + or str(exc) + or "unknown error" + ) + message = ( + f"Could not attach metadata to package {slug_perm} " + f"(HTTP {http_status if http_status is not None else '???'}): {detail}" + ) + failure_info = { + "status": "attach_failed", + "http_status": http_status, + "error": detail, + } + + hint_kwargs = { + "opts": opts, + "owner": owner, + "repo": repo, + "slug": slug, + "metadata_content_file": metadata_content_file, + "cli_content_type": cli_content_type, + "cli_source_identity": cli_source_identity, + } + + if not _metadata_failure_is_warn(): + opts.push_metadata_info = failure_info + _print_metadata_retry_hint(**hint_kwargs) + _handle_metadata_api_exception( + ctx, + opts, + exc, + context_msg=message, + skip_errors=skip_errors, + ) + + click.secho(message, fg="yellow", err=True) + click.secho( + f"Package upload completed without metadata because " + f"${METADATA_FAILURE_MODE_ENV}=warn. Unset the env var " + "(or set it to ``error``) to fail the push instead.", + fg="yellow", + err=True, + ) + _print_metadata_retry_hint(**hint_kwargs) + return failure_info + + click.secho("OK", fg="green", err=use_stderr) + + metadata_slug_perm = (entry or {}).get("slug_perm") or "?" + package_path = "{owner}/{repo}/{slug}".format( + owner=click.style(owner, fg="magenta"), + repo=click.style(repo, fg="magenta"), + slug=click.style(slug, fg="green"), + ) + click.echo( + "Metadata attached: %(path)s/%(metadata)s" + % { + "path": package_path, + "metadata": click.style(metadata_slug_perm, bold=True), + }, + err=use_stderr, + ) + + return { + "status": "attached", + "slug_perm": (entry or {}).get("slug_perm"), + "entry": entry or None, + } + def validate_upload_file(ctx, opts, owner, repo, filepath, skip_errors): """Validate parameters for requesting a file upload.""" @@ -388,13 +784,39 @@ def upload_files_and_create_package( wait_interval, skip_errors, sync_attempts, + metadata_content_file=None, + metadata_content=None, + metadata_content_type=None, + metadata_source_identity=None, + metadata=None, + metadata_failure_info=None, **kwargs, ): """Upload package files and create a new package.""" - # pylint: disable=unused-argument + # pylint: disable=unused-argument,too-many-arguments,too-many-locals owner, repo = owner_repo - # 1. Validate package create parameters + # Reset push-time metadata state for this call. ``handle_api_exceptions`` + # consults this attribute to surface validation/attach context in the + # JSON error envelope; an unset value would leak prior state on retries. + opts.push_metadata_info = None + + # 0. Resolve push-time metadata before package work. The dynamic command + # handler resolves once for multi-file pushes so stdin is consumed once; + # direct callers can still pass the metadata flags for test coverage. + if metadata is None: + metadata, metadata_failure_info = resolve_push_metadata_options( + metadata_content_file=metadata_content_file, + metadata_content=metadata_content, + metadata_content_type=metadata_content_type, + metadata_source_identity=metadata_source_identity, + ) + + should_attach_metadata = metadata.provided and metadata_failure_info is None + + # 1. Validate package create parameters. This runs before the metadata + # pre-validation so a typo in --name/--version fails fast without + # burning a /v2/metadata/validate/ round-trip first. validate_create_package( ctx=ctx, opts=opts, @@ -405,6 +827,26 @@ def upload_files_and_create_package( **kwargs, ) + # 1b. Pre-validate metadata against the server-side schema endpoint so a + # malformed payload cannot produce an orphan package (the upload would + # succeed and only the attach would fail). + if metadata_failure_info is not None: + opts.push_metadata_info = metadata_failure_info + _warn_metadata_failure(metadata_failure_info) + elif should_attach_metadata: + validation_failure = validate_metadata_payload( + ctx=ctx, + opts=opts, + content=metadata.content, + content_type=metadata.content_type, + source=metadata.source_label, + skip_errors=skip_errors, + ) + if validation_failure is not None: + # Warn-mode validation failure: keep the push, drop the attach. + should_attach_metadata = False + opts.push_metadata_info = validation_failure + # 2. Validate file upload parameters md5_checksums = {} for k, v in kwargs.items(): @@ -484,10 +926,44 @@ def upload_files_and_create_package( **kwargs, ) + # 5. Attach push-time metadata, if provided AND it passed validation. + # Warn-mode metadata failures leave opts.push_metadata_info populated + # and should_attach_metadata=False; surface a retry hint now that we + # have the package slug. Skipped in JSON mode and for inline payloads. + if should_attach_metadata: + opts.push_metadata_info = attach_metadata_to_package( + ctx=ctx, + opts=opts, + owner=owner, + repo=repo, + slug=slug, + slug_perm=slug_perm, + content=metadata.content, + content_type=metadata.content_type, + source_identity=metadata.source_identity, + skip_errors=skip_errors, + metadata_content_file=metadata.content_file, + cli_content_type=metadata.content_type, + cli_source_identity=metadata_source_identity, + ) + elif metadata.provided: + # Metadata resolution/validation already warned the user; the payload + # is broken so a straight retry would fail. Use the "fix first" hint. + _print_metadata_retry_hint( + opts=opts, + owner=owner, + repo=repo, + slug=slug, + metadata_content_file=metadata.content_file, + cli_content_type=metadata.content_type, + cli_source_identity=metadata_source_identity, + reason="validation_failed", + ) + if no_wait_for_sync: return slug_perm, slug - # 5. (optionally) Wait for the package to synchronise + # 6. (optionally) Wait for the package to synchronise wait_for_package_sync( ctx=ctx, opts=opts, @@ -585,6 +1061,60 @@ def create_push_handlers(): is_flag=True, help="Execute in dry run mode (don't upload anything.)", ) + @click.option( + "--metadata-content-file", + "metadata_content_file", + type=click.Path( + exists=True, + dir_okay=False, + readable=True, + resolve_path=True, + allow_dash=True, + ), + default=None, + help=( + "Read metadata content from a JSON file " + "(for example, SBOM or BuildInfo). Use '-' for stdin. " + "Content must be a JSON object. " + "Mutually exclusive with --metadata-content. " + "Metadata failures abort the push by default; set " + "$CLOUDSMITH_METADATA_FAILURE_MODE=warn (or 0) to downgrade " + "to a warning and keep the package upload." + ), + ) + @click.option( + "--metadata-content", + "metadata_content", + default=None, + help=( + "Set metadata content from inline JSON. Content must be a " + "JSON object. " + "Mutually exclusive with --metadata-content-file. " + "Metadata failures abort the push by default; set " + "$CLOUDSMITH_METADATA_FAILURE_MODE=warn (or 0) to downgrade " + "to a warning and keep the package upload." + ), + ) + @click.option( + "--metadata-content-type", + "metadata_content_type", + default=None, + help=( + "Content type for metadata content " + "(for example, 'application/vnd.jfrog.buildinfo+json'). " + "Required when metadata content is supplied and determines " + "the schema used for validation." + ), + ) + @click.option( + "--metadata-source-identity", + "metadata_source_identity", + default=None, + help=( + "Identifier for the metadata source. " + "Defaults to 'cloudsmith-cli@'." + ), + ) @click.pass_context def push_handler(ctx, *args, **kwargs): """Handle upload for a specific package format.""" @@ -598,19 +1128,56 @@ def push_handler(ctx, *args, **kwargs): owner_repo = owner_repo[0:2] kwargs["owner_repo"] = owner_repo + # Metadata flags are not part of the package-create payload, so + # pop them and forward them as explicit kwargs so they don't leak + # into validate_create_package() / create_package(). + metadata_kwargs = { + key: kwargs.pop(key, None) for key in METADATA_KWARG_NAMES + } + package_files = kwargs.pop("package_file") if not isinstance(package_files, tuple): package_files = (package_files,) + # Reject multi-file push combined with metadata flags. A single + # metadata payload semantically belongs to one package; silently + # fanning it out across N packages (and validating + attaching it + # N times) is almost never what the user wants. Force them to + # push files individually with metadata, or drop the flags. + metadata_flags_set = any( + metadata_kwargs.get(k) for k in METADATA_KWARG_NAMES + ) + if len(package_files) > 1 and metadata_flags_set: + raise click.UsageError( + "Metadata flags (--metadata-content-file, --metadata-content, " + "--metadata-content-type, --metadata-source-identity) cannot " + "be combined with multiple package files. Push files " + "individually when attaching metadata." + ) + + metadata, metadata_failure_info = resolve_push_metadata_options( + **metadata_kwargs + ) + results = [] for package_file in package_files: kwargs["package_file"] = package_file try: click.echo(err=utils.should_use_stderr(opts)) - res = upload_files_and_create_package(ctx, *args, **kwargs) + res = upload_files_and_create_package( + ctx, + *args, + **kwargs, + **metadata_kwargs, + metadata=metadata, + metadata_failure_info=metadata_failure_info, + ) if res: - results.append(res) + # ``upload_files_and_create_package`` resets and then + # populates ``opts.push_metadata_info`` on every call, + # so reading it here always reflects this iteration. + results.append((res, opts.push_metadata_info)) except ApiException: click.secho( "Skipping error and moving on.", @@ -622,14 +1189,15 @@ def push_handler(ctx, *args, **kwargs): if utils.should_use_stderr(opts): data = [] - for slug_perm, slug in results: - data.append( - { - "slug_perm": slug_perm, - "slug": slug, - "status": "OK", # Assuming success if we got here - } - ) + for (slug_perm, slug), metadata_info in results: + entry = { + "slug_perm": slug_perm, + "slug": slug, + "status": "OK", # Assuming success if we got here + } + if metadata_info is not None: + entry["metadata_attachment"] = metadata_info + data.append(entry) if len(data) == 1: utils.maybe_print_as_json(opts, data[0]) diff --git a/cloudsmith_cli/cli/exceptions.py b/cloudsmith_cli/cli/exceptions.py index 162b8b8f..be12b733 100644 --- a/cloudsmith_cli/cli/exceptions.py +++ b/cloudsmith_cli/cli/exceptions.py @@ -45,6 +45,13 @@ def handle_api_exceptions( if fields: error_data["fields"] = fields + # Surface push-time metadata context (validation/attach result) + # in the same JSON envelope so a downstream package-create or + # sync failure does not lose the earlier metadata signal. + metadata_context = getattr(opts, "push_metadata_info", None) + if metadata_context is not None: + error_data["metadata_attachment"] = metadata_context + # Print to stdout import json diff --git a/cloudsmith_cli/cli/metadata_common.py b/cloudsmith_cli/cli/metadata_common.py new file mode 100644 index 00000000..243da46b --- /dev/null +++ b/cloudsmith_cli/cli/metadata_common.py @@ -0,0 +1,169 @@ +"""Shared CLI helpers for package metadata content.""" + +import json +import os +from dataclasses import dataclass, replace +from typing import Any + +import click + +from ..core.api.metadata import validate_metadata as api_validate_metadata +from ..core.version import get_version as get_cli_version +from .exceptions import handle_api_exceptions +from .utils import maybe_spinner + + +@dataclass(frozen=True) +class ResolvedMetadata: + """Metadata content resolved from CLI options.""" + + provided: bool + content: dict[str, Any] | None + content_type: str | None = None + source_identity: str | None = None + content_file: str | None = None + source_label: str | None = None + + +class MetadataContentError(click.ClickException): + """Raised when supplied metadata content is not a valid JSON object.""" + + def __init__(self, message, *, source_label=None): + super().__init__(message) + self.source_label = source_label + + +def default_metadata_source_identity() -> str: + """Return the default value for metadata source identity options.""" + return f"cloudsmith-cli@{get_cli_version()}" + + +def source_label_for(content_file): + """Return a human-readable label for a metadata content source.""" + if content_file == "-": + return "stdin" + if content_file: + return os.path.basename(content_file) + return "inline" + + +def _json_type_name(value): + if value is None: + return "null" + if isinstance(value, list): + return "array" + if isinstance(value, str): + return "string" + if isinstance(value, bool): + return "boolean" + if isinstance(value, (int, float)): + return "number" + return type(value).__name__ + + +def _parse_json_object(raw, source_label): + try: + content = json.loads(raw) + except ValueError as exc: + raise MetadataContentError( + f"Invalid JSON in {source_label}: {exc}", + source_label=source_label, + ) from exc + + if not isinstance(content, dict): + raise MetadataContentError( + "Metadata content must be a JSON object. Found " + f"{_json_type_name(content)}.", + source_label=source_label, + ) + + return content + + +def resolve_metadata_content( + *, + content_file: str | None, + inline_content: str | None, + required: bool, + file_option_name: str, + content_option_name: str, +) -> ResolvedMetadata: + """Resolve metadata content options into a parsed JSON object.""" + if content_file is not None and inline_content is not None: + raise click.UsageError( + f"{file_option_name} and {content_option_name} are mutually exclusive." + ) + + if content_file is not None: + source_label = source_label_for(content_file) + if content_file == "-": + raw = click.get_text_stream("stdin").read() + else: + with open(content_file, encoding="utf-8") as fh: + raw = fh.read() + elif inline_content is not None: + source_label = source_label_for(None) + raw = inline_content + elif required: + raise click.UsageError( + f"One of {file_option_name} or {content_option_name} is required." + ) + else: + return ResolvedMetadata(provided=False, content=None) + + return ResolvedMetadata( + provided=True, + content=_parse_json_object(raw, source_label), + content_file=content_file, + source_label=source_label, + ) + + +def require_metadata_content_type( + *, + content_type: str | None, + content_provided: bool, + option_name: str, +) -> None: + """Require content type when metadata content has been supplied.""" + if content_provided and not content_type: + raise click.UsageError( + f"{option_name} is required when metadata content is supplied." + ) + + +def attach_metadata_options( + metadata: ResolvedMetadata, + *, + content_type: str | None, + source_identity: str | None, +) -> ResolvedMetadata: + """Return a resolved payload with content type and source identity attached.""" + if not metadata.provided: + return metadata + + return replace( + metadata, + content_type=content_type, + source_identity=source_identity or default_metadata_source_identity(), + ) + + +def validate_metadata_payload_api( + *, + ctx, + opts, + content: dict[str, Any], + content_type: str, + context_msg: str, + reraise_on_error: bool = False, +) -> None: + """Validate metadata content through the metadata validation endpoint.""" + with handle_api_exceptions( + ctx, + opts=opts, + context_msg=context_msg, + reraise_on_error=reraise_on_error, + ): + with maybe_spinner(opts): + api_validate_metadata(content=content, content_type=content_type) diff --git a/cloudsmith_cli/cli/saml.py b/cloudsmith_cli/cli/saml.py index 450798ed..087b0d7e 100644 --- a/cloudsmith_cli/cli/saml.py +++ b/cloudsmith_cli/cli/saml.py @@ -49,7 +49,7 @@ def get_idp_url(api_host, owner, session): def exchange_2fa_token(api_host, two_factor_token, totp_token, session): exchange_data = {"two_factor_token": two_factor_token, "totp_token": totp_token} - exchange_url = "{api_host}/user/two-factor/".format(api_host=api_host) + exchange_url = f"{api_host}/user/two-factor/" headers = { "Authorization": "Bearer {two_factor_token}".format( @@ -82,11 +82,9 @@ def exchange_2fa_token(api_host, two_factor_token, totp_token, session): def refresh_access_token(api_host, access_token, refresh_token, session): data = {"refresh_token": refresh_token} - url = "{api_host}/user/refresh-token/".format(api_host=api_host) + url = f"{api_host}/user/refresh-token/" - headers = { - "Authorization": "Bearer {access_token}".format(access_token=access_token) - } + headers = {"Authorization": f"Bearer {access_token}"} response = session.post( url, diff --git a/cloudsmith_cli/cli/tests/commands/test_metadata.py b/cloudsmith_cli/cli/tests/commands/test_metadata.py index afb5d316..0491317c 100644 --- a/cloudsmith_cli/cli/tests/commands/test_metadata.py +++ b/cloudsmith_cli/cli/tests/commands/test_metadata.py @@ -41,16 +41,16 @@ def test_help_preserves_example_lines(self): self.assertEqual(result.exit_code, 0, msg=result.output) self.assertIn( - "$ cloudsmith metadata list your-org/awesome-repo/better-pkg\n", + "$ cloudsmith metadata list example-org/example-repo/example-pkg\n", result.output, ) self.assertIn( - "$ cloudsmith metadata list your-org/awesome-repo/better-pkg " + "$ cloudsmith metadata list example-org/example-repo/example-pkg " "--classification provenance\n", result.output, ) self.assertIn( - "$ cloudsmith metadata list your-org/awesome-repo/better-pkg meta-slug-perm\n", + "$ cloudsmith metadata list example-org/example-repo/example-pkg meta-slug-perm\n", result.output, ) @@ -59,12 +59,12 @@ def test_add_help_preserves_multiline_example(self): self.assertEqual(result.exit_code, 0, msg=result.output) self.assertIn( - "$ cloudsmith metadata add your-org/awesome-repo/better-pkg \\\n", + "$ cloudsmith metadata add example-org/example-repo/example-pkg \\\n", result.output, ) self.assertIn("--content-type application/json \\\n", result.output) self.assertIn('--content \'{"foo": "bar"}\'', result.output) - self.assertIn("cat payload.json | cloudsmith metadata add", result.output) + self.assertIn("cat metadata.json | cloudsmith metadata add", result.output) self.assertIn("--file -", result.output) self.assertIn("application/vnd.jfrog.buildinfo+json", result.output) self.assertIn("--file buildinfo.json", result.output) @@ -73,7 +73,7 @@ def test_update_help_preserves_stdin_example(self): result = self.runner.invoke(metadata_, ["update", "--help"]) self.assertEqual(result.exit_code, 0, msg=result.output) - self.assertIn("cat payload.json | cloudsmith metadata update", result.output) + self.assertIn("cat metadata.json | cloudsmith metadata update", result.output) self.assertIn("--file -", result.output) @@ -435,6 +435,29 @@ def test_add_invalid_stdin_json_is_usage_error(self, mock_resolve, mock_create): self.assertIn("invalid json in stdin", result.output.lower()) mock_create.assert_not_called() + @patch("cloudsmith_cli.cli.commands.metadata.api_create_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_add_rejects_non_object_content(self, mock_resolve, mock_create): + mock_resolve.return_value = "pkg-slug-perm" + + for raw in ("null", "[]"): + result = self.runner.invoke( + metadata_, + [ + "add", + "myorg/myrepo/mypkg", + "--content-type", + "application/json", + "--content", + raw, + ], + ) + + self.assertNotEqual(result.exit_code, 0) + self.assertIn("json object", result.output.lower()) + + mock_create.assert_not_called() + @patch("cloudsmith_cli.cli.commands.metadata.api_create_metadata") @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") def test_add_uses_explicit_source_identity(self, mock_resolve, mock_create): @@ -574,6 +597,26 @@ def test_update_requires_some_field(self, mock_resolve, mock_update): self.assertIn("nothing to update", result.output.lower()) mock_update.assert_not_called() + @patch("cloudsmith_cli.cli.commands.metadata.api_update_metadata") + @patch("cloudsmith_cli.cli.commands.metadata.api_get_package_slug_perm") + def test_update_rejects_non_object_content(self, mock_resolve, mock_update): + mock_resolve.return_value = "pkg-slug-perm" + + result = self.runner.invoke( + metadata_, + [ + "update", + "myorg/myrepo/mypkg", + "meta-slug", + "--content", + "[]", + ], + ) + + self.assertNotEqual(result.exit_code, 0) + self.assertIn("json object", result.output.lower()) + mock_update.assert_not_called() + def test_update_rejects_content_type_flag(self): result = self.runner.invoke( metadata_, diff --git a/cloudsmith_cli/cli/tests/test_metadata_common.py b/cloudsmith_cli/cli/tests/test_metadata_common.py new file mode 100644 index 00000000..00d2b89c --- /dev/null +++ b/cloudsmith_cli/cli/tests/test_metadata_common.py @@ -0,0 +1,123 @@ +"""Tests for shared metadata CLI helpers.""" + +import io + +import click +import pytest + +from ..metadata_common import ( + attach_metadata_options, + default_metadata_source_identity, + require_metadata_content_type, + resolve_metadata_content, +) + + +def test_resolve_metadata_content_optional_missing_returns_not_provided(): + metadata = resolve_metadata_content( + content_file=None, + inline_content=None, + required=False, + file_option_name="--file", + content_option_name="--content", + ) + + assert metadata.provided is False + assert metadata.content is None + + +def test_resolve_metadata_content_required_missing_raises(): + with pytest.raises(click.UsageError, match="required"): + resolve_metadata_content( + content_file=None, + inline_content=None, + required=True, + file_option_name="--file", + content_option_name="--content", + ) + + +def test_resolve_metadata_content_rejects_both_sources(): + with pytest.raises(click.UsageError, match="mutually exclusive"): + resolve_metadata_content( + content_file="/tmp/payload.json", + inline_content="{}", + required=True, + file_option_name="--file", + content_option_name="--content", + ) + + +def test_resolve_metadata_content_valid_inline_object(): + metadata = resolve_metadata_content( + content_file=None, + inline_content='{"x": 1}', + required=True, + file_option_name="--file", + content_option_name="--content", + ) + + assert metadata.provided is True + assert metadata.content == {"x": 1} + assert metadata.source_label == "inline" + + +@pytest.mark.parametrize("raw", ["null", "[]", '"text"', "3", "true"]) +def test_resolve_metadata_content_rejects_non_objects(raw): + with pytest.raises(click.ClickException, match="JSON object"): + resolve_metadata_content( + content_file=None, + inline_content=raw, + required=True, + file_option_name="--file", + content_option_name="--content", + ) + + +def test_resolve_metadata_content_reads_stdin_once(): + stdin = io.StringIO('{"from": "stdin"}') + + with pytest.MonkeyPatch.context() as monkeypatch: + monkeypatch.setattr( + "cloudsmith_cli.cli.metadata_common.click.get_text_stream", + lambda name: stdin, + ) + metadata = resolve_metadata_content( + content_file="-", + inline_content=None, + required=True, + file_option_name="--file", + content_option_name="--content", + ) + + assert metadata.content == {"from": "stdin"} + assert metadata.source_label == "stdin" + assert stdin.read() == "" + + +def test_require_metadata_content_type_when_content_supplied(): + with pytest.raises(click.UsageError, match="--content-type"): + require_metadata_content_type( + content_type=None, + content_provided=True, + option_name="--content-type", + ) + + +def test_attach_metadata_options_defaults_source_identity(): + metadata = resolve_metadata_content( + content_file=None, + inline_content="{}", + required=True, + file_option_name="--file", + content_option_name="--content", + ) + + resolved = attach_metadata_options( + metadata, + content_type="application/json", + source_identity=None, + ) + + assert resolved.content_type == "application/json" + assert resolved.source_identity == default_metadata_source_identity() diff --git a/cloudsmith_cli/cli/tests/test_push.py b/cloudsmith_cli/cli/tests/test_push.py index 247e62f9..ea4b6a71 100644 --- a/cloudsmith_cli/cli/tests/test_push.py +++ b/cloudsmith_cli/cli/tests/test_push.py @@ -1,10 +1,26 @@ +# pylint: disable=too-many-lines +import json +import os +import tempfile import unittest +from types import SimpleNamespace from unittest.mock import MagicMock, patch -from ..commands.push import upload_files_and_create_package +import click +import pytest +from ...core.api.exceptions import ApiException +from ..commands.push import ( + _print_metadata_retry_hint, + attach_metadata_to_package, + resolve_push_metadata_options, + upload_files_and_create_package, + validate_metadata_payload, +) +from ..metadata_common import ResolvedMetadata -# pylint: disable=too-many-instance-attributes + +# pylint: disable=too-many-instance-attributes,too-many-public-methods class TestPush(unittest.TestCase): def setUp(self): self.mock_ctx = MagicMock() @@ -121,6 +137,944 @@ def test_upload_files_and_create_package(self): **create_package_kwargs, ) + def test_upload_files_and_create_package_with_metadata(self): + """Successful push with metadata creates package + metadata entry.""" + input_kwargs = { + "package_file": "package/file/path", + "name": "test_package", + "version": "1.0.0", + } + metadata_kwargs = { + "metadata_content": '{"git_sha": "abc123"}', + "metadata_content_type": "application/vnd.jfrog.buildinfo+json", + "metadata_source_identity": "github-actions@example", + } + + with ( + patch("cloudsmith_cli.cli.commands.push.validate_create_package"), + patch( + "cloudsmith_cli.cli.commands.push.validate_upload_file" + ) as mock_validate_upload_file, + patch("cloudsmith_cli.cli.commands.push.upload_file") as mock_upload_file, + patch( + "cloudsmith_cli.cli.commands.push.create_package" + ) as mock_create_package, + patch("cloudsmith_cli.cli.commands.push.api_validate_metadata"), + patch( + "cloudsmith_cli.cli.commands.push.api_create_metadata" + ) as mock_create_metadata, + patch("cloudsmith_cli.cli.commands.push.wait_for_package_sync"), + ): + mock_validate_upload_file.return_value = "checksum" + mock_upload_file.return_value = "package_file_identifier" + mock_create_package.return_value = ("slug-perm-abc", "test_package_slug") + mock_create_metadata.return_value = {"slug_perm": "meta-slug-xyz"} + + upload_files_and_create_package( + self.mock_ctx, + self.mock_opts, + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + **input_kwargs, + **metadata_kwargs, + ) + + mock_create_metadata.assert_called_once_with( + "slug-perm-abc", + content={"git_sha": "abc123"}, + content_type="application/vnd.jfrog.buildinfo+json", + source_identity="github-actions@example", + ) + + def test_upload_files_and_create_package_with_json_null_metadata(self): + """Explicit JSON null is rejected before upload by default.""" + with ( + patch( + "cloudsmith_cli.cli.commands.push.validate_create_package" + ) as mock_validate_create_package, + patch("cloudsmith_cli.cli.commands.push.api_validate_metadata"), + patch("cloudsmith_cli.cli.commands.push.api_create_metadata"), + ): + with pytest.raises(click.ClickException, match="JSON object"): + upload_files_and_create_package( + self.mock_ctx, + MagicMock(spec=[]), + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + package_file="path", + name="x", + version="1", + metadata_content="null", + metadata_content_type="application/json", + ) + + mock_validate_create_package.assert_not_called() + + def test_upload_attach_publishes_metadata_info_to_opts(self): + """Attach result is published on opts.push_metadata_info for JSON output.""" + api_entry = { + "slug_perm": "meta-slug-xyz", + "content_type": "application/json", + "classification": "GENERIC", + "source_kind": "CUSTOMER", + "source_identity": "github-actions@demo", + "content": {"git_sha": "abc123"}, + } + + opts = MagicMock(spec=[]) + + with ( + patch("cloudsmith_cli.cli.commands.push.validate_create_package"), + patch( + "cloudsmith_cli.cli.commands.push.validate_upload_file", + return_value="checksum", + ), + patch( + "cloudsmith_cli.cli.commands.push.upload_file", + return_value="package_file_identifier", + ), + patch( + "cloudsmith_cli.cli.commands.push.create_package", + return_value=("slug-perm-abc", "test_package_slug"), + ), + patch("cloudsmith_cli.cli.commands.push.api_validate_metadata"), + patch( + "cloudsmith_cli.cli.commands.push.api_create_metadata", + return_value=api_entry, + ), + patch("cloudsmith_cli.cli.commands.push.wait_for_package_sync"), + ): + result = upload_files_and_create_package( + self.mock_ctx, + opts, + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + package_file="path", + name="x", + version="1", + metadata_content='{"git_sha": "abc123"}', + metadata_content_type="application/json", + ) + + assert result == ("slug-perm-abc", "test_package_slug") + assert opts.push_metadata_info == { + "status": "attached", + "slug_perm": "meta-slug-xyz", + "entry": api_entry, + } + + def test_upload_files_and_create_package_metadata_failure_warn_does_not_fail_push( + self, + ): + """With CLOUDSMITH_METADATA_FAILURE_MODE=warn the push survives a bad attach.""" + input_kwargs = { + "package_file": "package/file/path", + "name": "test_package", + "version": "1.0.0", + } + metadata_kwargs = { + "metadata_content": '{"git_sha": "abc123"}', + "metadata_content_type": "application/json", + } + + with ( + patch("cloudsmith_cli.cli.commands.push.validate_create_package"), + patch( + "cloudsmith_cli.cli.commands.push.validate_upload_file" + ) as mock_validate_upload_file, + patch("cloudsmith_cli.cli.commands.push.upload_file") as mock_upload_file, + patch( + "cloudsmith_cli.cli.commands.push.create_package" + ) as mock_create_package, + patch("cloudsmith_cli.cli.commands.push.api_validate_metadata"), + patch( + "cloudsmith_cli.cli.commands.push.api_create_metadata" + ) as mock_create_metadata, + patch( + "cloudsmith_cli.cli.commands.push.wait_for_package_sync" + ) as mock_wait_for_sync, + patch.dict( + "cloudsmith_cli.cli.commands.push.os.environ", + {"CLOUDSMITH_METADATA_FAILURE_MODE": "warn"}, + ), + ): + mock_validate_upload_file.return_value = "checksum" + mock_upload_file.return_value = "package_file_identifier" + mock_create_package.return_value = ("slug-perm-abc", "test_package_slug") + mock_create_metadata.side_effect = ApiException( + status=422, detail="Schema validation failed" + ) + + opts = MagicMock(spec=[]) + + # Push must complete without raising, returning the slug pair. + result = upload_files_and_create_package( + self.mock_ctx, + opts, + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + **input_kwargs, + **metadata_kwargs, + ) + + assert result == ("slug-perm-abc", "test_package_slug") + assert opts.push_metadata_info == { + "status": "attach_failed", + "http_status": 422, + "error": "Schema validation failed", + } + mock_create_metadata.assert_called_once() + # Sync wait still happens — push behaviour unchanged otherwise. + mock_wait_for_sync.assert_called_once() + + def test_upload_files_and_create_package_metadata_failure_default_aborts_push(self): + """Default failure mode (no env override) aborts the push on a bad attach.""" + input_kwargs = { + "package_file": "package/file/path", + "name": "test_package", + "version": "1.0.0", + } + + with ( + patch("cloudsmith_cli.cli.commands.push.validate_create_package"), + patch( + "cloudsmith_cli.cli.commands.push.validate_upload_file", + return_value="checksum", + ), + patch( + "cloudsmith_cli.cli.commands.push.upload_file", + return_value="package_file_identifier", + ), + patch( + "cloudsmith_cli.cli.commands.push.create_package", + return_value=("slug-perm-abc", "test_package_slug"), + ), + patch("cloudsmith_cli.cli.commands.push.api_validate_metadata"), + patch( + "cloudsmith_cli.cli.commands.push.api_create_metadata", + side_effect=ApiException(status=422, detail="bad payload"), + ), + patch("cloudsmith_cli.cli.commands.push.wait_for_package_sync"), + patch.dict( + "cloudsmith_cli.cli.commands.push.os.environ", + {}, + clear=False, + ) as patched_env, + ): + patched_env.pop("CLOUDSMITH_METADATA_FAILURE_MODE", None) + opts = SimpleNamespace( + output=None, + push_metadata_info=None, + verbose=False, + debug=False, + api_key=None, + api_host=None, + ) + ctx = click.Context(click.Command("test")) + + with pytest.raises(click.exceptions.Exit) as exc_info: + upload_files_and_create_package( + ctx, + opts, + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + metadata_content='{"x": 1}', + metadata_content_type="application/json", + **input_kwargs, + ) + + assert exc_info.value.exit_code == 422 + assert opts.push_metadata_info == { + "status": "attach_failed", + "http_status": 422, + "error": "bad payload", + } + + def test_prevalidate_metadata_404_aborts_before_upload_by_default(self): + """A validation endpoint API error aborts before upload by default.""" + with ( + patch("cloudsmith_cli.cli.commands.push.validate_create_package"), + patch( + "cloudsmith_cli.cli.commands.push.validate_upload_file", + return_value="checksum", + ) as mock_validate_upload_file, + patch( + "cloudsmith_cli.cli.commands.push.upload_file", + return_value="package_file_identifier", + ) as mock_upload_file, + patch( + "cloudsmith_cli.cli.commands.push.create_package", + return_value=("slug-perm-abc", "test_package_slug"), + ) as mock_create_package, + patch( + "cloudsmith_cli.cli.commands.push.api_validate_metadata", + side_effect=ApiException(status=404, detail="Not Found"), + ), + patch( + "cloudsmith_cli.cli.commands.push.api_create_metadata", + return_value={"slug_perm": "meta-slug-xyz"}, + ), + patch("cloudsmith_cli.cli.commands.push.wait_for_package_sync"), + ): + opts = SimpleNamespace( + output=None, + push_metadata_info=None, + verbose=False, + debug=False, + api_key=None, + api_host=None, + ) + ctx = click.Context(click.Command("test")) + with pytest.raises(click.exceptions.Exit) as exc_info: + upload_files_and_create_package( + ctx, + opts, + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + package_file="path", + name="x", + version="1", + metadata_content='{"x": 1}', + metadata_content_type="application/json", + ) + + assert exc_info.value.exit_code == 404 + mock_validate_upload_file.assert_not_called() + mock_upload_file.assert_not_called() + mock_create_package.assert_not_called() + + def test_prevalidate_metadata_warn_skips_attach_but_uploads_package(self): + """Validation failure in warn mode drops attach but lets the package upload.""" + with ( + patch("cloudsmith_cli.cli.commands.push.validate_create_package"), + patch( + "cloudsmith_cli.cli.commands.push.validate_upload_file", + return_value="checksum", + ), + patch( + "cloudsmith_cli.cli.commands.push.upload_file", + return_value="package_file_identifier", + ), + patch( + "cloudsmith_cli.cli.commands.push.create_package", + return_value=("slug-perm-abc", "test_package_slug"), + ), + patch( + "cloudsmith_cli.cli.commands.push.api_validate_metadata", + side_effect=ApiException(status=422, detail="schema mismatch"), + ), + patch( + "cloudsmith_cli.cli.commands.push.api_create_metadata" + ) as mock_create_metadata, + patch("cloudsmith_cli.cli.commands.push.wait_for_package_sync"), + patch.dict( + "cloudsmith_cli.cli.commands.push.os.environ", + {"CLOUDSMITH_METADATA_FAILURE_MODE": "warn"}, + ), + ): + opts = MagicMock(spec=[]) + result = upload_files_and_create_package( + self.mock_ctx, + opts, + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + package_file="path", + name="x", + version="1", + metadata_content='{"x": 1}', + metadata_content_type="application/json", + ) + + # Package created successfully, metadata attach call never reached. + assert result == ("slug-perm-abc", "test_package_slug") + assert opts.push_metadata_info == { + "status": "validation_failed", + "http_status": 422, + "error": "schema mismatch", + } + mock_create_metadata.assert_not_called() + + def test_local_metadata_content_warn_skips_attach_but_uploads_package(self): + """Local JSON object validation failure respects warn mode.""" + with ( + patch("cloudsmith_cli.cli.commands.push.validate_create_package"), + patch( + "cloudsmith_cli.cli.commands.push.validate_upload_file", + return_value="checksum", + ), + patch( + "cloudsmith_cli.cli.commands.push.upload_file", + return_value="package_file_identifier", + ), + patch( + "cloudsmith_cli.cli.commands.push.create_package", + return_value=("slug-perm-abc", "test_package_slug"), + ), + patch( + "cloudsmith_cli.cli.commands.push.api_validate_metadata" + ) as mock_validate_metadata, + patch( + "cloudsmith_cli.cli.commands.push.api_create_metadata" + ) as mock_create_metadata, + patch("cloudsmith_cli.cli.commands.push.wait_for_package_sync"), + patch.dict( + "cloudsmith_cli.cli.commands.push.os.environ", + {"CLOUDSMITH_METADATA_FAILURE_MODE": "warn"}, + ), + ): + opts = MagicMock(spec=[]) + result = upload_files_and_create_package( + self.mock_ctx, + opts, + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + package_file="path", + name="x", + version="1", + metadata_content="[]", + metadata_content_type="application/json", + ) + + assert result == ("slug-perm-abc", "test_package_slug") + assert opts.push_metadata_info["status"] == "content_invalid" + mock_validate_metadata.assert_not_called() + mock_create_metadata.assert_not_called() + + def test_prevalidate_metadata_default_aborts_before_upload(self): + """Validation failure aborts before any file upload by default.""" + with ( + patch("cloudsmith_cli.cli.commands.push.validate_create_package"), + patch( + "cloudsmith_cli.cli.commands.push.validate_upload_file" + ) as mock_validate_upload_file, + patch("cloudsmith_cli.cli.commands.push.upload_file") as mock_upload_file, + patch( + "cloudsmith_cli.cli.commands.push.create_package" + ) as mock_create_package, + patch( + "cloudsmith_cli.cli.commands.push.api_validate_metadata", + side_effect=ApiException(status=422, detail="schema mismatch"), + ), + patch( + "cloudsmith_cli.cli.commands.push.api_create_metadata" + ) as mock_create_metadata, + patch.dict( + "cloudsmith_cli.cli.commands.push.os.environ", + {}, + clear=False, + ) as patched_env, + ): + patched_env.pop("CLOUDSMITH_METADATA_FAILURE_MODE", None) + opts = SimpleNamespace( + output=None, + push_metadata_info=None, + verbose=False, + debug=False, + api_key=None, + api_host=None, + ) + ctx = click.Context(click.Command("test")) + + with pytest.raises(click.exceptions.Exit) as exc_info: + upload_files_and_create_package( + ctx, + opts, + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + package_file="path", + name="x", + version="1", + metadata_content='{"x": 1}', + metadata_content_type="application/json", + ) + + assert exc_info.value.exit_code == 422 + assert opts.push_metadata_info == { + "status": "validation_failed", + "http_status": 422, + "error": "schema mismatch", + } + + # No upload, no package, no attach. + mock_validate_upload_file.assert_not_called() + mock_upload_file.assert_not_called() + mock_create_package.assert_not_called() + mock_create_metadata.assert_not_called() + + def test_metadata_content_type_without_payload_errors(self): + """Setting only --metadata-content-type (no payload) is a usage error.""" + with pytest.raises(click.UsageError, match="Add --metadata-content"): + upload_files_and_create_package( + self.mock_ctx, + self.mock_opts, + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + metadata_content_type="application/json", + package_file="path", + name="x", + version="1", + ) + + def test_metadata_source_identity_without_payload_errors(self): + """Setting only --metadata-source-identity is a usage error.""" + with pytest.raises(click.UsageError, match="Add --metadata-content"): + upload_files_and_create_package( + self.mock_ctx, + self.mock_opts, + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + metadata_source_identity="ci@example", + package_file="path", + name="x", + version="1", + ) + + def test_metadata_content_without_content_type_errors(self): + """Metadata content requires --metadata-content-type.""" + with pytest.raises(click.UsageError, match="--metadata-content-type"): + upload_files_and_create_package( + self.mock_ctx, + self.mock_opts, + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + metadata_content="{}", + package_file="path", + name="x", + version="1", + ) + + def test_resolve_push_metadata_options_inline_json(self): + metadata, failure = resolve_push_metadata_options( + metadata_content='{"k": "v"}', + metadata_content_type="application/json", + ) + + assert failure is None + assert metadata.content == {"k": "v"} + assert metadata.content_type == "application/json" + assert metadata.source_label == "inline" + + def test_resolve_push_metadata_options_json_null_rejected(self): + with pytest.raises(click.ClickException, match="JSON object"): + resolve_push_metadata_options( + metadata_content="null", + metadata_content_type="application/json", + ) + + def test_resolve_push_metadata_options_invalid_json_rejected(self): + with pytest.raises(click.ClickException, match="Invalid JSON"): + resolve_push_metadata_options( + metadata_content="not-json", + metadata_content_type="application/json", + ) + + def test_resolve_push_metadata_options_invalid_json_warn_returns_failure(self): + with patch.dict( + "cloudsmith_cli.cli.commands.push.os.environ", + {"CLOUDSMITH_METADATA_FAILURE_MODE": "warn"}, + ): + metadata, failure = resolve_push_metadata_options( + metadata_content="not-json", + metadata_content_type="application/json", + ) + + assert metadata.provided is True + assert metadata.content is None + assert failure["status"] == "content_invalid" + assert "Invalid JSON" in failure["error"] + + def test_resolve_push_metadata_options_neither_returns_not_provided(self): + metadata, failure = resolve_push_metadata_options() + + assert metadata.provided is False + assert metadata.content is None + assert failure is None + + def test_resolve_push_metadata_options_mutex(self): + with pytest.raises(click.UsageError, match="mutually exclusive"): + resolve_push_metadata_options( + metadata_content_file="/tmp/foo.json", + metadata_content='{"k": "v"}', + metadata_content_type="application/json", + ) + + def test_resolve_push_metadata_options_reads_stdin_via_dash(self): + """``--metadata-content-file -`` reads JSON from stdin once.""" + import io + + stdin = io.StringIO('{"git_sha": "abc123"}') + with patch( + "cloudsmith_cli.cli.metadata_common.click.get_text_stream", + return_value=stdin, + ): + metadata, failure = resolve_push_metadata_options( + metadata_content_file="-", + metadata_content_type="application/json", + ) + + assert failure is None + assert metadata.content == {"git_sha": "abc123"} + assert metadata.source_label == "stdin" + + def test_cached_stdin_metadata_payload_reused_across_uploads(self): + """The push command resolves ``-`` metadata before per-file uploads.""" + import io + + open_calls = [] + + def fake_get_text_stream(name): + assert name == "stdin" + open_calls.append(name) + return io.StringIO('{"git_sha": "abc123"}') + + metadata_kwargs = { + "metadata_content_file": "-", + "metadata_content": None, + "metadata_content_type": "application/json", + "metadata_source_identity": None, + } + + with patch( + "cloudsmith_cli.cli.metadata_common.click.get_text_stream", + side_effect=fake_get_text_stream, + ): + metadata, metadata_failure_info = resolve_push_metadata_options( + **metadata_kwargs + ) + + with ( + patch("cloudsmith_cli.cli.commands.push.validate_create_package"), + patch( + "cloudsmith_cli.cli.commands.push.validate_upload_file", + return_value="checksum", + ), + patch( + "cloudsmith_cli.cli.commands.push.upload_file", + return_value="package_file_identifier", + ), + patch( + "cloudsmith_cli.cli.commands.push.create_package", + side_effect=[ + ("slug-perm-abc", "test_package_slug"), + ("slug-perm-def", "test_package_slug_2"), + ], + ), + patch("cloudsmith_cli.cli.commands.push.api_validate_metadata"), + patch( + "cloudsmith_cli.cli.commands.push.api_create_metadata", + return_value={"slug_perm": "meta-slug-xyz"}, + ) as mock_create_metadata, + patch("cloudsmith_cli.cli.commands.push.wait_for_package_sync"), + ): + for package_file in ("path-1", "path-2"): + upload_files_and_create_package( + self.mock_ctx, + MagicMock(spec=[]), + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + package_file=package_file, + name="x", + version="1", + metadata=metadata, + metadata_failure_info=metadata_failure_info, + ) + + assert open_calls == ["stdin"] + assert mock_create_metadata.call_count == 2 + for call in mock_create_metadata.call_args_list: + assert call.kwargs["content"] == {"git_sha": "abc123"} + assert call.kwargs["content_type"] == "application/json" + + def test_metadata_content_file_round_trip(self): + """A JSON file given via --metadata-content-file reaches the API call.""" + payload = {"git_sha": "abc123", "build": 42} + fd, path = tempfile.mkstemp(suffix=".json") + try: + with os.fdopen(fd, "w", encoding="utf-8") as fh: + json.dump(payload, fh) + + with ( + patch("cloudsmith_cli.cli.commands.push.validate_create_package"), + patch( + "cloudsmith_cli.cli.commands.push.validate_upload_file", + return_value="checksum", + ), + patch( + "cloudsmith_cli.cli.commands.push.upload_file", + return_value="package_file_identifier", + ), + patch( + "cloudsmith_cli.cli.commands.push.create_package", + return_value=("slug-perm-abc", "test_package_slug"), + ), + patch("cloudsmith_cli.cli.commands.push.api_validate_metadata"), + patch( + "cloudsmith_cli.cli.commands.push.api_create_metadata", + return_value={"slug_perm": "meta-slug-xyz"}, + ) as mock_create_metadata, + patch("cloudsmith_cli.cli.commands.push.wait_for_package_sync"), + ): + upload_files_and_create_package( + self.mock_ctx, + MagicMock(spec=[]), + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + package_file="path", + name="x", + version="1", + metadata_content_file=path, + metadata_content_type="application/json", + ) + + kwargs = mock_create_metadata.call_args.kwargs + assert kwargs["content"] == payload + finally: + os.unlink(path) + + def test_metadata_default_source_identity(self): + """Without --metadata-source-identity the default is cloudsmith-cli@.""" + with ( + patch("cloudsmith_cli.cli.commands.push.validate_create_package"), + patch( + "cloudsmith_cli.cli.commands.push.validate_upload_file", + return_value="checksum", + ), + patch( + "cloudsmith_cli.cli.commands.push.upload_file", + return_value="package_file_identifier", + ), + patch( + "cloudsmith_cli.cli.commands.push.create_package", + return_value=("slug-perm-abc", "test_package_slug"), + ), + patch("cloudsmith_cli.cli.commands.push.api_validate_metadata"), + patch( + "cloudsmith_cli.cli.commands.push.api_create_metadata", + return_value={"slug_perm": "meta-slug-xyz"}, + ) as mock_create_metadata, + patch( + "cloudsmith_cli.cli.metadata_common.get_cli_version", + return_value="9.9.9", + ), + patch("cloudsmith_cli.cli.commands.push.wait_for_package_sync"), + ): + upload_files_and_create_package( + self.mock_ctx, + MagicMock(spec=[]), + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + package_file="path", + name="x", + version="1", + metadata_content='{"x": 1}', + metadata_content_type="application/json", + ) + + assert ( + mock_create_metadata.call_args.kwargs["source_identity"] + == "cloudsmith-cli@9.9.9" + ) + + def test_metadata_retry_hint_emitted_on_attach_warn_failure(self): + """Attach failure (warn mode, file payload) routes to the hint helper.""" + opts = SimpleNamespace(output=None, push_metadata_info=None) + metadata = ResolvedMetadata( + provided=True, + content={"x": 1}, + content_type="application/vnd.cyclonedx+json", + source_identity="ci@gha", + content_file="/tmp/sbom.json", + source_label="sbom.json", + ) + + with ( + patch("cloudsmith_cli.cli.commands.push.validate_create_package"), + patch( + "cloudsmith_cli.cli.commands.push.validate_upload_file", + return_value="checksum", + ), + patch( + "cloudsmith_cli.cli.commands.push.upload_file", + return_value="package_file_identifier", + ), + patch( + "cloudsmith_cli.cli.commands.push.create_package", + return_value=("slug-perm-abc", "test_package_slug"), + ), + patch("cloudsmith_cli.cli.commands.push.api_validate_metadata"), + patch( + "cloudsmith_cli.cli.commands.push.api_create_metadata", + side_effect=ApiException(status=422, detail="bad payload"), + ), + patch("cloudsmith_cli.cli.commands.push.wait_for_package_sync"), + patch("cloudsmith_cli.cli.commands.push._print_metadata_retry_hint") as spy, + patch.dict( + "cloudsmith_cli.cli.commands.push.os.environ", + {"CLOUDSMITH_METADATA_FAILURE_MODE": "warn"}, + ), + ): + upload_files_and_create_package( + self.mock_ctx, + opts, + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + package_file="path", + name="x", + version="1", + metadata_content_type="application/vnd.cyclonedx+json", + metadata_source_identity="ci@gha", + metadata=metadata, + ) + + spy.assert_called_once() + kwargs = spy.call_args.kwargs + assert kwargs["owner"] == self.owner + assert kwargs["repo"] == self.repo + assert kwargs["slug"] == "test_package_slug" + assert kwargs["metadata_content_file"] == "/tmp/sbom.json" + assert kwargs["cli_content_type"] == "application/vnd.cyclonedx+json" + assert kwargs["cli_source_identity"] == "ci@gha" + + def test_metadata_retry_hint_emitted_on_validation_warn_failure(self): + """Pre-validation warn failure routes to the hint helper after create.""" + opts = SimpleNamespace(output=None, push_metadata_info=None) + metadata = ResolvedMetadata( + provided=True, + content={"x": 1}, + content_type="application/json", + source_identity="cloudsmith-cli@9.9.9", + content_file="/tmp/buildinfo.json", + source_label="buildinfo.json", + ) + + with ( + patch("cloudsmith_cli.cli.commands.push.validate_create_package"), + patch( + "cloudsmith_cli.cli.commands.push.validate_upload_file", + return_value="checksum", + ), + patch( + "cloudsmith_cli.cli.commands.push.upload_file", + return_value="package_file_identifier", + ), + patch( + "cloudsmith_cli.cli.commands.push.create_package", + return_value=("slug-perm-abc", "test_package_slug"), + ), + patch( + "cloudsmith_cli.cli.commands.push.api_validate_metadata", + side_effect=ApiException(status=422, detail="schema mismatch"), + ), + patch("cloudsmith_cli.cli.commands.push.api_create_metadata"), + patch("cloudsmith_cli.cli.commands.push.wait_for_package_sync"), + patch("cloudsmith_cli.cli.commands.push._print_metadata_retry_hint") as spy, + patch.dict( + "cloudsmith_cli.cli.commands.push.os.environ", + {"CLOUDSMITH_METADATA_FAILURE_MODE": "warn"}, + ), + ): + upload_files_and_create_package( + self.mock_ctx, + opts, + self.package_type, + [self.owner, self.repo], + self.dry_run, + self.no_wait_for_sync, + self.wait_interval, + self.skip_errors, + self.sync_attempts, + package_file="path", + name="x", + version="1", + metadata_content_type="application/json", + metadata=metadata, + ) + + spy.assert_called_once() + kwargs = spy.call_args.kwargs + assert kwargs["slug"] == "test_package_slug" + assert kwargs["metadata_content_file"] == "/tmp/buildinfo.json" + def test_upload_files_and_create_package_extra_files(self): # Values passed in from the command line input_kwargs = { @@ -243,3 +1197,173 @@ def test_upload_files_and_create_package_extra_files(self): skip_errors=self.skip_errors, **create_package_kwargs, ) + + +# Plain pytest functions for hint output — capsys is not auto-injected into +# unittest.TestCase methods, so these live outside the class. + + +def _hint_opts(output=None): + return SimpleNamespace(output=output, push_metadata_info=None) + + +def _api_opts(output="json"): + return SimpleNamespace( + output=output, + push_metadata_info=None, + verbose=False, + debug=False, + api_key=None, + api_host=None, + ) + + +def test_validate_metadata_payload_json_failure_uses_api_error_envelope(capsys): + ctx = click.Context(click.Command("push")) + opts = _api_opts(output="json") + + with ( + patch( + "cloudsmith_cli.cli.commands.push.api_validate_metadata", + side_effect=ApiException(status=422, detail="bad payload"), + ), + patch.dict( + "cloudsmith_cli.cli.commands.push.os.environ", + {}, + clear=False, + ) as patched_env, + ): + patched_env.pop("CLOUDSMITH_METADATA_FAILURE_MODE", None) + with pytest.raises(click.exceptions.Exit) as exc_info: + validate_metadata_payload( + ctx=ctx, + opts=opts, + content={"x": 1}, + content_type="application/json", + source="inline", + ) + + assert exc_info.value.exit_code == 422 + data = json.loads(capsys.readouterr().out) + assert data["detail"] == "bad payload" + assert data["help"]["context"].startswith("Metadata content failed validation") + assert data["metadata_attachment"] == { + "status": "validation_failed", + "http_status": 422, + "error": "bad payload", + } + + +def test_attach_metadata_json_failure_uses_api_error_envelope(capsys): + ctx = click.Context(click.Command("push")) + opts = _api_opts(output="pretty_json") + + with ( + patch( + "cloudsmith_cli.cli.commands.push.api_create_metadata", + side_effect=ApiException(status=422, detail="bad payload"), + ), + patch.dict( + "cloudsmith_cli.cli.commands.push.os.environ", + {}, + clear=False, + ) as patched_env, + ): + patched_env.pop("CLOUDSMITH_METADATA_FAILURE_MODE", None) + with pytest.raises(click.exceptions.Exit) as exc_info: + attach_metadata_to_package( + ctx=ctx, + opts=opts, + owner="acme", + repo="repo", + slug="hello-txt", + slug_perm="hello-txt-abc", + content={"x": 1}, + content_type="application/json", + source_identity="ci@example", + ) + + assert exc_info.value.exit_code == 422 + data = json.loads(capsys.readouterr().out) + assert data["detail"] == "bad payload" + assert data["help"]["context"].startswith("Could not attach metadata") + assert data["metadata_attachment"] == { + "status": "attach_failed", + "http_status": 422, + "error": "bad payload", + } + + +def test_print_metadata_retry_hint_emits_copy_paste_command(capsys): + _print_metadata_retry_hint( + opts=_hint_opts(), + owner="acme", + repo="repo", + slug="hello-txt-abc", + metadata_content_file="/tmp/sbom.json", + cli_content_type="application/vnd.cyclonedx+json", + cli_source_identity="ci@gha", + ) + err = capsys.readouterr().err + assert "Run this command to attach metadata:" in err + assert "cloudsmith metadata add acme/repo/hello-txt-abc" in err + assert "--file /tmp/sbom.json" in err + assert "--source-identity ci@gha" in err + assert "--content-type application/vnd.cyclonedx+json" in err + + +def test_print_metadata_retry_hint_omits_default_flags(capsys): + _print_metadata_retry_hint( + opts=_hint_opts(), + owner="acme", + repo="repo", + slug="hello-txt-abc", + metadata_content_file="/tmp/sbom.json", + cli_content_type=None, + cli_source_identity=None, + ) + err = capsys.readouterr().err + assert "cloudsmith metadata add acme/repo/hello-txt-abc" in err + assert "--file /tmp/sbom.json" in err + assert "--source-identity" not in err + assert "--content-type" not in err + + +def test_print_metadata_retry_hint_silent_for_inline_content(capsys): + _print_metadata_retry_hint( + opts=_hint_opts(), + owner="acme", + repo="repo", + slug="hello-txt-abc", + metadata_content_file=None, + cli_content_type=None, + cli_source_identity=None, + ) + assert capsys.readouterr().err == "" + + +def test_print_metadata_retry_hint_silent_for_stdin(capsys): + """Stdin payload (``-``) is not safely reproducible — suppress the hint.""" + _print_metadata_retry_hint( + opts=_hint_opts(), + owner="acme", + repo="repo", + slug="hello-txt-abc", + metadata_content_file="-", + cli_content_type=None, + cli_source_identity=None, + ) + assert capsys.readouterr().err == "" + + +def test_print_metadata_retry_hint_silent_in_json_mode(capsys): + _print_metadata_retry_hint( + opts=_hint_opts(output="json"), + owner="acme", + repo="repo", + slug="hello-txt-abc", + metadata_content_file="/tmp/sbom.json", + cli_content_type=None, + cli_source_identity=None, + ) + assert capsys.readouterr().err == "" diff --git a/cloudsmith_cli/core/api/init.py b/cloudsmith_cli/core/api/init.py index b6a856bc..407e29e4 100644 --- a/cloudsmith_cli/core/api/init.py +++ b/cloudsmith_cli/core/api/init.py @@ -1,7 +1,7 @@ """Cloudsmith API - Initialisation.""" import base64 -from typing import Type, TypeVar +from typing import TypeVar import click import cloudsmith_api @@ -128,7 +128,7 @@ def initialise_api( T = TypeVar("T") -def get_api_client(cls: Type[T]) -> T: +def get_api_client(cls: type[T]) -> T: """Get an API client (with configuration).""" config = cloudsmith_api.Configuration() client = cls() diff --git a/cloudsmith_cli/core/api/metadata.py b/cloudsmith_cli/core/api/metadata.py index 5091d6f1..f3df8a60 100644 --- a/cloudsmith_cli/core/api/metadata.py +++ b/cloudsmith_cli/core/api/metadata.py @@ -1,7 +1,7 @@ """API - Package metadata (v2) endpoints.""" import json -from typing import Any, Optional, Union +from typing import Any import cloudsmith_api @@ -136,10 +136,10 @@ def _response_json(response): def list_metadata( package_slug_perm: str, *, - source_kind: Optional[Union[int, str]] = None, - classification: Optional[Union[int, str]] = None, - page: Optional[int] = None, - page_size: Optional[int] = None, + source_kind: int | str | None = None, + classification: int | str | None = None, + page: int | None = None, + page_size: int | None = None, ): """List metadata entries attached to a package. @@ -216,7 +216,7 @@ def update_metadata( metadata_slug_perm: str, *, content: Any = None, - source_identity: Optional[str] = None, + source_identity: str | None = None, ): """Patch an existing customer-owned metadata entry. diff --git a/cloudsmith_cli/core/download.py b/cloudsmith_cli/core/download.py index b3638017..578131a3 100644 --- a/cloudsmith_cli/core/download.py +++ b/cloudsmith_cli/core/download.py @@ -3,7 +3,6 @@ import fnmatch import hashlib import os -from typing import Dict, List, Optional, Tuple import click import cloudsmith_api @@ -18,8 +17,8 @@ def resolve_auth( - opts, api_key_opt: Optional[str] = None -) -> Tuple[requests.Session, Dict[str, str], str]: + opts, api_key_opt: str | None = None +) -> tuple[requests.Session, dict[str, str], str]: """ Resolve authentication method and create session with appropriate headers. @@ -56,7 +55,7 @@ def resolve_auth( return session, headers, auth_source -def _matches_tag_filter(pkg: Dict, tag_filter: str) -> bool: +def _matches_tag_filter(pkg: dict, tag_filter: str) -> bool: """ Check if a package matches the tag filter. @@ -84,13 +83,13 @@ def _search_packages( repo: str, name: str, *, - version: Optional[str] = None, - format_filter: Optional[str] = None, - os_filter: Optional[str] = None, - arch_filter: Optional[str] = None, - tag_filter: Optional[str] = None, - filename_filter: Optional[str] = None, -) -> List[Dict]: + version: str | None = None, + format_filter: str | None = None, + os_filter: str | None = None, + arch_filter: str | None = None, + tag_filter: str | None = None, + filename_filter: str | None = None, +) -> list[dict]: """ Search for packages matching criteria, returning all matches. @@ -172,13 +171,13 @@ def resolve_all_packages( repo: str, name: str, *, - version: Optional[str] = None, - format_filter: Optional[str] = None, - os_filter: Optional[str] = None, - arch_filter: Optional[str] = None, - tag_filter: Optional[str] = None, - filename_filter: Optional[str] = None, -) -> List[Dict]: + version: str | None = None, + format_filter: str | None = None, + os_filter: str | None = None, + arch_filter: str | None = None, + tag_filter: str | None = None, + filename_filter: str | None = None, +) -> list[dict]: """ Find all packages matching the criteria. @@ -224,14 +223,14 @@ def resolve_package( repo: str, name: str, *, - version: Optional[str] = None, - format_filter: Optional[str] = None, - os_filter: Optional[str] = None, - arch_filter: Optional[str] = None, - tag_filter: Optional[str] = None, - filename_filter: Optional[str] = None, + version: str | None = None, + format_filter: str | None = None, + os_filter: str | None = None, + arch_filter: str | None = None, + tag_filter: str | None = None, + filename_filter: str | None = None, yes: bool = False, -) -> Dict: +) -> dict: """ Find a single package matching the criteria, handling multiple matches. @@ -295,7 +294,7 @@ def resolve_package( return best_package -def _display_multiple_packages(packages: List[Dict]) -> None: +def _display_multiple_packages(packages: list[dict]) -> None: """Display a table of multiple matching packages.""" click.echo("Multiple packages found:") click.echo() @@ -319,7 +318,7 @@ def _display_multiple_packages(packages: List[Dict]) -> None: click.echo() -def get_download_url(package: Dict) -> str: +def get_download_url(package: dict) -> str: """ Get the download URL for a package. @@ -346,7 +345,7 @@ def get_download_url(package: Dict) -> str: return download_url -def get_package_files(package: Dict) -> List[Dict]: +def get_package_files(package: dict) -> list[dict]: """ Get all downloadable files associated with a package. @@ -389,7 +388,7 @@ def get_package_files(package: Dict) -> List[Dict]: return downloadable_files -def get_package_detail(owner: str, repo: str, identifier: str) -> Dict: +def get_package_detail(owner: str, repo: str, identifier: str) -> dict: """ Get detailed package information including download URLs. @@ -417,7 +416,7 @@ def stream_download( # noqa: C901 outfile: str, session: requests.Session, *, - headers: Optional[Dict[str, str]] = None, + headers: dict[str, str] | None = None, overwrite: bool = False, quiet: bool = False, ) -> None: @@ -522,7 +521,7 @@ def stream_download( # noqa: C901 click.secho("⚠ Checksum mismatch", fg="yellow", err=True) -def _select_best_package(packages: List[Dict]) -> Dict: +def _select_best_package(packages: list[dict]) -> dict: """Select the best package from multiple matches.""" # Sort by version (desc) then by upload date (desc) diff --git a/cloudsmith_cli/core/mcp/data.py b/cloudsmith_cli/core/mcp/data.py index 459c1db1..99e1c43c 100644 --- a/cloudsmith_cli/core/mcp/data.py +++ b/cloudsmith_cli/core/mcp/data.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Any, Dict, Optional +from typing import Any @dataclass @@ -10,8 +10,8 @@ class OpenAPITool: description: str method: str path: str - parameters: Dict[str, Any] + parameters: dict[str, Any] base_url: str - query_filter: Optional[str] + query_filter: str | None is_destructive: bool = False is_read_only: bool = False diff --git a/cloudsmith_cli/core/mcp/server.py b/cloudsmith_cli/core/mcp/server.py index 88069fcb..1ff7ba6b 100644 --- a/cloudsmith_cli/core/mcp/server.py +++ b/cloudsmith_cli/core/mcp/server.py @@ -2,7 +2,7 @@ import copy import inspect import json -from typing import Any, Dict, List, Optional +from typing import Any, Optional from urllib import parse import cloudsmith_api @@ -97,7 +97,7 @@ async def list_tools(self) -> list[types.Tool]: return cleaned_tools - def _clean_schema(self, schema: Dict[str, Any]) -> Dict[str, Any]: + def _clean_schema(self, schema: dict[str, Any]) -> dict[str, Any]: """Clean up schema by removing anyOf patterns and other complexities""" if not isinstance(schema, dict): return schema @@ -113,7 +113,7 @@ def _clean_schema(self, schema: Dict[str, Any]) -> Dict[str, Any]: return cleaned - def _clean_property_schema(self, prop_schema: Dict[str, Any]) -> Dict[str, Any]: + def _clean_property_schema(self, prop_schema: dict[str, Any]) -> dict[str, Any]: """Clean individual property schema""" if not isinstance(prop_schema, dict): return prop_schema @@ -180,8 +180,8 @@ def __init__( use_toon=True, allow_destructive_tools=False, debug_mode=False, - allowed_tool_groups: Optional[List[str]] = None, - allowed_tools: Optional[List[str]] = None, + allowed_tool_groups: list[str] | None = None, + allowed_tools: list[str] | None = None, force_all_tools: bool = False, ): mcp_kwargs = {"log_level": "ERROR"} @@ -195,7 +195,7 @@ def __init__( self.allowed_tool_groups = set(allowed_tool_groups or []) self.allowed_tools = set(allowed_tools or []) self.force_all_tools = force_all_tools - self.tools: Dict[str, OpenAPITool] = {} + self.tools: dict[str, OpenAPITool] = {} self.spec = {} async def load_openapi_spec(self): @@ -214,7 +214,7 @@ async def load_openapi_spec(self): self.spec = response.json() await self._generate_tools_from_spec() - def _get_tool_groups(self, tool_name: str) -> List[str]: + def _get_tool_groups(self, tool_name: str) -> list[str]: """ Extract all hierarchical group names from a tool name, excluding action suffixes. @@ -411,7 +411,7 @@ def _get_additional_headers(self): return headers def _get_request_params( - self, url: str, tool: OpenAPITool, arguments: Dict[str, Any] + self, url: str, tool: OpenAPITool, arguments: dict[str, Any] ): """Get params to use for HTTP request based on tool arguments""" @@ -459,7 +459,7 @@ def _get_request_params( return url, query_params, body_params async def _execute_api_call( - self, tool: OpenAPITool, arguments: Dict[str, Any] + self, tool: OpenAPITool, arguments: dict[str, Any] ) -> str: """Execute an API call based on tool definition""" @@ -524,8 +524,8 @@ async def _execute_api_call( await http_client.aclose() def _extract_parameters_from_schema( - self, schema: Dict[str, Any], param_in: str = "body" - ) -> Dict[str, Any]: + self, schema: dict[str, Any], param_in: str = "body" + ) -> dict[str, Any]: """Extract individual parameters from a resolved schema object""" parameters = {} @@ -549,8 +549,8 @@ def _extract_parameters_from_schema( return parameters def _extract_request_body_parameters( - self, request_body: Dict[str, Any] - ) -> Dict[str, Any]: + self, request_body: dict[str, Any] + ) -> dict[str, Any]: """Extract parameters from OpenAPI 3.0 request body with $ref resolution""" parameters = {} @@ -574,7 +574,7 @@ def _extract_request_body_parameters( return parameters - def _extract_body_parameter(self, body_param: Dict[str, Any]) -> Dict[str, str]: + def _extract_body_parameter(self, body_param: dict[str, Any]) -> dict[str, str]: """Extract parameters from Swagger 2.0 body parameter with $ref resolution""" if "schema" not in body_param: @@ -589,10 +589,10 @@ def _create_tool_from_operation( self, method: str, path: str, - operation: Dict[str, Any], + operation: dict[str, Any], path_parameters: list, base_url: str, - ) -> Optional[OpenAPITool]: + ) -> OpenAPITool | None: """Create a tool definition from an OpenAPI operation""" # Generate operation ID @@ -677,7 +677,7 @@ def _create_tool_from_operation( ) def _format_enum_description( - self, enum_values: List[str], original_description: str + self, enum_values: list[str], original_description: str ) -> str: """Format enum values for better tool descriptions""" @@ -691,7 +691,7 @@ def _format_enum_description( return f"Allowed values:\n{enum_list}" - def _resolve_schema_ref(self, ref_string: str) -> Dict[str, Any]: + def _resolve_schema_ref(self, ref_string: str) -> dict[str, Any]: """ Resolve a $ref reference to its actual schema definition @@ -721,7 +721,7 @@ def _resolve_schema_ref(self, ref_string: str) -> Dict[str, Any]: return current - def _resolve_schema(self, schema: Dict[str, Any]) -> Dict[str, Any]: + def _resolve_schema(self, schema: dict[str, Any]) -> dict[str, Any]: """ Recursively resolve a schema, handling $ref references """ @@ -763,17 +763,17 @@ def run(self): except asyncio.CancelledError: print("Server shutdown requested") - def list_tools(self) -> Dict[str, OpenAPITool]: + def list_tools(self) -> dict[str, OpenAPITool]: """Initialize and return list of tools. Useful for debugging""" asyncio.run(self.load_openapi_spec()) return self.tools - def list_groups(self) -> Dict[str, List[str]]: + def list_groups(self) -> dict[str, list[str]]: """Initialize and return list of tool groups with their tools. Useful for debugging""" asyncio.run(self.load_openapi_spec()) # Build a mapping of group -> list of tools - groups: Dict[str, List[str]] = {} + groups: dict[str, list[str]] = {} for tool_name in self.tools: tool_groups = self._get_tool_groups(tool_name) diff --git a/cloudsmith_cli/core/pagination.py b/cloudsmith_cli/core/pagination.py index e7211879..73131ec8 100644 --- a/cloudsmith_cli/core/pagination.py +++ b/cloudsmith_cli/core/pagination.py @@ -1,6 +1,7 @@ """Core pagination utilities.""" -from typing import Any, Callable, List, Optional, Sequence, Tuple +from collections.abc import Callable, Sequence +from typing import Any MAX_PAGE_SIZE = 1000 @@ -80,12 +81,12 @@ def from_headers(cls, headers): def paginate_results( - api_function: Callable[..., Tuple[Sequence[Any], PageInfo]], + api_function: Callable[..., tuple[Sequence[Any], PageInfo]], page_all: bool, page: int, page_size: int = MAX_PAGE_SIZE, **kwargs: Any, -) -> Tuple[List[Any], PageInfo]: +) -> tuple[list[Any], PageInfo]: """Retrieve paginated results. Behaviour: @@ -108,9 +109,9 @@ def paginate_results( # rather than raising. Downstream pretty printers handle an invalid page_info gracefully. return list(results), page_info - all_results: List[Any] = [] + all_results: list[Any] = [] current_page = 1 - last_page_info: Optional[PageInfo] = None + last_page_info: PageInfo | None = None while True: page_results, last_page_info = api_function( page=current_page, page_size=MAX_PAGE_SIZE, **kwargs