diff --git a/src/mavedb/lib/score_sets.py b/src/mavedb/lib/score_sets.py index fd96225e..048c6d6f 100644 --- a/src/mavedb/lib/score_sets.py +++ b/src/mavedb/lib/score_sets.py @@ -616,116 +616,66 @@ def get_score_set_variants_as_csv( clinvar_namespaces[ns] = db_version namespaced_score_set_columns[ns] = ["clinical_significance", "clinical_review_status"] - variants: Sequence[Variant] = [] - mappings: Optional[list[Optional[MappedVariant]]] = None - gnomad_data: Optional[list[Optional[GnomADVariant]]] = None - - # Mappings are needed whenever post-mapped HGVS or any ClinVar namespace is requested. - need_mappings = bool(include_post_mapped_hgvs or clinvar_namespaces) - - if "gnomad" in namespaces and need_mappings: - variants_mappings_and_gnomad_query = ( - select(Variant, MappedVariant, GnomADVariant) - .join( - MappedVariant, - and_(Variant.id == MappedVariant.variant_id, MappedVariant.current.is_(True)), - isouter=True, - ) - .join(MappedVariant.gnomad_variants.of_type(GnomADVariant), isouter=True) - .where( - and_( - Variant.score_set_id == score_set.id, - or_( - and_( - GnomADVariant.db_name == "gnomAD", - GnomADVariant.db_version == "v4.1", - ), - GnomADVariant.id.is_(None), - ), - ) - ) - .order_by(cast(func.split_part(Variant.urn, "#", 2), Integer)) - ) - if start: - variants_mappings_and_gnomad_query = variants_mappings_and_gnomad_query.offset(start) - if limit: - variants_mappings_and_gnomad_query = variants_mappings_and_gnomad_query.limit(limit) - variants_mappings_and_gnomad = db.execute(variants_mappings_and_gnomad_query).all() - - variants = [] - mappings = [] - gnomad_data = [] - for variant, mapping, gnomad in variants_mappings_and_gnomad: - variants.append(variant) - mappings.append(mapping) - gnomad_data.append(gnomad) - elif need_mappings: - variants_and_mappings_query = ( - select(Variant, MappedVariant) - .join( - MappedVariant, - and_(Variant.id == MappedVariant.variant_id, MappedVariant.current.is_(True)), - isouter=True, - ) - .where(Variant.score_set_id == score_set.id) - .order_by(cast(func.split_part(Variant.urn, "#", 2), Integer)) + need_mappings = ( + include_post_mapped_hgvs + or "clingen" in namespaces + or "vep" in namespaces + or "gnomad" in namespaces + or bool(clinvar_namespaces) + ) + need_gnomad = "gnomad" in namespaces + + variants: list[Variant] = [] + mappings: Optional[list[Optional[MappedVariant]]] = [] if need_mappings else None + gnomad_data: Optional[list[Optional[GnomADVariant]]] = [] if need_gnomad else None + + select_columns: list[Any] = [Variant] + if need_mappings: + select_columns.append(MappedVariant) + if need_gnomad: + select_columns.append(GnomADVariant) + + query = ( + select(*select_columns) + .where(Variant.score_set_id == score_set.id) + .order_by(cast(func.split_part(Variant.urn, "#", 2), Integer)) + ) + + if need_mappings: + query = query.join( + MappedVariant, + and_(Variant.id == MappedVariant.variant_id, MappedVariant.current.is_(True)), + isouter=True, ) - if start: - variants_and_mappings_query = variants_and_mappings_query.offset(start) - if limit: - variants_and_mappings_query = variants_and_mappings_query.limit(limit) - variants_and_mappings = db.execute(variants_and_mappings_query).all() - - variants = [] - mappings = [] - for variant, mapping in variants_and_mappings: - variants.append(variant) - mappings.append(mapping) - elif "gnomad" in namespaces: - variants_and_gnomad_query = ( - select(Variant, GnomADVariant) - .join( - MappedVariant, - and_(Variant.id == MappedVariant.variant_id, MappedVariant.current.is_(True)), - isouter=True, - ) - .join(MappedVariant.gnomad_variants.of_type(GnomADVariant), isouter=True) - .where( - and_( - Variant.score_set_id == score_set.id, - or_( - and_( - GnomADVariant.db_name == "gnomAD", - GnomADVariant.db_version == "v4.1", - ), - GnomADVariant.id.is_(None), - ), - ) + + if need_gnomad: + query = query.join( + MappedVariant.gnomad_variants.of_type(GnomADVariant), + isouter=True, + ).where( + or_( + and_(GnomADVariant.db_name == "gnomAD", GnomADVariant.db_version == "v4.1"), + GnomADVariant.id.is_(None), ) - .order_by(cast(func.split_part(Variant.urn, "#", 2), Integer)) - ) - if start: - variants_and_gnomad_query = variants_and_gnomad_query.offset(start) - if limit: - variants_and_gnomad_query = variants_and_gnomad_query.limit(limit) - variants_and_gnomad = db.execute(variants_and_gnomad_query).all() - - variants = [] - gnomad_data = [] - for variant, gnomad in variants_and_gnomad: - variants.append(variant) - gnomad_data.append(gnomad) - else: - variants_query = ( - select(Variant) - .where(Variant.score_set_id == score_set.id) - .order_by(cast(func.split_part(Variant.urn, "#", 2), Integer)) ) - if start: - variants_query = variants_query.offset(start) - if limit: - variants_query = variants_query.limit(limit) - variants = db.scalars(variants_query).all() + + if start: + query = query.offset(start) + if limit: + query = query.limit(limit) + + result = db.execute(query).all() + + for row in result: + variant = row[0] + variants.append(variant) + + if need_mappings and mappings is not None: + mappings.append(row[1]) + + if need_gnomad and gnomad_data is not None: + idx = 2 if need_mappings else 1 + gnomad_data.append(row[idx]) # For each ClinVar namespace, fetch a mapping from mapped_variant_id to ClinicalControl. clinvar_data_map: dict[str, dict[int, Optional[ClinicalControl]]] = {} @@ -763,7 +713,10 @@ def get_score_set_variants_as_csv( for mapping in mappings: row_clinvar: dict[str, Optional[ClinicalControl]] = {} for ns, mv_to_cc in clinvar_data_map.items(): - row_clinvar[ns] = mv_to_cc.get(mapping.id) if mapping is not None else None + if mapping is not None and mapping.id is not None: + row_clinvar[ns] = mv_to_cc.get(mapping.id) + else: + row_clinvar[ns] = None clinvar_per_variant.append(row_clinvar) rows_data = variants_to_csv_rows( diff --git a/tests/routers/test_score_set.py b/tests/routers/test_score_set.py index 611acbed..c4b7676a 100644 --- a/tests/routers/test_score_set.py +++ b/tests/routers/test_score_set.py @@ -3382,7 +3382,7 @@ def test_download_vep_file_in_variant_data_path(session, data_provider, client, worker_queue.assert_called_once() response = client.get( - f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=vep&include_post_mapped_hgvs=true&drop_na_columns=true" + f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=vep&drop_na_columns=true" ) assert response.status_code == 200 reader = csv.DictReader(StringIO(response.text)) @@ -3411,7 +3411,7 @@ def test_download_clingen_file_in_variant_data_path(session, data_provider, clie worker_queue.assert_called_once() response = client.get( - f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=clingen&include_post_mapped_hgvs=true&drop_na_columns=true" + f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=clingen&drop_na_columns=true" ) assert response.status_code == 200 reader = csv.DictReader(StringIO(response.text)) @@ -3422,10 +3422,6 @@ def test_download_clingen_file_in_variant_data_path(session, data_provider, clie def test_download_gnomad_file_in_variant_data_path(session, data_provider, client, setup_router_db, data_files): experiment = create_experiment(client) - score_set = create_seq_score_set(client, experiment["urn"]) - score_set = mock_worker_variant_insertion( - client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv" - ) # Link a gnomAD variant to the first mapped variant (version may not match export filter) score_set = create_seq_score_set_with_mapped_variants( client, session, data_provider, experiment["urn"], data_files / "scores.csv" @@ -3444,6 +3440,75 @@ def test_download_gnomad_file_in_variant_data_path(session, data_provider, clien assert "gnomad.gnomad_af" in reader.fieldnames +def test_download_clingen_and_vep_file_in_variant_data_path(session, data_provider, client, setup_router_db, data_files): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + score_set = mock_worker_variant_insertion( + client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv" + ) + # Create mapped variants with VEP consequence populated + create_mapped_variants_for_score_set(session, score_set["urn"], TEST_MAPPED_VARIANT_WITH_HGVS_G_EXPRESSION) + db_score_set = session.query(ScoreSetDbModel).filter(ScoreSetDbModel.urn == score_set["urn"]).one() + first_mapped_variant = db_score_set.variants[0].mapped_variants[0] + first_mapped_variant.clingen_allele_id = VALID_CLINGEN_CA_ID + session.add(first_mapped_variant) + session.commit() + + with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: + published_score_set = publish_score_set(client, score_set["urn"]) + worker_queue.assert_called_once() + + response = client.get( + f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=clingen&namespaces=vep&drop_na_columns=true" + ) + assert response.status_code == 200 + reader = csv.DictReader(StringIO(response.text)) + assert "vep.vep_functional_consequence" in reader.fieldnames + assert "clingen.clingen_allele_id" in reader.fieldnames + rows = list(reader) + assert any(row.get("vep.vep_functional_consequence") == "missense_variant" for row in rows) + assert rows[0].get("clingen.clingen_allele_id") == VALID_CLINGEN_CA_ID + + +def test_download_clingen_and_scores_file_in_variant_data_path(session, data_provider, client, setup_router_db, data_files): + experiment = create_experiment(client) + score_set = create_seq_score_set(client, experiment["urn"]) + score_set = mock_worker_variant_insertion( + client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv" + ) + # Create mapped variants with VEP consequence populated + create_mapped_variants_for_score_set(session, score_set["urn"], TEST_MAPPED_VARIANT_WITH_HGVS_G_EXPRESSION) + db_score_set = session.query(ScoreSetDbModel).filter(ScoreSetDbModel.urn == score_set["urn"]).one() + first_mapped_variant = db_score_set.variants[0].mapped_variants[0] + first_mapped_variant.clingen_allele_id = VALID_CLINGEN_CA_ID + session.add(first_mapped_variant) + session.commit() + + with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue: + published_score_set = publish_score_set(client, score_set["urn"]) + worker_queue.assert_called_once() + + response = client.get( + f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=scores&namespaces=clingen&drop_na_columns=true" + ) + assert response.status_code == 200 + reader = csv.DictReader(StringIO(response.text)) + assert "clingen.clingen_allele_id" in reader.fieldnames + rows = list(reader) + assert rows[0].get("clingen.clingen_allele_id") == VALID_CLINGEN_CA_ID + download_multiple_data_csv = response.text + reader = csv.DictReader(StringIO(download_multiple_data_csv)) + assert sorted(reader.fieldnames) == sorted( + [ + "accession", + "clingen.clingen_allele_id", + "hgvs_nt", + "hgvs_pro", + "scores.score", + ] + ) + + def test_download_clinvar_namespace_in_variant_data_path(session, data_provider, client, setup_router_db, data_files): """ClinVar namespace returns clinical_significance and clinical_review_status columns with correct values.""" # The ClinVar control seeded in setup_router_db has db_version="11_2024", mapping to namespace clinvar.2024_11.