diff --git a/src/apps/oidc_configurations/views.py b/src/apps/oidc_configurations/views.py index 6b04be8ff..9e739b078 100644 --- a/src/apps/oidc_configurations/views.py +++ b/src/apps/oidc_configurations/views.py @@ -53,53 +53,61 @@ def oidc_complete(request, auth_organization_id): if error: context["error"] = error - if error_description: context["error_description"] = error_description - # Token exhange process - if authorization_code: + if error or error_description: + return render(request, 'oidc/oidc_complete.html', context) - try: - # STEP 1: Get auth organization using its id - organization = get_object_or_404(Auth_Organization, pk=auth_organization_id) + if not authorization_code: + context["error"] = "Authorization Code not provided!" + return render(request, 'oidc/oidc_complete.html', context) - if organization: + try: + # STEP 1: Get auth organization using its id + organization = get_object_or_404(Auth_Organization, pk=auth_organization_id) + + # STEP 2: Get access token + access_token, token_error = get_access_token(organization, authorization_code) + if token_error: + context["error"] = token_error + return render(request, 'oidc/oidc_complete.html', context) + else: + # STEP 3: Get user info + user_info, user_info_error = get_user_info(organization, access_token) + if user_info_error: + context["error"] = user_info_error + return render(request, 'oidc/oidc_complete.html', context) + else: + if not isinstance(user_info, dict): + context["error"] = "Invalid user info payload from OIDC provider." + return render(request, 'oidc/oidc_complete.html', context) - # STEP 2: Get access token - access_token, token_error = get_access_token(organization, authorization_code) + user_email = user_info.get("email", None) + user_nickname = user_info.get("nickname", None) + if not user_email: + context["error"] = "Unable to extract email from user info." + return render(request, 'oidc/oidc_complete.html', context) - if token_error: - context["error"] = token_error + user_email = str(user_email).strip().lower() + + if user_nickname is not None: + user_nickname = str(user_nickname).strip() or None + + # get user with this email + user = get_user_by_email(user_email) + + # STEP 4: Check if user exists and user is created using oidc and oidc orgnaization matches this one + if user: + login(request, user, backend=BACKEND) + # Redirect the user home page + return redirect('pages:home') else: - # STEP 3: Get user info - user_info, user_info_error = get_user_info(organization, access_token) - if user_info_error: - context["error"] = user_info_error - else: - - # get email and nickname (username) of the user - user_email = user_info.get("email", None) - user_nickname = user_info.get("nickname", None) - if user_email: - # get user with this email - user = get_user_by_email(user_email) - # STEP 4: Check if user exists and user is created using oidc and oidc orgnaization matches this one - if user: - login(request, user, backend=BACKEND) - # Redirect the user home page - return redirect('pages:home') - else: - return register_and_authenticate_user(request, user_email, user_nickname, organization) - - else: - context["error"] = "Unable to extract email from user info! Please contact platform" - else: - context["error"] = "Invalid Organization ID!" - except Exception as e: - context["error"] = f"{e}" + return register_and_authenticate_user(request, user_email, user_nickname, organization) - return render(request, 'oidc/oidc_complete.html', context) + except Exception as e: + context["error"] = f"{e}" + return render(request, 'oidc/oidc_complete.html', context) def get_access_token(organization, authorization_code): @@ -181,14 +189,21 @@ def register_and_authenticate_user(request, user_email, user_nickname, organizat def create_unique_username(username): + # Normalize the username to remove unsupported characters and ensure it's route-safe. + username = normalize_username(username) + + # Truncate the username to fit within the maximum length allowed by the User model. + max_username_length = User._meta.get_field("username").max_length + username = username[:max_username_length] + # Check if the username already exists if User.objects.filter(username=username).exists(): # If the username already exists, modify it to make it unique suffix = 1 - new_username = f"{username}_{suffix}" + new_username = with_suffix(username, suffix, max_username_length) while User.objects.filter(username=new_username).exists(): suffix += 1 - new_username = f"{username}_{suffix}" + new_username = with_suffix(username, suffix, max_username_length) return new_username else: # If the username doesn't exist, use it as is @@ -201,3 +216,22 @@ def get_user_by_email(email): return user except User.DoesNotExist: return None + + +def normalize_username(username): + # Keep OIDC names readable while removing unsupported chars. + # Keep in sync with profile URL regex: [-a-zA-Z0-9_]+ + cleaned = re.sub(r'[^a-zA-Z0-9_-]', '', username.strip()) + if not cleaned: + raise ValueError("OIDC username contains no valid route-safe characters") + + return cleaned + + +def with_suffix(base_username, suffix, max_username_length): + suffix = f"_{suffix}" + limit = max_username_length - len(suffix) + if limit < 1: + raise ValueError("Unable to create unique username within max length") + + return f"{base_username[:limit]}{suffix}"