From 664e18941d816120bd6b6ea3c27e10cce47cc81b Mon Sep 17 00:00:00 2001 From: Estelle Da Date: Mon, 27 Apr 2026 17:29:50 +1000 Subject: [PATCH 1/3] Debug get_score_set_variants_as_csv function. Modify related tests and add some related tests. --- src/mavedb/lib/score_sets.py | 162 ++++++++++++-------------------- tests/routers/test_score_set.py | 77 +++++++++++++-- 2 files changed, 129 insertions(+), 110 deletions(-) diff --git a/src/mavedb/lib/score_sets.py b/src/mavedb/lib/score_sets.py index d6fa605f..e4397d4f 100644 --- a/src/mavedb/lib/score_sets.py +++ b/src/mavedb/lib/score_sets.py @@ -600,113 +600,67 @@ def get_score_set_variants_as_csv( namespaced_score_set_columns["gnomad"].append("gnomad_af") if "clingen" in namespaced_score_set_columns: namespaced_score_set_columns["clingen"].append("clingen_allele_id") - variants: Sequence[Variant] = [] - mappings: Optional[list[Optional[MappedVariant]]] = None - gnomad_data: Optional[list[Optional[GnomADVariant]]] = None - - if "gnomad" in namespaces and include_post_mapped_hgvs: - variants_mappings_and_gnomad_query = ( - select(Variant, MappedVariant, GnomADVariant) - .join( - MappedVariant, - and_(Variant.id == MappedVariant.variant_id, MappedVariant.current.is_(True)), - isouter=True, - ) - .join(MappedVariant.gnomad_variants.of_type(GnomADVariant), isouter=True) - .where( - and_( - Variant.score_set_id == score_set.id, - or_( - and_( - GnomADVariant.db_name == "gnomAD", - GnomADVariant.db_version == "v4.1", - ), - GnomADVariant.id.is_(None), - ), - ) - ) - .order_by(cast(func.split_part(Variant.urn, "#", 2), Integer)) - ) - if start: - variants_mappings_and_gnomad_query = variants_mappings_and_gnomad_query.offset(start) - if limit: - variants_mappings_and_gnomad_query = variants_mappings_and_gnomad_query.limit(limit) - variants_mappings_and_gnomad = db.execute(variants_mappings_and_gnomad_query).all() - - variants = [] - mappings = [] - gnomad_data = [] - for variant, mapping, gnomad in variants_mappings_and_gnomad: 
- variants.append(variant) - mappings.append(mapping) - gnomad_data.append(gnomad) - elif include_post_mapped_hgvs: - variants_and_mappings_query = ( - select(Variant, MappedVariant) - .join( - MappedVariant, - and_(Variant.id == MappedVariant.variant_id, MappedVariant.current.is_(True)), - isouter=True, - ) - .where(Variant.score_set_id == score_set.id) - .order_by(cast(func.split_part(Variant.urn, "#", 2), Integer)) + + needs_mapping = ( + include_post_mapped_hgvs + or "clingen" in namespaces + or "vep" in namespaces + or "gnomad" in namespaces + ) + needs_gnomad = "gnomad" in namespaces + variants: list[Variant] = [] + mappings: Optional[list[Optional[MappedVariant]]] = [] if needs_mapping else None + gnomad_data: Optional[list[Optional[GnomADVariant]]] = [] if needs_gnomad else None + + select_columns: list[Any] = [Variant] + if needs_mapping: + select_columns.append(MappedVariant) + if needs_gnomad: + select_columns.append(GnomADVariant) + + # Start from Variant + query = ( + select(*select_columns) + .where(Variant.score_set_id == score_set.id) + .order_by(cast(func.split_part(Variant.urn, "#", 2), Integer)) + ) + + if needs_mapping: + query = query.join( + MappedVariant, + and_(Variant.id == MappedVariant.variant_id, MappedVariant.current.is_(True)), + isouter=True, ) - if start: - variants_and_mappings_query = variants_and_mappings_query.offset(start) - if limit: - variants_and_mappings_query = variants_and_mappings_query.limit(limit) - variants_and_mappings = db.execute(variants_and_mappings_query).all() - - variants = [] - mappings = [] - for variant, mapping in variants_and_mappings: - variants.append(variant) - mappings.append(mapping) - elif "gnomad" in namespaces: - variants_and_gnomad_query = ( - select(Variant, GnomADVariant) - .join( - MappedVariant, - and_(Variant.id == MappedVariant.variant_id, MappedVariant.current.is_(True)), - isouter=True, - ) - .join(MappedVariant.gnomad_variants.of_type(GnomADVariant), isouter=True) - .where( - and_( - 
Variant.score_set_id == score_set.id, - or_( - and_( - GnomADVariant.db_name == "gnomAD", - GnomADVariant.db_version == "v4.1", - ), - GnomADVariant.id.is_(None), - ), - ) + + if needs_gnomad: + query = query.join( + MappedVariant.gnomad_variants.of_type(GnomADVariant), + isouter=True, + ).where( + or_( + and_(GnomADVariant.db_name == "gnomAD", GnomADVariant.db_version == "v4.1"), + GnomADVariant.id.is_(None), ) - .order_by(cast(func.split_part(Variant.urn, "#", 2), Integer)) - ) - if start: - variants_and_gnomad_query = variants_and_gnomad_query.offset(start) - if limit: - variants_and_gnomad_query = variants_and_gnomad_query.limit(limit) - variants_and_gnomad = db.execute(variants_and_gnomad_query).all() - - variants = [] - gnomad_data = [] - for variant, gnomad in variants_and_gnomad: - variants.append(variant) - gnomad_data.append(gnomad) - else: - variants_query = ( - select(Variant) - .where(Variant.score_set_id == score_set.id) - .order_by(cast(func.split_part(Variant.urn, "#", 2), Integer)) ) - if start: - variants_query = variants_query.offset(start) - if limit: - variants_query = variants_query.limit(limit) - variants = db.scalars(variants_query).all() + + if start: + query = query.offset(start) + if limit: + query = query.limit(limit) + + result = db.execute(query).all() + + for row in result: + variant = row[0] + variants.append(variant) + + if needs_mapping and mappings is not None: + mappings.append(row[1]) + + if needs_gnomad and gnomad_data is not None: + idx = 2 if needs_mapping else 1 + gnomad_data.append(row[idx]) + rows_data = variants_to_csv_rows( variants, columns=namespaced_score_set_columns, diff --git a/tests/routers/test_score_set.py b/tests/routers/test_score_set.py index c1476a65..f898d15a 100644 --- a/tests/routers/test_score_set.py +++ b/tests/routers/test_score_set.py @@ -3342,7 +3342,7 @@ def test_download_vep_file_in_variant_data_path(session, data_provider, client, worker_queue.assert_called_once() response = client.get( - 
f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=vep&include_post_mapped_hgvs=true&drop_na_columns=true" + f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=vep&drop_na_columns=true" ) assert response.status_code == 200 reader = csv.DictReader(StringIO(response.text)) @@ -3371,7 +3371,7 @@ def test_download_clingen_file_in_variant_data_path(session, data_provider, clie worker_queue.assert_called_once() response = client.get( - f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=clingen&include_post_mapped_hgvs=true&drop_na_columns=true" + f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=clingen&drop_na_columns=true" ) assert response.status_code == 200 reader = csv.DictReader(StringIO(response.text)) @@ -3382,10 +3382,6 @@ def test_download_clingen_file_in_variant_data_path(session, data_provider, clie def test_download_gnomad_file_in_variant_data_path(session, data_provider, client, setup_router_db, data_files): experiment = create_experiment(client) - score_set = create_seq_score_set(client, experiment["urn"]) - score_set = mock_worker_variant_insertion( - client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv" - ) # Link a gnomAD variant to the first mapped variant (version may not match export filter) score_set = create_seq_score_set_with_mapped_variants( client, session, data_provider, experiment["urn"], data_files / "scores.csv" @@ -3404,6 +3400,75 @@ def test_download_gnomad_file_in_variant_data_path(session, data_provider, clien assert "gnomad.gnomad_af" in reader.fieldnames +def test_download_clingen_and_vep_file_in_variant_data_path(session, data_provider, client, setup_router_db, data_files): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + score_set = mock_worker_variant_insertion( + client, session, data_provider, score_set, data_files / "scores.csv", 
data_files / "counts.csv" + ) + # Create mapped variants with VEP consequence populated + create_mapped_variants_for_score_set(session, score_set["urn"], TEST_MAPPED_VARIANT_WITH_HGVS_G_EXPRESSION) + db_score_set = session.query(ScoreSetDbModel).filter(ScoreSetDbModel.urn == score_set["urn"]).one() + first_mapped_variant = db_score_set.variants[0].mapped_variants[0] + first_mapped_variant.clingen_allele_id = VALID_CLINGEN_CA_ID + session.add(first_mapped_variant) + session.commit() + + with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: + published_score_set = publish_score_set(client, score_set["urn"]) + worker_queue.assert_called_once() + + response = client.get( + f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=clingen&namespaces=vep&drop_na_columns=true" + ) + assert response.status_code == 200 + reader = csv.DictReader(StringIO(response.text)) + assert "vep.vep_functional_consequence" in reader.fieldnames + assert "clingen.clingen_allele_id" in reader.fieldnames + rows = list(reader) + assert any(row.get("vep.vep_functional_consequence") == "missense_variant" for row in rows) + assert rows[0].get("clingen.clingen_allele_id") == VALID_CLINGEN_CA_ID + + +def test_download_clingen_and_scores_file_in_variant_data_path(session, data_provider, client, setup_router_db, data_files): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + score_set = mock_worker_variant_insertion( + client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv" + ) + # Create mapped variants with VEP consequence populated + create_mapped_variants_for_score_set(session, score_set["urn"], TEST_MAPPED_VARIANT_WITH_HGVS_G_EXPRESSION) + db_score_set = session.query(ScoreSetDbModel).filter(ScoreSetDbModel.urn == score_set["urn"]).one() + first_mapped_variant = db_score_set.variants[0].mapped_variants[0] + first_mapped_variant.clingen_allele_id = 
VALID_CLINGEN_CA_ID + session.add(first_mapped_variant) + session.commit() + + with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: + published_score_set = publish_score_set(client, score_set["urn"]) + worker_queue.assert_called_once() + + response = client.get( + f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=scores&namespaces=clingen&drop_na_columns=true" + ) + assert response.status_code == 200 + reader = csv.DictReader(StringIO(response.text)) + assert "clingen.clingen_allele_id" in reader.fieldnames + rows = list(reader) + assert rows[0].get("clingen.clingen_allele_id") == VALID_CLINGEN_CA_ID + download_multiple_data_csv = response.text + reader = csv.DictReader(StringIO(download_multiple_data_csv)) + assert sorted(reader.fieldnames) == sorted( + [ + "accession", + "clingen.clingen_allele_id", + "hgvs_nt", + "hgvs_pro", + "scores.score", + ] + ) + + ######################################################################################################################## # Fetching clinical controls and control options for a score set ######################################################################################################################## From 1e26b2efd9601e279b7236d5fadbc837ca3d231d Mon Sep 17 00:00:00 2001 From: Estelle Da Date: Tue, 28 Apr 2026 12:48:03 +1000 Subject: [PATCH 2/3] Solve conflicts between release-2026.1.3 and local branch. 
--- src/mavedb/lib/score_sets.py | 195 +++++++++++++++++------- src/mavedb/lib/validation/transform.py | 6 +- src/mavedb/routers/score_sets.py | 37 ++++- tests/helpers/util/score_set.py | 17 +++ tests/routers/test_score_set.py | 200 +++++++++++++++++++++++-- 5 files changed, 377 insertions(+), 78 deletions(-) diff --git a/src/mavedb/lib/score_sets.py b/src/mavedb/lib/score_sets.py index 70a5e5c5..dfe96d00 100644 --- a/src/mavedb/lib/score_sets.py +++ b/src/mavedb/lib/score_sets.py @@ -4,7 +4,7 @@ import re from collections import Counter from operator import attrgetter -from typing import TYPE_CHECKING, Any, BinaryIO, Iterable, List, Literal, Optional, Sequence +from typing import TYPE_CHECKING, Any, BinaryIO, Iterable, List, Optional, Sequence import numpy as np import pandas as pd @@ -37,6 +37,8 @@ from mavedb.models.experiment_controlled_keyword import ExperimentControlledKeywordAssociation from mavedb.models.experiment_publication_identifier import ExperimentPublicationIdentifierAssociation from mavedb.models.experiment_set import ExperimentSet +from mavedb.models.clinical_control import ClinicalControl +from mavedb.models.clinical_control_mapped_variant import mapped_variants_clinical_controls_association_table from mavedb.models.gnomad_variant import GnomADVariant from mavedb.models.mapped_variant import MappedVariant from mavedb.models.publication_identifier import PublicationIdentifier @@ -63,6 +65,10 @@ logger = logging.getLogger(__name__) +# Pattern for ClinVar-versioned namespaces of the form "clinvar.YEAR_MONTH", +# e.g. "clinvar.2024_01" for January 2024. 
+CLINVAR_NS_PATTERN = re.compile(r"^clinvar\.(\d+)_(0[1-9]|1[0-2])$") + class HGVSColumns: NUCLEOTIDE: str = "hgvs_nt" # dataset.constants.hgvs_nt_column @@ -77,18 +83,13 @@ def options(cls) -> list[str]: def build_search_score_sets_query_filter( db: Session, query: Query[ScoreSet], owner_or_contributor: Optional[User], search: ScoreSetsSearch ): - superseding_score_set = aliased(ScoreSet) - - # Exclude superseded score sets from search results, but only when the superseding - # version is published. An unpublished replacement should not hide its published - # precursor from public search results. - query = query.join(superseding_score_set, ScoreSet.superseding_score_set, isouter=True) - query = query.filter( - or_( - superseding_score_set.id.is_(None), - superseding_score_set.published_date.is_(None), - ) - ) + # Exclude score sets that have been publicly superseded (i.e., have at least one + # published superseding version). Uses NOT EXISTS instead of LEFT OUTER JOIN to + # avoid row multiplication when multiple superseding versions point to the same + # original via replaces_id (which has no uniqueness constraint). A LEFT JOIN would + # produce N rows per original, all counted against the LIMIT, causing paginated + # searches to return fewer unique score sets than requested. + query = query.filter(~ScoreSet.superseding_score_set.has(ScoreSet.published_date.isnot(None))) if owner_or_contributor is not None: query = query.filter( @@ -532,7 +533,7 @@ def find_publish_or_private_superseded_score_set_tail( def get_score_set_variants_as_csv( db: Session, score_set: ScoreSet, - namespaces: List[Literal["scores", "counts", "vep", "gnomad", "clingen"]], + namespaces: List[str], namespaced: Optional[bool] = None, start: Optional[int] = None, limit: Optional[int] = None, @@ -549,8 +550,10 @@ def get_score_set_variants_as_csv( The database session to use. score_set : ScoreSet The score set to get the variants from. 
- namespaces : List[Literal["scores", "counts", "vep", "gnomad", "clingen"]] - The namespaces for data. Now there are only scores, counts, VEP, gnomAD, and ClinGen. ClinVar will be added in the future. + namespaces : List[str] + The namespaces for data: "scores", "counts", "vep", "gnomad", "clingen", and/or + ClinVar-versioned namespaces of the form "clinvar.YEAR_MONTH" (e.g. "clinvar.2024_01" + for January 2024, which joins on db_name="ClinVar" and db_version="01_2024"). namespaced: Optional[bool] = None Whether namespace the columns or not. start : int, optional @@ -602,38 +605,50 @@ def get_score_set_variants_as_csv( if "clingen" in namespaced_score_set_columns: namespaced_score_set_columns["clingen"].append("clingen_allele_id") - needs_mapping = ( + # Parse ClinVar-versioned namespaces of the form "clinvar.YEAR_MONTH". + # The corresponding db_version stored in clinical_controls is "MONTH_YEAR". + clinvar_namespaces: dict[str, str] = {} # namespace -> db_version (MONTH_YEAR) + for ns in namespaces: + m = CLINVAR_NS_PATTERN.match(ns) + if m: + year, month = m.group(1), m.group(2) + db_version = f"{month}_{year}" + clinvar_namespaces[ns] = db_version + namespaced_score_set_columns[ns] = ["clinical_significance", "clinical_review_status"] + + need_mappings = ( include_post_mapped_hgvs or "clingen" in namespaces or "vep" in namespaces or "gnomad" in namespaces + or bool(clinvar_namespaces) ) - needs_gnomad = "gnomad" in namespaces + need_gnomad = "gnomad" in namespaces + variants: list[Variant] = [] - mappings: Optional[list[Optional[MappedVariant]]] = [] if needs_mapping else None - gnomad_data: Optional[list[Optional[GnomADVariant]]] = [] if needs_gnomad else None + mappings: Optional[list[Optional[MappedVariant]]] = [] if need_mappings else None + gnomad_data: Optional[list[Optional[GnomADVariant]]] = [] if need_gnomad else None select_columns: list[Any] = [Variant] - if needs_mapping: + if need_mappings: select_columns.append(MappedVariant) - if needs_gnomad: + 
if need_gnomad: select_columns.append(GnomADVariant) - # Start from Variant query = ( select(*select_columns) .where(Variant.score_set_id == score_set.id) .order_by(cast(func.split_part(Variant.urn, "#", 2), Integer)) ) - if needs_mapping: + if need_mappings: query = query.join( MappedVariant, and_(Variant.id == MappedVariant.variant_id, MappedVariant.current.is_(True)), isouter=True, ) - if needs_gnomad: + if need_gnomad: query = query.join( MappedVariant.gnomad_variants.of_type(GnomADVariant), isouter=True, @@ -655,29 +670,74 @@ def get_score_set_variants_as_csv( variant = row[0] variants.append(variant) - if needs_mapping and mappings is not None: + if need_mappings and mappings is not None: mappings.append(row[1]) - if needs_gnomad and gnomad_data is not None: - idx = 2 if needs_mapping else 1 + if need_gnomad and gnomad_data is not None: + idx = 2 if need_mappings else 1 gnomad_data.append(row[idx]) + # For each ClinVar namespace, fetch a mapping from mapped_variant_id to ClinicalControl. + clinvar_data_map: dict[str, dict[int, Optional[ClinicalControl]]] = {} + if clinvar_namespaces and mappings is not None: + mv_ids = [m.id for m in mappings if m is not None] + for ns, db_version in clinvar_namespaces.items(): + mv_to_cc: dict[int, Optional[ClinicalControl]] = {} + if mv_ids: + aliased_cc = aliased(ClinicalControl) + cc_query = ( + select( + mapped_variants_clinical_controls_association_table.c.mapped_variant_id, + aliased_cc, + ) + .join( + aliased_cc, + mapped_variants_clinical_controls_association_table.c.clinical_control_id == aliased_cc.id, + ) + .where( + and_( + mapped_variants_clinical_controls_association_table.c.mapped_variant_id.in_(mv_ids), + aliased_cc.db_name == "ClinVar", + aliased_cc.db_version == db_version, + ) + ) + ) + for mv_id, cc in db.execute(cc_query).all(): + mv_to_cc[mv_id] = cc + clinvar_data_map[ns] = mv_to_cc + + # Build per-variant ClinVar lookup (list indexed in parallel with variants). 
+ clinvar_per_variant: Optional[list[Optional[dict[str, Optional[ClinicalControl]]]]] = None + if clinvar_namespaces and mappings is not None: + clinvar_per_variant = [] + for mapping in mappings: + row_clinvar: dict[str, Optional[ClinicalControl]] = {} + for ns, mv_to_cc in clinvar_data_map.items(): + row_clinvar[ns] = mv_to_cc.get(mapping.id) if mapping is not None else None + clinvar_per_variant.append(row_clinvar) + rows_data = variants_to_csv_rows( variants, columns=namespaced_score_set_columns, namespaced=namespaced, mappings=mappings, gnomad_data=gnomad_data, + clinvar_data_by_ns=clinvar_per_variant, ) # type: ignore - rows_columns = [ - ( - f"{namespace}.{col}" - if (namespaced and namespace not in ["core", "mavedb"]) - else (f"mavedb.{col}" if namespaced and namespace == "mavedb" else col) - ) - for namespace, cols in namespaced_score_set_columns.items() - for col in cols - ] + + rows_columns = [] + for namespace, cols in namespaced_score_set_columns.items(): + for col in cols: + if CLINVAR_NS_PATTERN.match(namespace): + # ClinVar versioned namespaces always include the full namespace prefix + # to avoid column-name collisions when multiple versions are requested. + rows_columns.append(f"{namespace}.{col}") + elif namespaced and namespace not in ["core", "mavedb"]: + rows_columns.append(f"{namespace}.{col}") + elif namespaced and namespace == "mavedb": + rows_columns.append(f"mavedb.{col}") + else: + rows_columns.append(col) if drop_na_columns: rows_data, rows_columns = drop_na_columns_from_csv_file_rows(rows_data, rows_columns) @@ -724,6 +784,7 @@ def variant_to_csv_row( columns: dict[str, list[str]], mapping: Optional[MappedVariant] = None, gnomad_data: Optional[GnomADVariant] = None, + clinvar_data_by_ns: Optional[dict[str, Optional[ClinicalControl]]] = None, namespaced: Optional[bool] = None, na_rep="NA", ) -> dict[str, Any]: @@ -742,6 +803,8 @@ def variant_to_csv_row( Mapped variant corresponding to the variant. 
gnomad_data : variant.models.GnomADVariant, optional gnomAD variant data corresponding to the variant. + clinvar_data_by_ns : dict[str, Optional[ClinicalControl]], optional + Per-variant ClinVar data keyed by namespace (e.g. "clinvar.2024_01"). na_rep : str String to represent null values. @@ -840,6 +903,23 @@ def variant_to_csv_row( value = na_rep key = f"clingen.{column_key}" if namespaced else column_key row[key] = value + # Handle ClinVar-versioned namespaces (e.g. "clinvar.2024_01"). + # These always use the full "namespace.column" key regardless of the namespaced flag + # to avoid collisions when multiple versions are requested. + for namespace_key, namespace_cols in columns.items(): + if not CLINVAR_NS_PATTERN.match(namespace_key): + continue + clinvar_entry = (clinvar_data_by_ns or {}).get(namespace_key) + for column_key in namespace_cols: + if column_key == "clinical_significance": + value = str(clinvar_entry.clinical_significance) if clinvar_entry else na_rep + elif column_key == "clinical_review_status": + value = str(clinvar_entry.clinical_review_status) if clinvar_entry else na_rep + else: + value = na_rep + if is_null(value): + value = na_rep + row[f"{namespace_key}.{column_key}"] = value return row @@ -848,6 +928,7 @@ def variants_to_csv_rows( columns: dict[str, list[str]], mappings: Optional[Sequence[Optional[MappedVariant]]] = None, gnomad_data: Optional[Sequence[Optional[GnomADVariant]]] = None, + clinvar_data_by_ns: Optional[Sequence[Optional[dict[str, Optional[ClinicalControl]]]]] = None, namespaced: Optional[bool] = None, na_rep="NA", ) -> Iterable[dict[str, Any]]: @@ -866,6 +947,8 @@ def variants_to_csv_rows( List of mapped variants corresponding to the variants. gnomad_data : list[Optional[variant.models.GnomADVariant]], optional List of gnomAD variant data corresponding to the variants. + clinvar_data_by_ns : list[Optional[dict[str, Optional[ClinicalControl]]]], optional + Per-variant ClinVar data keyed by namespace (e.g. "clinvar.2024_01"). 
na_rep : str String to represent null values. @@ -873,26 +956,24 @@ def variants_to_csv_rows( ------- list[dict[str, Any]] """ - if mappings is not None and gnomad_data is not None: - return map( - lambda zipped: variant_to_csv_row( - zipped[0], columns, mapping=zipped[1], gnomad_data=zipped[2], namespaced=namespaced, na_rep=na_rep - ), - zip(variants, mappings, gnomad_data), - ) - elif mappings is not None: - return map( - lambda pair: variant_to_csv_row(pair[0], columns, mapping=pair[1], namespaced=namespaced, na_rep=na_rep), - zip(variants, mappings), - ) - elif gnomad_data is not None: - return map( - lambda pair: variant_to_csv_row( - pair[0], columns, gnomad_data=pair[1], namespaced=namespaced, na_rep=na_rep - ), - zip(variants, gnomad_data), - ) - return map(lambda v: variant_to_csv_row(v, columns, namespaced=namespaced, na_rep=na_rep), variants) + n = len(variants) + _mappings: Sequence[Optional[MappedVariant]] = mappings if mappings is not None else [None] * n + _gnomad: Sequence[Optional[GnomADVariant]] = gnomad_data if gnomad_data is not None else [None] * n + _clinvar: Sequence[Optional[dict[str, Optional[ClinicalControl]]]] = ( + clinvar_data_by_ns if clinvar_data_by_ns is not None else [None] * n + ) + return map( + lambda t: variant_to_csv_row( + t[0], + columns, + mapping=t[1], + gnomad_data=t[2], + clinvar_data_by_ns=t[3], + namespaced=namespaced, + na_rep=na_rep, + ), + zip(variants, _mappings, _gnomad, _clinvar), + ) def find_meta_analyses_for_score_sets(db: Session, urns: list[str]) -> list[ScoreSet]: diff --git a/src/mavedb/lib/validation/transform.py b/src/mavedb/lib/validation/transform.py index 76529588..32c2dff1 100644 --- a/src/mavedb/lib/validation/transform.py +++ b/src/mavedb/lib/validation/transform.py @@ -35,9 +35,9 @@ def transform_score_set_list_to_urn_list( return [score_set.urn for score_set in score_sets] else: return [ - score_set.urn - for score_set in score_sets - if score_set.superseding_score_set is None or 
score_set.superseding_score_set.published_date is None + score_set.urn + for score_set in score_sets + if score_set.superseding_score_set is None or score_set.superseding_score_set.published_date is None ] diff --git a/src/mavedb/routers/score_sets.py b/src/mavedb/routers/score_sets.py index 47532cd3..d737e269 100644 --- a/src/mavedb/routers/score_sets.py +++ b/src/mavedb/routers/score_sets.py @@ -2,7 +2,7 @@ import logging import time from datetime import date, datetime -from typing import Any, List, Literal, Optional, Sequence, TypedDict, Union +from typing import Any, List, Optional, Sequence, TypedDict, Union import numpy as np import pandas as pd @@ -48,6 +48,7 @@ from mavedb.lib.permissions import Action, assert_permission, has_permission from mavedb.lib.score_calibrations import create_score_calibration from mavedb.lib.score_sets import ( + CLINVAR_NS_PATTERN, csv_data_to_df, fetch_score_set_search_filter_options, find_meta_analyses_for_experiment_sets, @@ -794,8 +795,13 @@ def get_score_set_variants_csv( urn: str, start: int = Query(default=None, description="Start index for pagination"), limit: int = Query(default=None, description="Maximum number of variants to return"), - namespaces: List[Literal["scores", "counts", "vep", "gnomad", "clingen"]] = Query( - default=["scores"], description="One or more data types to include: scores, counts, ClinGen, gnomAD, VEP" + namespaces: List[str] = Query( + default=["scores"], + description=( + 'One or more data types to include: "scores", "counts", "vep", "gnomad", "clingen", ' + 'and/or ClinVar-versioned namespaces of the form "clinvar.YEAR_MONTH" ' + '(e.g. "clinvar.2024_01" for January 2024).' + ), ), drop_na_columns: Optional[bool] = None, include_custom_columns: Optional[bool] = None, @@ -809,9 +815,6 @@ def get_score_set_variants_csv( This differs from get_score_set_scores_csv() in that it returns only the HGVS columns, score column, and mapped HGVS string. 
- TODO (https://github.com/VariantEffect/mavedb-api/issues/446) We may add another function for ClinVar and gnomAD. - export endpoint, with options governing which columns to include. - Parameters __________ urn : str @@ -820,9 +823,11 @@ def get_score_set_variants_csv( The index to start from. If None, starts from the beginning. limit : Optional[int] The maximum number of variants to return. If None, returns all variants. - namespaces: List[Literal["scores", "counts", "vep", "gnomad", "clingen"]] + namespaces: List[str] The namespaces of all columns except for accession, hgvs_nt, hgvs_pro, and hgvs_splice. - We may add ClinVar in the future. + Supported values: "scores", "counts", "vep", "gnomad", "clingen", and ClinVar-versioned + namespaces of the form "clinvar.YEAR_MONTH" (e.g. "clinvar.2024_01" for January 2024). + Multiple ClinVar namespaces with different YEAR_MONTH values may be requested simultaneously. drop_na_columns : bool, optional Whether to drop columns that contain only NA values. Defaults to False. db : Session @@ -852,6 +857,21 @@ def get_score_set_variants_csv( logger.info(msg="Could not fetch scores with non-positive limit.", extra=logging_context()) raise HTTPException(status_code=422, detail="Limit must be positive") + _VALID_STATIC_NAMESPACES = {"scores", "counts", "vep", "gnomad", "clingen"} + invalid_namespaces = [ + ns for ns in namespaces if ns not in _VALID_STATIC_NAMESPACES and not CLINVAR_NS_PATTERN.match(ns) + ] + if invalid_namespaces: + raise HTTPException( + status_code=422, + detail=( + f"Invalid namespace(s): {invalid_namespaces}. " + 'Each namespace must be one of "scores", "counts", "vep", "gnomad", "clingen", ' + 'or a ClinVar-versioned namespace of the form "clinvar.YEAR_MM" ' + '(e.g. "clinvar.2024_01" for January 2024).' 
+ ), + ) + score_set = db.query(ScoreSet).filter(ScoreSet.urn == urn).first() if not score_set: logger.info(msg="Could not fetch the requested scores; No such score set exists.", extra=logging_context()) @@ -2351,6 +2371,7 @@ async def get_clinical_controls_options_for_score_set( select(ClinicalControl.db_name, ClinicalControl.db_version) .join(MappedVariant, ClinicalControl.mapped_variants) .join(Variant) + .where(MappedVariant.current.is_(True)) .where(Variant.score_set_id == item.id) ) diff --git a/tests/helpers/util/score_set.py b/tests/helpers/util/score_set.py index cd9f1bed..b6d7801a 100644 --- a/tests/helpers/util/score_set.py +++ b/tests/helpers/util/score_set.py @@ -225,6 +225,23 @@ def link_clinical_controls_to_mapped_variants(db, score_set): db.commit() +def link_clinvar_control_to_mapped_variant(db, score_set): + """Link the seeded ClinVar clinical control (id=1) to the first mapped variant of a score set.""" + mapped_variants = db.scalars( + select(MappedVariantDbModel) + .join(VariantDbModel) + .join(ScoreSetDbModel) + .where(ScoreSetDbModel.urn == score_set["urn"]) + ).all() + + mapped_variants[0].clinical_controls.append( + db.scalar(select(ClinicalControlDbModel).where(ClinicalControlDbModel.id == 1)) + ) + + db.add(mapped_variants[0]) + db.commit() + + def link_gnomad_variants_to_mapped_variants(db, score_set): mapped_variants = db.scalars( select(MappedVariantDbModel) diff --git a/tests/routers/test_score_set.py b/tests/routers/test_score_set.py index f898d15a..c4b7676a 100644 --- a/tests/routers/test_score_set.py +++ b/tests/routers/test_score_set.py @@ -22,6 +22,7 @@ from mavedb.models.enums.processing_state import ProcessingState from mavedb.models.enums.target_category import TargetCategory from mavedb.models.experiment import Experiment as ExperimentDbModel +from mavedb.models.mapped_variant import MappedVariant as MappedVariantDbModel from mavedb.models.score_set import ScoreSet as ScoreSetDbModel from mavedb.models.variant import Variant 
as VariantDbModel from mavedb.view_models.orcid import OrcidUser @@ -37,10 +38,11 @@ TEST_BIORXIV_IDENTIFIER, TEST_BRNICH_SCORE_CALIBRATION_CLASS_BASED, TEST_BRNICH_SCORE_CALIBRATION_RANGE_BASED, + TEST_CLINVAR_CONTROL, TEST_CROSSREF_IDENTIFIER, - TEST_EXPERIMENT_WITH_KEYWORD, TEST_GNOMAD_DATA_VERSION, TEST_INACTIVE_LICENSE, + TEST_KEYWORDS, TEST_MAPPED_VARIANT_WITH_HGVS_G_EXPRESSION, TEST_MAPPED_VARIANT_WITH_HGVS_P_EXPRESSION, TEST_MINIMAL_ACC_SCORESET, @@ -76,6 +78,7 @@ create_seq_score_set_with_mapped_variants, create_seq_score_set_with_variants, link_clinical_controls_to_mapped_variants, + link_clinvar_control_to_mapped_variant, link_gnomad_variants_to_mapped_variants, publish_score_set, ) @@ -875,7 +878,9 @@ def test_show_score_sets_anonymous_can_fetch_public_score_sets( assert response_data[0]["urn"] == published_score_set["urn"] -def test_show_score_sets_anonymous_cannot_fetch_private_score_sets(session, client, setup_router_db, anonymous_app_overrides): +def test_show_score_sets_anonymous_cannot_fetch_private_score_sets( + session, client, setup_router_db, anonymous_app_overrides +): experiment = create_experiment(client) score_set = create_seq_score_set(client, experiment["urn"]) # Score set is private (not published); change ownership so it belongs to another user @@ -927,7 +932,9 @@ def test_show_score_sets_mixed_public_and_private_returns_404( ): experiment = create_experiment(client) public_score_set = create_seq_score_set(client, experiment["urn"]) - public_score_set = mock_worker_variant_insertion(client, session, data_provider, public_score_set, data_files / "scores.csv") + public_score_set = mock_worker_variant_insertion( + client, session, data_provider, public_score_set, data_files / "scores.csv" + ) private_score_set = create_seq_score_set(client, experiment["urn"]) with patch.object(arq.ArqRedis, "enqueue_job", return_value=None): published_score_set = publish_score_set(client, public_score_set["urn"]) @@ -2677,15 +2684,10 @@ def 
test_search_score_sets_reports_correct_total_count_with_limit( def test_search_score_sets_not_affected_by_experiment_metadata( session, data_provider, client, setup_router_db, data_files ): - """Experiments with multiple keywords should not reduce the number of score sets returned by search. - - This is a regression test for a bug where joinedload on one-to-many experiment relationships caused row - multiplication in the main SQL query. The LIMIT clause was applied to the multiplied rows rather than unique - score sets, resulting in fewer results than expected. - """ + """Experiments with multiple keywords should not reduce the number of score sets returned by search.""" num_score_sets = 3 for i in range(num_score_sets): - experiment = create_experiment(client, {**TEST_EXPERIMENT_WITH_KEYWORD, "title": f"Experiment {i}"}) + experiment = create_experiment(client, {"keywords": TEST_KEYWORDS, "title": f"Experiment {i}"}) score_set = create_seq_score_set(client, experiment["urn"], update={"title": f"Score Set {i}"}) score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv") @@ -2699,6 +2701,44 @@ def test_search_score_sets_not_affected_by_experiment_metadata( assert response.json()["numScoreSets"] == num_score_sets +def test_search_score_sets_not_affected_by_multiple_superseding_versions( + session, data_provider, client, setup_router_db, data_files +): + """Multiple unpublished superseding versions of the same score set should not reduce search page size. + + Regression test for a bug where the superseding score set filter used a LEFT OUTER JOIN + (scoresets LEFT JOIN scoresets AS s ON scoresets.id = s.replaces_id). Since replaces_id has + no uniqueness constraint, a score set with N superseding versions produces N rows, all inside + the LIMIT boundary. This consumed extra row budget and caused paginated searches to return + fewer unique score sets than the requested limit. 
+ """ + num_published = 3 + published_urns = [] + for i in range(num_published): + experiment = create_experiment(client, {"title": f"Experiment {i}"}) + score_set = create_seq_score_set(client, experiment["urn"], update={"title": f"Score Set {i}"}) + score_set = mock_worker_variant_insertion(client, session, data_provider, score_set, data_files / "scores.csv") + + with patch.object(arq.ArqRedis, "enqueue_job", return_value=None): + published = publish_score_set(client, score_set["urn"]) + published_urns.append(published["urn"]) + + # Create multiple unpublished superseding versions for the first score set. + # These share the same replaces_id, which caused row multiplication with the old LEFT JOIN filter. + for j in range(3): + create_seq_score_set( + client, + create_experiment(client, {"title": f"Superseding Experiment {j}"})["urn"], + update={"title": f"Superseding {j}", "supersededScoreSetUrn": published_urns[0]}, + ) + + search_payload = {"limit": 2} + response = client.post("/api/v1/score-sets/search", json=search_payload) + assert response.status_code == 200 + assert len(response.json()["scoreSets"]) == 2 + assert response.json()["numScoreSets"] == num_published + + ######################################################################################################################## # Score set deletion ######################################################################################################################## @@ -3469,6 +3509,124 @@ def test_download_clingen_and_scores_file_in_variant_data_path(session, data_pro ) +def test_download_clinvar_namespace_in_variant_data_path(session, data_provider, client, setup_router_db, data_files): + """ClinVar namespace returns clinical_significance and clinical_review_status columns with correct values.""" + # The ClinVar control seeded in setup_router_db has db_version="11_2024", mapping to namespace clinvar.2024_11. 
+ clinvar_namespace = "clinvar.2024_11" + experiment = create_experiment(client) + score_set = create_seq_score_set_with_mapped_variants( + client, session, data_provider, experiment["urn"], data_files / "scores.csv" + ) + link_clinvar_control_to_mapped_variant(session, score_set) + + with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: + published_score_set = publish_score_set(client, score_set["urn"]) + worker_queue.assert_called_once() + + response = client.get( + f"/api/v1/score-sets/{published_score_set['urn']}/variants/data" + f"?namespaces={clinvar_namespace}&drop_na_columns=false" + ) + assert response.status_code == 200 + reader = csv.DictReader(StringIO(response.text)) + assert f"{clinvar_namespace}.clinical_significance" in reader.fieldnames + assert f"{clinvar_namespace}.clinical_review_status" in reader.fieldnames + + rows = list(reader) + # The first variant is linked to the ClinVar control; check its values. + assert rows[0][f"{clinvar_namespace}.clinical_significance"] == TEST_CLINVAR_CONTROL["clinical_significance"] + assert rows[0][f"{clinvar_namespace}.clinical_review_status"] == TEST_CLINVAR_CONTROL["clinical_review_status"] + # Other variants have no linked control for this version; they should be NA. + assert all(row[f"{clinvar_namespace}.clinical_significance"] == "NA" for row in rows[1:]) + assert all(row[f"{clinvar_namespace}.clinical_review_status"] == "NA" for row in rows[1:]) + + +def test_download_clinvar_namespace_with_no_matching_version( + session, data_provider, client, setup_router_db, data_files +): + """When no controls match the requested ClinVar version, all rows return NA.""" + # clinvar.2023_01 does not match the seeded control (11_2024), so all rows should be NA. 
+ clinvar_namespace = "clinvar.2023_01" + experiment = create_experiment(client) + score_set = create_seq_score_set_with_mapped_variants( + client, session, data_provider, experiment["urn"], data_files / "scores.csv" + ) + link_clinvar_control_to_mapped_variant(session, score_set) + + with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: + published_score_set = publish_score_set(client, score_set["urn"]) + worker_queue.assert_called_once() + + response = client.get( + f"/api/v1/score-sets/{published_score_set['urn']}/variants/data" + f"?namespaces={clinvar_namespace}&drop_na_columns=false" + ) + assert response.status_code == 200 + reader = csv.DictReader(StringIO(response.text)) + assert f"{clinvar_namespace}.clinical_significance" in reader.fieldnames + assert f"{clinvar_namespace}.clinical_review_status" in reader.fieldnames + + rows = list(reader) + assert all(row[f"{clinvar_namespace}.clinical_significance"] == "NA" for row in rows) + assert all(row[f"{clinvar_namespace}.clinical_review_status"] == "NA" for row in rows) + + +def test_download_multiple_clinvar_namespaces_in_variant_data_path( + session, data_provider, client, setup_router_db, data_files +): + """Multiple ClinVar namespaces produce distinct column sets; only the matching version has real data.""" + matching_ns = "clinvar.2024_11" # matches db_version="11_2024" seeded in setup_router_db + non_matching_ns = "clinvar.2023_01" # no controls with this version + experiment = create_experiment(client) + score_set = create_seq_score_set_with_mapped_variants( + client, session, data_provider, experiment["urn"], data_files / "scores.csv" + ) + link_clinvar_control_to_mapped_variant(session, score_set) + + with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: + published_score_set = publish_score_set(client, score_set["urn"]) + worker_queue.assert_called_once() + + response = client.get( + 
f"/api/v1/score-sets/{published_score_set['urn']}/variants/data" + f"?namespaces={matching_ns}&namespaces={non_matching_ns}&drop_na_columns=false" + ) + assert response.status_code == 200 + reader = csv.DictReader(StringIO(response.text)) + fieldnames = reader.fieldnames + # Both namespaces produce columns. + assert f"{matching_ns}.clinical_significance" in fieldnames + assert f"{matching_ns}.clinical_review_status" in fieldnames + assert f"{non_matching_ns}.clinical_significance" in fieldnames + assert f"{non_matching_ns}.clinical_review_status" in fieldnames + + rows = list(reader) + # Matching version: first variant has data. + assert rows[0][f"{matching_ns}.clinical_significance"] == TEST_CLINVAR_CONTROL["clinical_significance"] + assert rows[0][f"{matching_ns}.clinical_review_status"] == TEST_CLINVAR_CONTROL["clinical_review_status"] + # Non-matching version: all rows are NA. + assert all(row[f"{non_matching_ns}.clinical_significance"] == "NA" for row in rows) + assert all(row[f"{non_matching_ns}.clinical_review_status"] == "NA" for row in rows) + + +def test_invalid_clinvar_namespace_returns_422(client, setup_router_db, data_files): + """A clinvar namespace with an out-of-range month (13) is rejected with 422.""" + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + + response = client.get(f"/api/v1/score-sets/{score_set['urn']}/variants/data?namespaces=clinvar.2024_13") + assert response.status_code == 422 + + +def test_unrecognized_namespace_returns_422(client, setup_router_db, data_files): + """An entirely unrecognized namespace string is rejected with 422.""" + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + + response = client.get(f"/api/v1/score-sets/{score_set['urn']}/variants/data?namespaces=unknown_namespace") + assert response.status_code == 422 + + 
######################################################################################################################## # Fetching clinical controls and control options for a score set ######################################################################################################################## @@ -3587,6 +3745,28 @@ def test_can_fetch_current_clinical_control_options_for_score_set( ) +def test_clinical_control_options_exclude_non_current(client, setup_router_db, session, data_provider, data_files): + experiment = create_experiment(client) + score_set = create_seq_score_set_with_mapped_variants( + client, session, data_provider, experiment["urn"], data_files / "scores.csv" + ) + link_clinical_controls_to_mapped_variants(session, score_set) + + # Mark all mapped variants as non-current to simulate stale mapping data. + mapped_variants = session.scalars( + select(MappedVariantDbModel) + .join(VariantDbModel) + .join(ScoreSetDbModel) + .where(ScoreSetDbModel.urn == score_set["urn"]) + ).all() + for mv in mapped_variants: + mv.current = False + session.commit() + + response = client.get(f"/api/v1/score-sets/{score_set['urn']}/clinical-controls/options") + assert response.status_code == 404 + + ######################################################################################################################## # Fetching annotated variants for a score set ######################################################################################################################## From 6902a5c9fd0210c6e3b41c6a4b8df1280b53103d Mon Sep 17 00:00:00 2001 From: Estelle Da Date: Tue, 28 Apr 2026 12:58:20 +1000 Subject: [PATCH 3/3] Solve "dict" has incompatible type "int | None" problem. 
--- src/mavedb/lib/score_sets.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/mavedb/lib/score_sets.py b/src/mavedb/lib/score_sets.py index dfe96d00..048c6d6f 100644 --- a/src/mavedb/lib/score_sets.py +++ b/src/mavedb/lib/score_sets.py @@ -713,7 +713,10 @@ def get_score_set_variants_as_csv( for mapping in mappings: row_clinvar: dict[str, Optional[ClinicalControl]] = {} for ns, mv_to_cc in clinvar_data_map.items(): - row_clinvar[ns] = mv_to_cc.get(mapping.id) if mapping is not None else None + if mapping is not None and mapping.id is not None: + row_clinvar[ns] = mv_to_cc.get(mapping.id) + else: + row_clinvar[ns] = None clinvar_per_variant.append(row_clinvar) rows_data = variants_to_csv_rows(