Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ jobs:

- run:
name: "Lint: Check code style with flake8"
command: docker compose exec django flake8 src/
command: |
docker compose exec django flake8 src/
docker compose exec django flake8 compute_worker/
- run:
name: "Tests: Run unit/integration tests (excluding e2e)"
Expand Down
76 changes: 74 additions & 2 deletions compute_worker/compute_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from billiard.exceptions import SoftTimeLimitExceeded

from logs_loguru import configure_logging, colorize_run_args
from docker_image_update_checker import DockerImageStatus, DockerImageUpdateChecker

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -113,6 +114,11 @@ def to_bool(val):

WORKER_BUNDLE_URL_REWRITE = get("WORKER_BUNDLE_URL_REWRITE", "").strip()

# Docker image config
DOCKER_IMAGE_NAMESPACE = get("DOCKER_IMAGE_NAMESPACE", "codalab")
DOCKER_IMAGE_REPOSITORY = get("DOCKER_IMAGE_REPOSITORY", "codabench-compute-worker")
DOCKER_IMAGE_TAG = get("DOCKER_IMAGE_TAG", "latest")


# -----------------------------------------------
# Program Kind
Expand Down Expand Up @@ -302,12 +308,78 @@ def rewrite_bundle_url_if_needed(url):
return url


def check_docker_image_update():
    """
    Compare local and remote compute worker Docker images and log the
    synchronization status along with relevant image metadata.

    The check is advisory only: any failure while building the checker or
    comparing images is reported in the log as an "error" status instead of
    being raised, so that a broken check never prevents a job from running.

    Returns:
        None. The outcome is emitted through the module logger.
    """
    image_name = (
        f"{Settings.DOCKER_IMAGE_NAMESPACE}/"
        f"{Settings.DOCKER_IMAGE_REPOSITORY}:{Settings.DOCKER_IMAGE_TAG}"
    )

    checker = None
    try:
        # The constructor opens a Docker API client, which may itself fail
        # (e.g. unreachable socket), so it belongs inside the guarded section.
        checker = DockerImageUpdateChecker(
            namespace=Settings.DOCKER_IMAGE_NAMESPACE,
            repository=Settings.DOCKER_IMAGE_REPOSITORY,
            tag=Settings.DOCKER_IMAGE_TAG,
            docker_base_url=Settings.CONTAINER_SOCKET,
        )
        result = checker.compare_local_vs_remote_images()
    except Exception as exc:
        result = {"status": "error", "image_name": image_name, "error": str(exc)}
    finally:
        # This function runs once per job; release the client's connections
        # instead of leaving one open client behind for every job.
        client = getattr(checker, "client", None)
        if client is not None:
            try:
                client.close()
            except Exception:
                logger.debug("Could not close Docker client used for image check", exc_info=True)

    status = result.get("status")

    # Every status the checker can return, mapped to (log level, message).
    outcomes = {
        DockerImageStatus.UP_TO_DATE: (
            logging.INFO,
            "Status: Local image is synchronized with remote",
        ),
        DockerImageStatus.BEHIND: (
            logging.ERROR,
            "Status: Local image is behind remote version. For better submission processing and to avoid any submission errors, fetch the latest image!",
        ),
        DockerImageStatus.NEWER_LOCAL: (
            logging.WARNING,
            "Status: Local image is newer than the remote version",
        ),
        DockerImageStatus.DIFFERENT: (
            logging.WARNING,
            "Status: Local image differs from remote and image dates could not tell which one is newer. Consider fetching the latest image",
        ),
        DockerImageStatus.LOCAL_MISSING: (
            logging.ERROR,
            "Status: Local image is not present. Pull required",
        ),
        DockerImageStatus.REMOTE_UNAVAILABLE: (
            logging.ERROR,
            "Status: Could not fetch remote image metadata",
        ),
        "error": (
            logging.ERROR,
            f"Status: Image check failed: {result.get('error')}",
        ),
    }
    log_level, status_line = outcomes.get(
        status, (logging.ERROR, f"Unknown image status: {status}")
    )

    log_lines = [
        "",
        "=" * 60,
        "DOCKER IMAGE UPDATE CHECK",
        "=" * 60,
        f"Image: {result.get('image_name')}",
    ]

    # Remote/local metadata is only present for some outcomes.
    remote = result.get("remote")
    local = result.get("local")

    if remote:
        log_lines.append(f"Remote: digest={remote.get('digest')}, date={remote.get('date')}")

    if local:
        log_lines.append(f"Local: id={local.get('id')}, date={local.get('date')}")

    log_lines.append("-" * 60)
    log_lines.append(status_line)
    log_lines.append("=" * 60)

    logger.log(log_level, "\n".join(log_lines))


