Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b5ae81d
feature ok / needs to be tested
IdirLISN Mar 31, 2026
660cbbf
feature deactivated by default + hidden behind a button by default + v…
IdirLISN Apr 2, 2026
1d780c3
worker monitoring button behind user menu
IdirLISN Apr 2, 2026
2e05184
files blacked for fixing the formatting issues
IdirLISN Apr 2, 2026
3ec1619
fixing syntax and format
IdirLISN Apr 2, 2026
a44c6a9
fixing syntax and format
IdirLISN Apr 2, 2026
c7e26bb
fixing syntax and format
IdirLISN Apr 2, 2026
fc7c14e
feature in progress
IdirLISN Apr 7, 2026
3f4cd58
new file for monitoring implementation imported into frontend
IdirLISN May 6, 2026
a88c95a
test monitoring private queues
IdirLISN May 7, 2026
72f304f
compute worker monitoring on private queues (amazing stuff)
IdirLISN May 7, 2026
3250545
test number 234
IdirLISN May 7, 2026
3335e8e
test number 238
IdirLISN May 7, 2026
b6ca107
test number 245
IdirLISN May 7, 2026
8e611db
test number 246
IdirLISN May 7, 2026
3b3b521
test number 249
IdirLISN May 7, 2026
84a0a63
test number 435
IdirLISN May 7, 2026
46a758d
add a missing file for testing
IdirLISN May 7, 2026
ae9f0ee
model queue = a problem
IdirLISN May 7, 2026
713bec9
clean feature/ needs to be tested
IdirLISN May 7, 2026
b8f8bdf
clean feature code and imports
IdirLISN May 11, 2026
90c5efc
feature production ready
IdirLISN May 11, 2026
8139621
fix comment // feature production ready
IdirLISN May 11, 2026
369905a
debug site worker
IdirLISN May 11, 2026
dfd9432
add collapse button on default worker panel
IdirLISN May 12, 2026
cbd925c
feature production ready
IdirLISN May 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ dependencies = [
"django-cors-headers==4.9.0",
"nh3==0.3.3",
"configobj==5.0.9",
"black>=26.3.1",
"redis-cli>=1.0.1",
]

[tool.uv]
Expand Down
118 changes: 117 additions & 1 deletion src/apps/competitions/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import json
import os
import re
import traceback
Expand All @@ -8,10 +9,13 @@
from io import BytesIO
from tempfile import TemporaryDirectory, NamedTemporaryFile

import urllib

import oyaml as yaml
import requests
from celery._state import app_or_default
from django.conf import settings
from django_redis import get_redis_connection
from django.core.exceptions import ObjectDoesNotExist
from django.core.files.base import ContentFile
from django.db.models import Subquery, OuterRef, Count, Case, When, Value, F
Expand All @@ -20,9 +24,10 @@
from django.utils.timezone import now
from rest_framework.exceptions import ValidationError

from celery_config import app
from celery_config import app, app_for_vhost
from competitions.models import Submission, CompetitionCreationTaskStatus, SubmissionDetails, Competition, \
CompetitionDump, Phase
from queues.models import Queue
from competitions.unpackers.utils import CompetitionUnpackingException
from competitions.unpackers.v1 import V15Unpacker
from competitions.unpackers.v2 import V2Unpacker
Expand All @@ -31,8 +36,12 @@
from datasets.models import Data
from utils.data import make_url_sassy
from utils.email import codalab_send_markdown_email
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

import logging

from utils.worker_utils import WORKER_HEARTBEAT_TTL, WORKERS_REGISTRY_KEY, extract_queue_names, is_compute_worker, known_compute_queue_names
logger = logging.getLogger(__name__)

