I am trying to run a GWAS for a single trait on about 400K individuals, using the UKB imputation dataset (data-field 22828) and Hail. I am running my analysis with the dx run app-dxjupyterlab_spark_cluster command on the UK Biobank RAP. I would like some feedback on the most efficient way to run this analysis in terms of computation time and cost. I cannot get a clear sense of how long the analyses are expected to take, and they sometimes stall. I am currently testing the instance type mem1_ssd1_v2_x72. I launch the job as follows:
my_cmd="papermill BGENimport_to_GWAS_chr22_full_pipeline.ipynb BGENimport_to_GWAS_chr22_full_pipeline_output.ipynb"
APP_ID=$(dx describe app-dxjupyterlab_spark_cluster --json | jq -r .id)
EXTRA='{"timeoutPolicyByExecutable": {"'"$APP_ID"'": {"*": {"hours": 4}}}}'
dx run app-dxjupyterlab_spark_cluster -icmd="$my_cmd" -iin="BGENimport_to_GWAS_chr22_full_pipeline.ipynb" --instance-type "mem1_ssd1_v2_x72" --tag="chr22_bgen2GWAS" --priority "high" --name "chr22_bgen2GWAS" --extra-args "$EXTRA" --yes
Here is a minimal example of my script for chromosome 22:
#1. Initialize Spark and Hail
from pyspark.sql import SparkSession
import hail as hl
import os
import pandas as pd
import dxpy
builder = (SparkSession.builder.enableHiveSupport())
spark = builder.getOrCreate()
hl.init(sc=spark.sparkContext)
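Since the jobs sometimes stall, a quick diagnostic at this point can confirm how much parallelism the driver actually sees from the Spark cluster. This only uses the Spark session created above and prints a couple of values:
# Diagnostic only: report the master URL and the parallelism the driver sees
print("Spark master:", spark.sparkContext.master)
print("Default parallelism:", spark.sparkContext.defaultParallelism)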
#2. Import imputed data into a Hail MatrixTable (MT)
bgen_path = f'/mnt/project/Bulk/Imputation/UKB imputation from genotype'
filename = f'ukb22828_c22_b0_v3'
file_url = f"file://{bgen_path}/{filename}.bgen"
output_name = f'file:///mnt/project/Data/{filename}.bgen.idx2'
# The project mount (/mnt/project) is read-only, so the .idx2 index is written to HDFS
index_file_map = {file_url: f"hdfs:///{filename}.idx2"}
hl.index_bgen(path=file_url,
              index_file_map=index_file_map,
              reference_genome="GRCh37",
              contig_recoding=None,
              skip_invalid_loci=False)
mt = hl.import_bgen(path=file_url,
                    entry_fields=['GT', 'GP'],
                    sample_file=f"file://{bgen_path}/{filename}.sample",
                    n_partitions=None,
                    block_size=None,
                    index_file_map=index_file_map,
                    variants=None)
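If partitioning turns out to matter for runtime, the same import can be done with an explicit n_partitions instead of None; the value 1000 below is only an illustrative guess, not a tested recommendation:
# Alternative: import with an explicit partition count (1000 is illustrative only)
mt = hl.import_bgen(path=file_url,
                    entry_fields=['GT', 'GP'],
                    sample_file=f"file://{bgen_path}/{filename}.sample",
                    n_partitions=1000,
                    index_file_map=index_file_map)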
#3. Filter the MT by sample ID
# Read the sample phenotype file
phenotype_filename = f'file:///mnt/project/phenotype_trait1.csv'
phenotype_df = pd.read_csv(phenotype_filename, sep = '\t')
# Get the sample IDs (cast to str, since the MT column key 's' is a string)
id_samples = [str(i) for i in phenotype_df['IID'].tolist()]
# Define the sample ID set to filter on
filter_ind_id = hl.set(id_samples)
filtered_mt = mt.filter_cols(filter_ind_id.contains(mt.s), keep=True)
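One option for reducing recomputation (a sketch only; the HDFS path is just an example) is to checkpoint the filtered MatrixTable so the QC and regression steps below do not re-read and re-filter the BGEN:
# Optional: persist the filtered MT to HDFS so downstream steps reuse it (example path)
filtered_mt = filtered_mt.checkpoint("hdfs:///filtered_chr22.mt", overwrite=True)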
#4. Filter MT based on locus quality control
# Run variant-level QC
qc_mt = hl.variant_qc(filtered_mt)
# Create Hail Table from MT
qc_tb = qc_mt.rows()
# Compute the IMPUTE INFO score from the genotype probabilities (GP) and keep variants with INFO score >= 0.8
qc_mt_infoscore = qc_mt.filter_rows(hl.agg.info_score(qc_mt.GP).score >= 0.8)
qc_mt_infoscore_tb = qc_mt_infoscore.rows()
# Filter on MAF (>= 0.001) and HWE p-value (>= 1e-10)
locus_qc_tb = qc_mt_infoscore_tb.filter(
    (qc_mt_infoscore_tb["variant_qc"]["AF"][0] >= 0.001) &
    (qc_mt_infoscore_tb["variant_qc"]["AF"][0] <= 0.999) &
    (qc_mt_infoscore_tb["variant_qc"]["p_value_hwe"] >= 1e-10))
# Filter the MT to the variants retained in the locus_qc_tb table
locus_qc_mt = qc_mt_infoscore.semi_join_rows(locus_qc_tb)
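As a sanity check of how many variants and samples survive the QC filters (note this forces a full pass over the data, so it adds runtime):
# Count remaining variants (rows) and samples (columns); this triggers computation
n_variants, n_samples = locus_qc_mt.count()
print(f"After QC: {n_variants} variants x {n_samples} samples")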
#5. Run GWAS analysis on Hail
# Convert the pandas dataframe with the trait values into a Hail Table
pheno_table = hl.Table.from_pandas(phenotype_df)
pheno_table = pheno_table.annotate(IID = hl.str(pheno_table.IID))
pheno_table = pheno_table.key_by('IID')
# Annotate the MT with the pheno Table by matching the MT's column key ('s') with the pheno Table's key ('IID')
phenogeno_mt = locus_qc_mt.annotate_cols(**pheno_table[locus_qc_mt.s])
# Run Hail's linear regression method
gwas = hl.linear_regression_rows(y=phenogeno_mt.nb_live_births,
                                 x=phenogeno_mt.GT.n_alt_alleles(),
                                 covariates=[1.0],
                                 block_size=50)
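For a quick look at the strongest associations before writing everything out (this is only an inspection step, and it forces the regression to run):
# Show the five most significant hits
gwas.order_by(gwas.p_value).show(5)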
#6. Save GWAS results into a database
# Define database and table name
db_name = "gwas_analyses"
tb_name = "gwas_results_chr22.ht"
# Create database in DNAX
stmt = f"CREATE DATABASE IF NOT EXISTS {db_name} LOCATION 'dnax://'"
print(stmt)
spark.sql(stmt).show()
# Find database ID of newly created database using a dxpy method
db_uri = dxpy.find_one_data_object(name=f"{db_name}", classname="database")['id']
url = f"dnax://{db_uri}/{tb_name}"
#Write GWAS results into the database
gwas.write(url, overwrite = True)
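For completeness, reading the results back in a later session should follow the same pattern, assuming the same db_name and tb_name as above:
# Re-open the saved GWAS results in a later session
db_uri = dxpy.find_one_data_object(name=f"{db_name}", classname="database")['id']
gwas_results = hl.read_table(f"dnax://{db_uri}/{tb_name}")
gwas_results.describe()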