# -----------------------------------------------------------------------------
# The main compute worker entrypoint, this is how a job is ran at the highest
# level.
# -----------------------------------------------------------------------------
@shared_task(name="compute_worker_run")
def run_wrapper(run_args):
# Check for docker image update
check_docker_image_update()

# We need to convert the UUID given by celery into a byte like object otherwise things will break
run_args.update(secret=str(run_args["secret"]))
logger.info(f"Received run arguments: \n {colorize_run_args(json.dumps(run_args))}")
Expand Down Expand Up @@ -338,7 +410,7 @@ def run_wrapper(run_args):
msg = "Submission failed. See logs for more details."
run._update_status(SubmissionStatus.FAILED, extra_information=msg)
raise
except Exception as e:
except Exception:
# Catch any exception to avoid getting stuck in Running status
run._update_status(SubmissionStatus.FAILED, extra_information=traceback.format_exc())
raise
Expand Down Expand Up @@ -1303,7 +1375,7 @@ def start(self):
}
# Cleanup containers
containers_to_kill = [
self.ingestion_container_name,
self.ingestion_container_name,
self.program_container_name
]
logger.debug(
Expand Down
116 changes: 116 additions & 0 deletions compute_worker/docker_image_update_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import json
import urllib.request
from datetime import datetime
import docker


class DockerImageStatus:
    """String codes describing how the local image relates to the remote tag."""

    # A local repo digest contains the digest published for the remote tag.
    UP_TO_DATE = "up_to_date"
    # Digests do not match and the remote date is later than the local date.
    BEHIND = "behind"
    # Digests do not match and the local date is later than the remote date.
    NEWER_LOCAL = "newer_local"
    # Digests do not match and the dates are equal or unavailable.
    DIFFERENT = "different"
    # The image could not be inspected on the local Docker daemon.
    LOCAL_MISSING = "local_missing"
    # Remote tag metadata could not be retrieved.
    REMOTE_UNAVAILABLE = "remote_unavailable"
    # The comparison itself raised; same value as the "error" literal that
    # compare_local_vs_remote_images() reports in that case.
    ERROR = "error"


class DockerImageUpdateChecker:
    """
    Compare a locally available Docker image with the same tag on Docker Hub.

    Remote metadata is read from the Docker Hub tags endpoint; local metadata
    is read from the Docker daemon reachable at ``docker_base_url``.
    """

    # Seconds to wait for Docker Hub before giving up, so that a stalled
    # connection cannot block the caller indefinitely.
    REQUEST_TIMEOUT = 10

    def __init__(self, namespace: str, repository: str, tag: str, docker_base_url):
        """
        Args:
            namespace: Docker Hub namespace (user or organisation).
            repository: Repository name inside the namespace.
            tag: Image tag to compare.
            docker_base_url: Base URL of the Docker daemon API.
        """
        self.image_name = f"{namespace}/{repository}:{tag}"
        self.url = f"https://hub.docker.com/v2/namespaces/{namespace}/repositories/{repository}/tags/{tag}"
        self.client = docker.APIClient(base_url=docker_base_url, version="auto")

    @staticmethod
    def _parse_datetime(value: str):
        """
        Parse an ISO 8601 timestamp into a timezone-aware ``datetime``.

        Returns ``None`` when ``value`` is empty, not a string, or unparsable.
        """
        from datetime import timezone

        if not value or not isinstance(value, str):
            return None

        text = value.strip()
        if text[-1:] in ("Z", "z"):
            text = text[:-1] + "+00:00"

        # Timestamps may carry more (or fewer) fractional digits than older
        # ``fromisoformat`` implementations accept (e.g. nanoseconds);
        # normalise the fraction to exactly six digits.
        head, dot, tail = text.partition(".")
        if dot:
            digits = 0
            while digits < len(tail) and tail[digits].isdigit():
                digits += 1
            if digits:
                fraction = tail[:digits][:6].ljust(6, "0")
                text = f"{head}.{fraction}{tail[digits:]}"

        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None

        # Mixing naive and aware datetimes makes comparisons raise TypeError.
        # NOTE(review): a timestamp without an offset is assumed to be UTC.
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    @staticmethod
    def _get_json(url: str, timeout: float = REQUEST_TIMEOUT):
        """
        Fetch ``url`` and decode the body as JSON.

        Returns ``None`` on any failure (network error, timeout, bad JSON);
        callers treat that as "remote unavailable".
        """
        try:
            with urllib.request.urlopen(url, timeout=timeout) as response:
                return json.loads(response.read().decode())
        except Exception:
            # Deliberately best-effort: the check must never raise from here.
            return None

    def get_remote_info(self):
        """Return ``{"digest", "date"}`` for the remote tag, or ``None``."""
        data = self._get_json(self.url)
        # Anything other than a non-empty JSON object is unusable.
        if not data or not isinstance(data, dict):
            return None

        return {
            "digest": data.get("digest"),
            "date": self._parse_datetime(
                data.get("tag_last_pushed") or data.get("last_updated")
            ),
        }

    def get_local_info(self):
        """Return ``{"id", "digests", "date"}`` for the local image, or ``None``."""
        try:
            image = self.client.inspect_image(self.image_name)
        except docker.errors.DockerException:
            return None

        return {
            "id": image.get("Id"),
            # The key may be present with a null value; always expose a list.
            "digests": image.get("RepoDigests") or [],
            "date": self._parse_datetime(image.get("Created")),
        }

    def _get_status(self, remote: dict, local: dict):
        """
        Classify the local image relative to the remote one.

        A digest match wins; otherwise the two dates decide between BEHIND
        and NEWER_LOCAL, and DIFFERENT is returned when they cannot.
        """
        remote_digest = remote.get("digest")
        local_digests = local.get("digests") or []

        if remote_digest and any(
            remote_digest in digest for digest in local_digests
        ):
            return DockerImageStatus.UP_TO_DATE

        remote_date = remote.get("date")
        local_date = local.get("date")

        if remote_date and local_date:
            if remote_date > local_date:
                return DockerImageStatus.BEHIND

            if remote_date < local_date:
                return DockerImageStatus.NEWER_LOCAL

        return DockerImageStatus.DIFFERENT

    def compare_local_vs_remote_images(self):
        """
        Gather remote and local metadata and return a result dict.

        The dict always has ``status`` and ``image_name``; ``remote``,
        ``local`` and ``error`` are included when available. Unexpected
        exceptions are reported as ``status == "error"`` rather than raised.
        """
        try:
            remote = self.get_remote_info()
            if not remote:
                return {
                    "status": DockerImageStatus.REMOTE_UNAVAILABLE,
                    "image_name": self.image_name,
                }

            local = self.get_local_info()
            if not local:
                return {
                    "status": DockerImageStatus.LOCAL_MISSING,
                    "image_name": self.image_name,
                    "remote": remote,
                }

            return {
                "status": self._get_status(remote, local),
                "image_name": self.image_name,
                "remote": remote,
                "local": local,
            }
        except Exception as exc:
            return {
                "status": "error",
                "image_name": self.image_name,
                "error": str(exc),
            }
Loading