Calculating relatedness in 40,000 samples

Hello!
I have a dataset with a random subset of 100,455 variants and 42,046 samples. My goal is to create a dataset containing only unrelated samples.
I’m using this code:

import hail as hl
# Set Spark memory configuration
hl.init(
    spark_conf={
        'spark.driver.memory': '50g',
        'spark.executor.memory': '50g' 
    }
)
# Load matrix
afterQC_vcfmatrix = hl.read_matrix_table(afterQC_vcfmatrix_path) 
# STEP 1 : Compute relatedness using KING
print("Number of samples and variants before relatedness analysis:", afterQC_vcfmatrix.count())
king_ht = hl.king(afterQC_vcfmatrix.GT)  # KING operates on the genotype data (afterQC_vcfmatrix.GT)
print("***KING relatedness computed***")
high_kinship_score = king_ht.filter_entries(king_ht['phi'] > 0.45) # keep the pairs of highly related samples (self-comparisons and twins/duplicates)
high_kinship_score_table = high_kinship_score.entries()
twins_table = high_kinship_score_table.filter(high_kinship_score_table['s_1'] != high_kinship_score_table['s']) # drop the self-comparisons (a sample compared with itself has phi = 0.5)
twins_table.export(f"./{output_name}high-kinship-score-samples-king")
print(f"High kinship samples deleted by King saved in: ./{output_name}high-kinship-score-samples-king")
# STEP 2 : Compute relatedness using pc_relate
pca_eigenvalues, pca_scores, pca_loadings = hl.hwe_normalized_pca(afterQC_vcfmatrix.GT, k=10, compute_loadings=False)  # Compute PCA
relatedness_ht = hl.pc_relate(afterQC_vcfmatrix.GT, min_individual_maf=0.01, scores_expr=pca_scores[afterQC_vcfmatrix.col_key].scores,
                              block_size=4096, min_kinship=0.088, statistics='all')  # Compute relatedness
pairs = relatedness_ht.filter(relatedness_ht['kin'] > 0.088) # Filter pairs based on kinship threshold
related_samples_to_remove = hl.maximal_independent_set(pairs.i, pairs.j, False) # Get the maximal independent set of related samples to remove
samples_to_remove = related_samples_to_remove.collect() 
with open(relatedness_path, "w") as file: # Save removed sample ID into a file
    for sample in samples_to_remove:
        file.write(f"{sample['node']}\n")
print(f"Samples after maximal independet set and pc_related: {output_name}-pc_relate_deleted_samples")
afterQC_vcfmatrix_pcrelate = afterQC_vcfmatrix.filter_cols(hl.is_defined(related_samples_to_remove[afterQC_vcfmatrix.col_key]), keep=False) # Remove related samples from the matrix table
print("***Number of samples after maximal independet set and pc_related", afterQC_vcfmatrix_pcrelate.count()) 
## STEP 3: Remove the phi > 0.45 samples flagged by KING after pc_relate
samples_to_remove = twins_table.aggregate(hl.agg.collect_as_set(twins_table.s)) # Collect the sample IDs with high kinship score (phi > 0.45)
set_to_remove = hl.literal(samples_to_remove)
afterQC_vcfmatrix_pcrelate_king = afterQC_vcfmatrix_pcrelate.filter_cols(~set_to_remove.contains(afterQC_vcfmatrix_pcrelate['s']))
print("***Number of samples with high kinship score:", afterQC_vcfmatrix_pcrelate_king.count())

I’ve already used this code with a smaller dataset (100,000 variants and 700 samples) and it worked fine. The error log is attached, but to sum up, this is the error I get:

From org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 163.0 failed 1 times, most recent failure: Lost task 20.0 in stage 163.0 (TID 79873) (bio-box.internal executor driver): org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout

It seems to be a memory problem. How much memory does Spark need to work with 42,000 samples? Is there a better way of filtering the related samples?
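
Since the error explicitly says the timeout is controlled by spark.rpc.askTimeout, I was thinking of trying something like the configuration below as a first attempt. The timeout and memory values here are only guesses on my part, and I don’t know whether raising them actually solves anything or just hides the underlying memory problem:

import hail as hl
# Guessed configuration (not tested at this scale): raise the timeouts named in the
# error and give the driver/executors more memory than the 50g I currently use.
hl.init(
    spark_conf={
        'spark.driver.memory': '100g',
        'spark.executor.memory': '100g',
        'spark.rpc.askTimeout': '600s',     # the timeout mentioned in the error message
        'spark.network.timeout': '600s'
    }
)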

Thanks in advance!!

Mireia
hail-20241127-1629-0.2.120-f00f916faf78TAIL.log (62.7 KB)