def check_docker_image_update():
    """
    Compare the local compute worker Docker image with the remote one and
    log the synchronization status along with the relevant image metadata.

    The check is advisory: it runs at the start of every job, so any failure
    while building the checker or comparing images is logged as a warning
    and swallowed instead of propagating into the job.

    Returns:
        None. The outcome is only reported through ``logger``.
    """
    try:
        # Constructing the checker opens a Docker API client, which can raise
        # when the container socket is unreachable.
        checker = DockerImageUpdateChecker(
            namespace=Settings.DOCKER_IMAGE_NAMESPACE,
            repository=Settings.DOCKER_IMAGE_REPOSITORY,
            tag=Settings.DOCKER_IMAGE_TAG,
            docker_base_url=Settings.CONTAINER_SOCKET,
        )
        try:
            result = checker.compare_local_vs_remote_images()
        finally:
            # A new client is created for every job; release its connections.
            checker.client.close()
    except Exception as exc:
        logger.warning(f"Docker image update check skipped: {exc}")
        return

    status = result["status"]

    log_lines = [
        "",
        "=" * 60,
        "DOCKER IMAGE UPDATE CHECK",
        "=" * 60,
        f"Image: {result.get('image_name')}",
    ]

    remote = result.get("remote")
    local = result.get("local")

    if remote:
        log_lines.append(f"Remote: digest={remote.get('digest')}, date={remote.get('date')}")

    if local:
        log_lines.append(f"Local: id={local.get('id')}, date={local.get('date')}")

    log_lines.append("-" * 60)

    # One entry per status the checker can return: (log level, report line).
    status_reports = {
        DockerImageStatus.UP_TO_DATE: (
            logging.INFO,
            "Status: Local image is synchronized with remote",
        ),
        DockerImageStatus.BEHIND: (
            logging.ERROR,
            "Status: Local image is behind remote version. "
            "For better submission processing and to avoid any submission errors, "
            "fetch the latest image!",
        ),
        DockerImageStatus.NEWER_LOCAL: (
            logging.INFO,
            "Status: Local image is newer than remote version",
        ),
        DockerImageStatus.DIFFERENT: (
            logging.WARNING,
            "Status: Local image differs from remote and their dates could not be ordered",
        ),
        DockerImageStatus.LOCAL_MISSING: (
            logging.ERROR,
            "Status: Local image is not present. Pull required",
        ),
        DockerImageStatus.REMOTE_UNAVAILABLE: (
            logging.ERROR,
            "Status: Could not fetch remote image metadata",
        ),
        "error": (
            logging.ERROR,
            f"Status: Image check failed: {result.get('error')}",
        ),
    }
    log_level, status_line = status_reports.get(
        status,
        (logging.ERROR, f"Unknown image status: {status}"),
    )
    log_lines.append(status_line)

    log_lines.append("=" * 60)

    logger.log(log_level, "\n".join(log_lines))
class DockerImageStatus:
    """String constants describing how the local image relates to the remote one."""

    UP_TO_DATE = "up_to_date"
    BEHIND = "behind"
    NEWER_LOCAL = "newer_local"
    DIFFERENT = "different"
    LOCAL_MISSING = "local_missing"
    REMOTE_UNAVAILABLE = "remote_unavailable"
    # Returned when the comparison itself raised; the value is the literal
    # "error" so existing callers comparing against that string keep working.
    ERROR = "error"


