Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 61 additions & 108 deletions src/mavedb/lib/score_sets.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,116 +616,66 @@ def get_score_set_variants_as_csv(
clinvar_namespaces[ns] = db_version
namespaced_score_set_columns[ns] = ["clinical_significance", "clinical_review_status"]

variants: Sequence[Variant] = []
mappings: Optional[list[Optional[MappedVariant]]] = None
gnomad_data: Optional[list[Optional[GnomADVariant]]] = None

# Mappings are needed whenever post-mapped HGVS or any ClinVar namespace is requested.
need_mappings = bool(include_post_mapped_hgvs or clinvar_namespaces)

if "gnomad" in namespaces and need_mappings:
variants_mappings_and_gnomad_query = (
select(Variant, MappedVariant, GnomADVariant)
.join(
MappedVariant,
and_(Variant.id == MappedVariant.variant_id, MappedVariant.current.is_(True)),
isouter=True,
)
.join(MappedVariant.gnomad_variants.of_type(GnomADVariant), isouter=True)
.where(
and_(
Variant.score_set_id == score_set.id,
or_(
and_(
GnomADVariant.db_name == "gnomAD",
GnomADVariant.db_version == "v4.1",
),
GnomADVariant.id.is_(None),
),
)
)
.order_by(cast(func.split_part(Variant.urn, "#", 2), Integer))
)
if start:
variants_mappings_and_gnomad_query = variants_mappings_and_gnomad_query.offset(start)
if limit:
variants_mappings_and_gnomad_query = variants_mappings_and_gnomad_query.limit(limit)
variants_mappings_and_gnomad = db.execute(variants_mappings_and_gnomad_query).all()

variants = []
mappings = []
gnomad_data = []
for variant, mapping, gnomad in variants_mappings_and_gnomad:
variants.append(variant)
mappings.append(mapping)
gnomad_data.append(gnomad)
elif need_mappings:
variants_and_mappings_query = (
select(Variant, MappedVariant)
.join(
MappedVariant,
and_(Variant.id == MappedVariant.variant_id, MappedVariant.current.is_(True)),
isouter=True,
)
.where(Variant.score_set_id == score_set.id)
.order_by(cast(func.split_part(Variant.urn, "#", 2), Integer))
need_mappings = (
include_post_mapped_hgvs
or "clingen" in namespaces
or "vep" in namespaces
or "gnomad" in namespaces
or bool(clinvar_namespaces)
)
need_gnomad = "gnomad" in namespaces

variants: list[Variant] = []
mappings: Optional[list[Optional[MappedVariant]]] = [] if need_mappings else None
gnomad_data: Optional[list[Optional[GnomADVariant]]] = [] if need_gnomad else None

select_columns: list[Any] = [Variant]
if need_mappings:
select_columns.append(MappedVariant)
if need_gnomad:
select_columns.append(GnomADVariant)

query = (
select(*select_columns)
.where(Variant.score_set_id == score_set.id)
.order_by(cast(func.split_part(Variant.urn, "#", 2), Integer))
)

if need_mappings:
query = query.join(
MappedVariant,
and_(Variant.id == MappedVariant.variant_id, MappedVariant.current.is_(True)),
isouter=True,
)
if start:
variants_and_mappings_query = variants_and_mappings_query.offset(start)
if limit:
variants_and_mappings_query = variants_and_mappings_query.limit(limit)
variants_and_mappings = db.execute(variants_and_mappings_query).all()

variants = []
mappings = []
for variant, mapping in variants_and_mappings:
variants.append(variant)
mappings.append(mapping)
elif "gnomad" in namespaces:
variants_and_gnomad_query = (
select(Variant, GnomADVariant)
.join(
MappedVariant,
and_(Variant.id == MappedVariant.variant_id, MappedVariant.current.is_(True)),
isouter=True,
)
.join(MappedVariant.gnomad_variants.of_type(GnomADVariant), isouter=True)
.where(
and_(
Variant.score_set_id == score_set.id,
or_(
and_(
GnomADVariant.db_name == "gnomAD",
GnomADVariant.db_version == "v4.1",
),
GnomADVariant.id.is_(None),
),
)

if need_gnomad:
query = query.join(
MappedVariant.gnomad_variants.of_type(GnomADVariant),
isouter=True,
).where(
or_(
and_(GnomADVariant.db_name == "gnomAD", GnomADVariant.db_version == "v4.1"),
GnomADVariant.id.is_(None),
)
.order_by(cast(func.split_part(Variant.urn, "#", 2), Integer))
)
if start:
variants_and_gnomad_query = variants_and_gnomad_query.offset(start)
if limit:
variants_and_gnomad_query = variants_and_gnomad_query.limit(limit)
variants_and_gnomad = db.execute(variants_and_gnomad_query).all()

variants = []
gnomad_data = []
for variant, gnomad in variants_and_gnomad:
variants.append(variant)
gnomad_data.append(gnomad)
else:
variants_query = (
select(Variant)
.where(Variant.score_set_id == score_set.id)
.order_by(cast(func.split_part(Variant.urn, "#", 2), Integer))
)
if start:
variants_query = variants_query.offset(start)
if limit:
variants_query = variants_query.limit(limit)
variants = db.scalars(variants_query).all()

if start:
query = query.offset(start)
if limit:
query = query.limit(limit)

result = db.execute(query).all()

for row in result:
variant = row[0]
variants.append(variant)

if need_mappings and mappings is not None:
mappings.append(row[1])

if need_gnomad and gnomad_data is not None:
idx = 2 if need_mappings else 1
gnomad_data.append(row[idx])

# For each ClinVar namespace, fetch a mapping from mapped_variant_id to ClinicalControl.
clinvar_data_map: dict[str, dict[int, Optional[ClinicalControl]]] = {}
Expand Down Expand Up @@ -763,7 +713,10 @@ def get_score_set_variants_as_csv(
for mapping in mappings:
row_clinvar: dict[str, Optional[ClinicalControl]] = {}
for ns, mv_to_cc in clinvar_data_map.items():
row_clinvar[ns] = mv_to_cc.get(mapping.id) if mapping is not None else None
if mapping is not None and mapping.id is not None:
row_clinvar[ns] = mv_to_cc.get(mapping.id)
else:
row_clinvar[ns] = None
clinvar_per_variant.append(row_clinvar)

rows_data = variants_to_csv_rows(
Expand Down
77 changes: 71 additions & 6 deletions tests/routers/test_score_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -3382,7 +3382,7 @@ def test_download_vep_file_in_variant_data_path(session, data_provider, client,
worker_queue.assert_called_once()

response = client.get(
f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=vep&include_post_mapped_hgvs=true&drop_na_columns=true"
f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=vep&drop_na_columns=true"
)
assert response.status_code == 200
reader = csv.DictReader(StringIO(response.text))
Expand Down Expand Up @@ -3411,7 +3411,7 @@ def test_download_clingen_file_in_variant_data_path(session, data_provider, clie
worker_queue.assert_called_once()

response = client.get(
f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=clingen&include_post_mapped_hgvs=true&drop_na_columns=true"
f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=clingen&drop_na_columns=true"
)
assert response.status_code == 200
reader = csv.DictReader(StringIO(response.text))
Expand All @@ -3422,10 +3422,6 @@ def test_download_clingen_file_in_variant_data_path(session, data_provider, clie

def test_download_gnomad_file_in_variant_data_path(session, data_provider, client, setup_router_db, data_files):
experiment = create_experiment(client)
score_set = create_seq_score_set(client, experiment["urn"])
score_set = mock_worker_variant_insertion(
client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv"
)
# Link a gnomAD variant to the first mapped variant (version may not match export filter)
score_set = create_seq_score_set_with_mapped_variants(
client, session, data_provider, experiment["urn"], data_files / "scores.csv"
Expand All @@ -3444,6 +3440,75 @@ def test_download_gnomad_file_in_variant_data_path(session, data_provider, clien
assert "gnomad.gnomad_af" in reader.fieldnames


def test_download_clingen_and_vep_file_in_variant_data_path(session, data_provider, client, setup_router_db, data_files):
    """Requesting the clingen and vep namespaces together yields columns (and values) from both."""
    experiment = create_experiment(client)
    score_set = create_seq_score_set(client, experiment["urn"])
    score_set = mock_worker_variant_insertion(
        client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv"
    )
    # Create mapped variants with VEP consequence populated
    create_mapped_variants_for_score_set(session, score_set["urn"], TEST_MAPPED_VARIANT_WITH_HGVS_G_EXPRESSION)
    # Attach a ClinGen allele id to the first mapped variant so the clingen column has a value.
    stored_score_set = session.query(ScoreSetDbModel).filter(ScoreSetDbModel.urn == score_set["urn"]).one()
    mapped_variant = stored_score_set.variants[0].mapped_variants[0]
    mapped_variant.clingen_allele_id = VALID_CLINGEN_CA_ID
    session.add(mapped_variant)
    session.commit()

    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
        published_score_set = publish_score_set(client, score_set["urn"])
        worker_queue.assert_called_once()

    download_url = (
        f"/api/v1/score-sets/{published_score_set['urn']}/variants/data"
        "?namespaces=clingen&namespaces=vep&drop_na_columns=true"
    )
    response = client.get(download_url)
    assert response.status_code == 200

    reader = csv.DictReader(StringIO(response.text))
    assert "vep.vep_functional_consequence" in reader.fieldnames
    assert "clingen.clingen_allele_id" in reader.fieldnames

    rows = list(reader)
    assert any(row.get("vep.vep_functional_consequence") == "missense_variant" for row in rows)
    assert rows[0].get("clingen.clingen_allele_id") == VALID_CLINGEN_CA_ID


def test_download_clingen_and_scores_file_in_variant_data_path(session, data_provider, client, setup_router_db, data_files):
    """Requesting the scores and clingen namespaces together returns exactly the score
    columns plus the clingen allele id column, with the seeded allele id in the first row."""
    experiment = create_experiment(client)
    score_set = create_seq_score_set(client, experiment["urn"])
    score_set = mock_worker_variant_insertion(
        client, session, data_provider, score_set, data_files / "scores.csv", data_files / "counts.csv"
    )
    # Attach a ClinGen allele id to the first mapped variant so the clingen column has a value.
    create_mapped_variants_for_score_set(session, score_set["urn"], TEST_MAPPED_VARIANT_WITH_HGVS_G_EXPRESSION)
    db_score_set = session.query(ScoreSetDbModel).filter(ScoreSetDbModel.urn == score_set["urn"]).one()
    first_mapped_variant = db_score_set.variants[0].mapped_variants[0]
    first_mapped_variant.clingen_allele_id = VALID_CLINGEN_CA_ID
    session.add(first_mapped_variant)
    session.commit()

    with patch.object(arq.ArqRedis, "enqueue_job", return_value=None) as worker_queue:
        published_score_set = publish_score_set(client, score_set["urn"])
        worker_queue.assert_called_once()

    response = client.get(
        f"/api/v1/score-sets/{published_score_set['urn']}/variants/data?namespaces=scores&namespaces=clingen&drop_na_columns=true"
    )
    assert response.status_code == 200

    # Parse the CSV once; the exact-column assertion subsumes a separate membership check.
    reader = csv.DictReader(StringIO(response.text))
    assert sorted(reader.fieldnames) == sorted(
        [
            "accession",
            "clingen.clingen_allele_id",
            "hgvs_nt",
            "hgvs_pro",
            "scores.score",
        ]
    )
    rows = list(reader)
    assert rows[0].get("clingen.clingen_allele_id") == VALID_CLINGEN_CA_ID


def test_download_clinvar_namespace_in_variant_data_path(session, data_provider, client, setup_router_db, data_files):
"""ClinVar namespace returns clinical_significance and clinical_review_status columns with correct values."""
# The ClinVar control seeded in setup_router_db has db_version="11_2024", mapping to namespace clinvar.2024_11.
Expand Down
Loading