COMPETITION_FIELDS = [
Expand Down Expand Up @@ -790,3 +799,110 @@ def submission_status_cleanup():
sub.parent.cancel(status=Submission.FAILED)
else:
sub.cancel(status=Submission.FAILED)


# -------------------------------------------------
def _broadcast_worker_state(payload):
    """Broadcast one worker-health payload to the "compute_workers" group.

    Does nothing when no channel layer is configured (e.g. Channels not
    set up in this deployment).
    """
    layer = get_channel_layer()
    if not layer:
        return

    message = {
        "type": "worker.health",
        "worker": payload,
    }
    async_to_sync(layer.group_send)("compute_workers", message)


@app.task(queue="site-worker", soft_time_limit=120)
def refresh_compute_worker_health():
celery_app = app
r = get_redis_connection("default")
known_queue_names = known_compute_queue_names()
broker_sources = []
broker_sources.append(("default", celery_app.conf.broker_url, celery_app))

private_queues = (
Queue.objects.filter(competitions__isnull=False)
.exclude(name__isnull=True)
.exclude(name="")
.distinct()
)
for queue in private_queues:
if not queue.broker_url:
continue
parsed = urllib.parse.urlparse(queue.broker_url)
vhost = parsed.path
broker_url = urllib.parse.urljoin(celery_app.conf.broker_url, vhost)
broker_sources.append((queue.name, broker_url, app_for_vhost(vhost)))

inspected_brokers = set()
for source_name, broker_url, broker_app in broker_sources:
if broker_url in inspected_brokers:
continue
inspected_brokers.add(broker_url)

try:
# timeout=5 : 4 appels × 5s × N brokers
inspector = broker_app.control.inspect(timeout=5)
if inspector is None:
logger.warning(
"Celery inspect returned None for broker=%s", source_name
)
continue
stats = inspector.stats() or {}
active = inspector.active() or {}
reserved = inspector.reserved() or {}
active_queues = inspector.active_queues() or {}
except Exception:
logger.exception(
"Unable to inspect Celery workers for broker %s", source_name
)
continue

for worker_name in stats.keys():
queues = active_queues.get(worker_name, []) or []
queue_names = extract_queue_names(queues)
if not is_compute_worker(worker_name, queue_names, known_queue_names):
continue

running_jobs = len(active.get(worker_name, [])) + len(
reserved.get(worker_name, [])
)
status = "busy" if running_jobs > 0 else "available"
payload = {
"hostname": worker_name,
"status": status,
"running_jobs": running_jobs,
"timestamp": now().timestamp(),
"queue_source": source_name,
"queue_names": sorted(queue_names),
}
heartbeat_key = f"worker:{source_name}:{worker_name}:heartbeat"
r.set(heartbeat_key, json.dumps(payload), ex=WORKER_HEARTBEAT_TTL)
r.hset(
WORKERS_REGISTRY_KEY,
f"{source_name}:{worker_name}",
json.dumps(
{
"hostname": worker_name,
"status": status,
"running_jobs": running_jobs,
"last_seen": payload["timestamp"],
"queue_source": source_name,
"queue_names": sorted(queue_names),
}
),
)
_broadcast_worker_state(payload)
# Logs about CW health HERE
# logger.info(
# "[WORKER-HEALTH] source=%s worker=%s status=%s jobs=%d queues=%s",
# source_name,
# worker_name,
# status,
# running_jobs,
# sorted(queue_names),
# )
18 changes: 12 additions & 6 deletions src/celery_config.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import copy
import urllib.parse

from celery import Celery
from kombu import Queue, Exchange
from django.conf import settings
import urllib.parse
import copy
from kombu import Exchange, Queue

app = Celery()

from django.conf import settings # noqa

app.config_from_object('django.conf:settings', namespace='CELERY')
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.conf.task_queues = [
# Mostly defining queue here so we can set x-max-priority
Queue('compute-worker', Exchange('compute-worker'), routing_key='compute-worker', queue_arguments={'x-max-priority': 10}),
Queue(
"compute-worker",
Exchange("compute-worker"),
routing_key="compute-worker",
queue_arguments={"x-max-priority": 10},
),
]

_vhost_apps = {}
Expand All @@ -32,7 +38,7 @@ def app_for_vhost(vhost):
# Copy the settings so we can modify the broker url to include the vhost
django_settings = copy.copy(settings)
django_settings.CELERY_BROKER_URL = broker_url
vhost_app.config_from_object(django_settings, namespace='CELERY')
vhost_app.config_from_object(django_settings, namespace="CELERY")
vhost_app.conf.task_queues = app.conf.task_queues
_vhost_apps[vhost] = vhost_app
return _vhost_apps[vhost]
3 changes: 3 additions & 0 deletions src/routing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from django.urls import re_path
from apps.competitions.consumers import SubmissionIOConsumer, SubmissionOutputConsumer
from utils.consumers import ComputeWorkersConsumer


# Websocket URL routes for the ASGI application:
#   - submission_input / submission_output: live submission I/O streams
#   - ws/workers: compute-worker health monitoring feed
websocket_urlpatterns = [
    re_path(r'submission_input/(?P<user_pk>\d+)/(?P<submission_id>\d+)/(?P<secret>[^/]+)/$', SubmissionIOConsumer.as_asgi()),
    re_path(r'submission_output/$', SubmissionOutputConsumer.as_asgi()),
    re_path(r"ws/workers/$", ComputeWorkersConsumer.as_asgi()),
]
4 changes: 4 additions & 0 deletions src/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@
'task': 'profiles.tasks.clean_non_activated_users',
'schedule': timedelta(days=1), # Run every 24 hours
},
"refresh_compute_worker_health": {
"task": "competitions.tasks.refresh_compute_worker_health",
"schedule": 60,
},
}
CELERY_TIMEZONE = 'UTC'
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
Expand Down
7 changes: 7 additions & 0 deletions src/static/riot/competitions/detail/_header.tag
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@
<button class="ui small button" onclick="{show_modal.bind(this, '.migration.modal')}">
Migrate
</button>

<worker-monitor-toggle
if="{competition.admin}"
can_view_workers_panel="true"
competition_id="{ competition.id }">
</worker-monitor-toggle>

</div>
<div class="row">
<div class="column">
Expand Down
2 changes: 1 addition & 1 deletion src/static/riot/competitions/detail/detail.tag
Original file line number Diff line number Diff line change
Expand Up @@ -645,4 +645,4 @@
}
}
</style>
</competition-detail>
</competition-detail>
Loading