class DockerImageUpdateChecker:
    """
    Compare a locally available Docker image with its Docker Hub counterpart.

    Args:
        namespace: Docker Hub namespace of the image.
        repository: Docker Hub repository of the image.
        tag: Image tag to compare.
        docker_base_url: Base URL handed to ``docker.APIClient``.
    """

    # Upper bound, in seconds, for the Docker Hub metadata request.
    REMOTE_TIMEOUT_SECONDS = 10

    def __init__(self, namespace: str, repository: str, tag: str, docker_base_url):
        self.image_name = f"{namespace}/{repository}:{tag}"
        self.url = f"https://hub.docker.com/v2/namespaces/{namespace}/repositories/{repository}/tags/{tag}"
        self.client = docker.APIClient(base_url=docker_base_url, version="auto")

    @staticmethod
    def _parse_datetime(value):
        """
        Parse an ISO 8601 timestamp into a timezone-aware ``datetime``.

        The fractional-second part is normalised to exactly six digits before
        parsing, because ``datetime.fromisoformat`` on older Python versions
        rejects other lengths (for example nanosecond precision). Timestamps
        without an offset are assumed to be UTC so that results can always be
        compared with each other.

        Returns:
            The parsed ``datetime``, or ``None`` when ``value`` is empty, not a
            string, or not a valid timestamp.
        """
        from datetime import timezone

        if not value or not isinstance(value, str):
            return None

        text = value.strip()
        if text.endswith(("Z", "z")):
            text = text[:-1] + "+00:00"

        head, dot, tail = text.partition(".")
        if dot:
            digit_count = len(tail) - len(tail.lstrip("0123456789"))
            digits, rest = tail[:digit_count], tail[digit_count:]
            if digits:
                text = f"{head}.{digits[:6].ljust(6, '0')}{rest}"

        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None

        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    @staticmethod
    def _get_json(url: str, timeout: float = REMOTE_TIMEOUT_SECONDS):
        """
        Fetch ``url`` and decode the response body as JSON.

        Args:
            url: Address to request.
            timeout: Seconds to wait before giving up, so an unresponsive
                server cannot block the caller indefinitely.

        Returns:
            The decoded JSON value, or ``None`` on any failure (best effort).
        """
        try:
            with urllib.request.urlopen(url, timeout=timeout) as response:
                return json.loads(response.read().decode())
        except Exception:
            return None

    def get_remote_info(self):
        """
        Return ``{"digest", "date"}`` for the remote tag, or ``None`` when the
        metadata could not be fetched or is not a JSON object.
        """
        data = self._get_json(self.url)
        if not data or not isinstance(data, dict):
            return None

        return {
            "digest": data.get("digest"),
            "date": self._parse_datetime(
                data.get("tag_last_pushed") or data.get("last_updated")
            ),
        }

    def get_local_info(self):
        """
        Return ``{"id", "digests", "date"}`` for the local image, or ``None``
        when the Docker client reports an error (for example image not found).
        """
        try:
            image = self.client.inspect_image(self.image_name)
        except docker.errors.DockerException:
            return None

        return {
            "id": image.get("Id"),
            # The key may be present with a null value; always expose a list.
            "digests": image.get("RepoDigests") or [],
            "date": self._parse_datetime(image.get("Created")),
        }

    def _get_status(self, remote: dict, local: dict):
        """
        Derive a ``DockerImageStatus`` value from remote and local metadata.

        A remote digest found in any local repo digest means the images are
        identical. Otherwise the dates decide between BEHIND and NEWER_LOCAL;
        if a date is missing or both are equal the result is DIFFERENT.
        """
        remote_digest = remote.get("digest")
        local_digests = local.get("digests") or []

        if remote_digest and any(
            remote_digest in digest for digest in local_digests
        ):
            return DockerImageStatus.UP_TO_DATE

        remote_date = remote.get("date")
        local_date = local.get("date")

        if remote_date and local_date:
            if remote_date > local_date:
                return DockerImageStatus.BEHIND

            if remote_date < local_date:
                return DockerImageStatus.NEWER_LOCAL

        return DockerImageStatus.DIFFERENT

    def compare_local_vs_remote_images(self):
        """
        Compare the local image with the remote one.

        Returns:
            A dict that always has ``status`` and ``image_name``; ``remote`` and
            ``local`` are included when available, and ``error`` carries the
            exception text when the status is ``DockerImageStatus.ERROR``.
            This method does not raise.
        """
        try:
            remote = self.get_remote_info()
            local = self.get_local_info()

            if not remote:
                return {
                    "status": DockerImageStatus.REMOTE_UNAVAILABLE,
                    "image_name": self.image_name,
                }

            if not local:
                return {
                    "status": DockerImageStatus.LOCAL_MISSING,
                    "image_name": self.image_name,
                    "remote": remote,
                }

            return {
                "status": self._get_status(remote, local),
                "image_name": self.image_name,
                "remote": remote,
                "local": local,
            }
        except Exception as exc:
            return {
                "status": DockerImageStatus.ERROR,
                "image_name": self.image_name,
                "error": str(exc),
            }