I have a large collection of gVCF files. I am testing on 10,000 first, with a view to scaling up to 50,000 and then 100,000 participants in due time. I wish to load the single-sample gVCFs into a VDS, and I wonder if there is any guideline on an optimal batch size.
My first approach would be to create a VDS per batch of 1,000 participants, then combine the VDSs by pairs, following a binary tree approach.
Some rationale:
Batches of 1,000 worked well in other processing (IGG joint calling).
The binary tree merging was efficient for creating a 10,000-participant dense matrix table from gVCFs using an old version of Hail.
In terms of snippets:
Import gVCFs into VDS
# From a list of 10,000 gVCF S3 paths
ls_gvcf
# Params
batch = 0
step = 1000
# Combine gVCFs
combiner = hl.vds.new_combiner(
    output_path=f'{VDS_PREFIX}/SG10K_Health_{batch}.vds',
    temp_path=f'{VDS_TMP}/',
    gvcf_paths=ls_gvcf[batch:(batch + step)],
    use_genome_default_intervals=True,
)
combiner.run()
# Eventually looping in steps of 1,000 until all the batch VDSs are created
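A minimal sketch of that loop, assuming ls_gvcf, VDS_PREFIX and VDS_TMP are defined as above:
import hail as hl

step = 1000  # batch size, as above
# Write one VDS per batch of 1,000 gVCFs
for batch, start in enumerate(range(0, len(ls_gvcf), step)):
    combiner = hl.vds.new_combiner(
        output_path=f'{VDS_PREFIX}/SG10K_Health_{batch}.vds',
        temp_path=f'{VDS_TMP}/',
        gvcf_paths=ls_gvcf[start:start + step],
        use_genome_default_intervals=True,
    )
    combiner.run()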
Rolling union
Here is a snippet I used for the union of matrix tables; I would update the code to combine VDSs instead (see the sketch after the snippet). The idea is, from a list of VDSs, to combine 1+2, 3+4, 5+6… in a rolling union until the last step merges all the data into one file.
# Create a manifest
ht_manifest.show()
+-------------------+-----------------------------------------+-------+
| bucket | key | batch |
+-------------------+-----------------------------------------+-------+
| str | str | int32 |
+-------------------+-----------------------------------------+-------+
| "precise-scratch" | "hebrardms/SG100K_r2.0/igg/batch-1.mt" | 1 |
| "precise-scratch" | "hebrardms/SG100K_r2.0/igg/batch-2.mt" | 2 |
| "precise-scratch" | "hebrardms/SG100K_r2.0/igg/batch-3.mt" | 3 |
| "precise-scratch" | "hebrardms/SG100K_r2.0/igg/batch-4.mt" | 4 |
...
###
# List of paths
manifest = ht_manifest.collect()
# Get the last batch index, add 1 to initialise the union counter
last_idx = manifest[-1].batch + 1
n = last_idx
while len(manifest) > 1:
    try:
        # Pop A
        m_A = manifest.pop(0)
        mt_A = hl.read_matrix_table(f's3://{m_A.bucket}/{m_A.key}')
        idx_A = m_A.batch
        try:
            # Pop B
            m_B = manifest.pop(0)
            mt_B = hl.read_matrix_table(f's3://{m_B.bucket}/{m_B.key}')
            idx_B = m_B.batch
            # Union
            mt_U = mt_A.union_cols(mt_B, row_join_type='outer')
            idx_U = n
            path_U = f'{union_prefix}/union-{idx_U}_{idx_A}-{idx_B}.mt'
            m_U = hl.Struct(**{'bucket': bucket, 'key': path_U, 'batch': idx_U})
            # Save path
            manifest.append(m_U)
            # Write
            mt_U.write(f's3://{m_U.bucket}/{m_U.key}', overwrite=True)
            # Delete source files if not in the initial manifest
            # (bt_bucket is a boto3 Bucket resource defined elsewhere)
            if idx_A >= last_idx:
                bt_bucket.objects.filter(Prefix=m_A.key).delete()
            if idx_B >= last_idx:
                bt_bucket.objects.filter(Prefix=m_B.key).delete()
            # Next
            n += 1
        except Exception:
            print('Cannot read ' + m_B.key)
            break
    except Exception:
        print('Cannot read ' + m_A.key)
        break
print('input: ' + f'{last_idx:,}' + ' - iterations: ' + f'{n - 1:,}')
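For the VDS version of that per-pair step, union_cols would not apply; a minimal sketch of one alternative, assuming hypothetical paths path_A and path_B for the two input VDSs and path_U for the output, is to pass both to new_combiner via vds_paths:
# Sketch: merge two existing VDSs into one (path_A, path_B, path_U are hypothetical)
combiner = hl.vds.new_combiner(
    output_path=path_U,
    temp_path=f'{VDS_TMP}/',
    vds_paths=[path_A, path_B],
    use_genome_default_intervals=True,
)
combiner.run()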
I am curious to know if that approach makes sense, or if there are optimisations specific to VDS.
You should run all of your GVCFs into a single combiner job. It’s designed to handle very large numbers of GVCFs at a time. It appropriately subdivides the work.
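A minimal sketch of that single job, reusing ls_gvcf, VDS_PREFIX and VDS_TMP from the first post (the output name is hypothetical):
# One combiner job over the full list; the combiner batches and merges the
# inputs internally, so no manual pre-batching or pairwise merging is needed.
combiner = hl.vds.new_combiner(
    output_path=f'{VDS_PREFIX}/SG10K_Health_all.vds',  # hypothetical name
    temp_path=f'{VDS_TMP}/',
    gvcf_paths=ls_gvcf,
    use_genome_default_intervals=True,
    reference_genome='GRCh38',
)
combiner.run()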
AmazonS3Exception: The operation is not valid for the object's storage class (Service: Amazon S3; Status Code: 403; Error Code: InvalidObjectState;)
Note that the gVCFs are stored in cold storage. I restored them prior to the compute.
/!\ I did not restore the indexes; the *.gvcf.gz.tbi files are still in cold storage. I hope the issue is due to this and will be resolved as soon as I restore the .tbi files.
Do you have any insight into what happened on this big job of >100,000 tasks?
Exploring the resources, I do not see much difference between the tasks that completed and the ones that failed. Note that the nodes are still alive and the Spark session is still alive, so it is unlikely to be an issue of OOM or disk space. If it were a resource issue, I would expect the Spark session to die. Instead it seems that a few tasks are failing, and as soon as one task fails 4 times, Spark kills the job (gracefully).
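(As an aside, the 4-attempt limit is Spark's default spark.task.maxFailures; a hedged sketch of raising it when initialising Hail, in case the failures are transient:)
import hail as hl

# Sketch: allow more attempts per task before Spark fails the job
# (Spark's default spark.task.maxFailures is 4)
hl.init(spark_conf={'spark.task.maxFailures': '20'})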
This will reduce the number of tasks, as we won’t be partitioning the data as finely.
The other potential option is to try to break the job into pieces, in the hope(?) that the issue is caused by the Spark job running for so long and that cycling the executors will help. We have several flags that control mostly internal behavior, and you can set the relevant one to limit the number of tasks that we'll run at once by doing the following at the start of your pipeline:
# we seem to be having issues around 30k tasks, so 25k may be appropriate.
# NB: this argument needs to be a string as we are setting a string value on the backend.
hl._set_flags(spark_max_stage_parallelism='25000')
Glad to hear it worked. I don’t believe you should be concerned about the warnings.
I’m not entirely certain what’s up with the first warning, but it’s probably happening because your GVCFs don’t have PGT.
The combiner creates a plan that is updated after every step and can be used to resume the work, without starting everything over, if a job fails. If the save_path argument to new_combiner is None, we generate a stable value from the other parameters to use as the plan path; when you call new_combiner, if that plan already exists and the force argument is False, the saved plan will be loaded and used.
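A minimal sketch of making that explicit, reusing ls_gvcf from the first post (vds_prefix and the save_path location are hypothetical):
# Save the combiner plan to an explicit location; re-running the same call
# after a failure resumes from the plan, while force=True would start over.
combiner = hl.vds.new_combiner(
    output_path=f'{vds_prefix}/SG10K_Health_1k.vds',
    temp_path=f'{vds_prefix}/checkpoints/',
    gvcf_paths=ls_gvcf[0:1000],
    save_path=f'{vds_prefix}/plans/SG10K_Health_1k.json',  # hypothetical location
    force=False,
    use_genome_default_intervals=True,
    reference_genome='GRCh38',
)
combiner.run()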
Hi @chrisvittal - I am now testing combining 1,000 samples. The first step, gVCF to VDS, worked with the setup below:
# input: 1,000 samples / branch factor 50
# output: 20 VDS of 50 samples each
# Change the number of tasks Hail is launching in parallel
hl._set_flags(spark_max_stage_parallelism='20000')
# Combine gVCFs
combiner = hl.vds.new_combiner(
    output_path=f'{vds_prefix}/SG10K_Health_1k-bf50-tr100k-sp20k.vds',
    temp_path=f'{vds_prefix}/checkpoints/',
    gvcf_paths=ls_gvcf[0:1000],
    use_genome_default_intervals=True,
    reference_genome='GRCh38',
    branch_factor=50,  # number of inputs combined in one VDS
    target_records=100000  # number of rows per partition
)
combiner.run()
But the 2nd step failed with the same error as before.
I was wondering how many WGS samples your team has imported into a VDS, and what the geometry was (branch_factor, target_records)? Could you share the Spark environment? (There are a bunch of connection settings, YARN parameters, etc. that could be set differently between Google and AWS.)
By default, our cluster initialization script sets the following properties:
spark:spark.task.maxFailures=20
spark:spark.driver.extraJavaOptions=-Xss4M
spark:spark.executor.extraJavaOptions=-Xss4M
spark:spark.speculation=true
hdfs:dfs.replication=1
spark:spark.driver.memory=36g # this is machine specific
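For what it's worth, a hedged sketch of passing equivalent Spark properties through hl.init on a non-dataproc cluster (whether each one takes effect the same way on EMR is an assumption; dfs.replication is an HDFS-level setting and is omitted):
import hail as hl

# Sketch: mirror the dataproc Spark properties above at Hail initialisation
hl.init(spark_conf={
    'spark.task.maxFailures': '20',
    'spark.driver.extraJavaOptions': '-Xss4M',
    'spark.executor.extraJavaOptions': '-Xss4M',
    'spark.speculation': 'true',
    'spark.driver.memory': '36g',  # machine specific, as noted above
})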
I don't know a lot about configuring spark/yarn/etc. clusters; I've done what's worked on dataproc, as that's the only environment I've ever really used. I'll send you the configuration of a small dataproc cluster in the hope that it will help. I can't attach zip/tar files here.
Hi @mhebrard,
I am also trying to set up an EMR cluster to run Hail.
I was able to run ~200 samples, but I am still having difficulties running the 3,200 samples of the 1KG project. If you were able to run thousands of samples, can you share the configuration of the EMR cluster you used?
@gil - I am planning to share my current EMR configuration very soon. Will update you here.
I successfully created 4 VDSs of 250 samples each. I am currently testing combining them into 1 VDS of 1,000.
@gil - I just published the current version of the Hail-on-AWS stack that I am using. Basic Hail processing works fine (read matrix table, compute, write). I need further tests around plotting.
On the VDS front, I managed to create the intermediate VDSs of 250 samples, but I am still having issues when combining them. The main error is:
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
Hi @mhebrard,
I think that what worked for us for this type of error is modifying the spark_max_stage_parallelism, branch_factor and target_records. See code snippet below:
# Hail initialization
hl.init(tmp_dir=temp_path)
hl._set_flags(spark_max_stage_parallelism='20000')
combiner = hl.vds.new_combiner(
    output_path=f"s3://.../combine_800.vds",
    temp_path=temp_path,
    gvcf_paths=matched_files[600:800],  # 200 gvcfs
    vds_paths=['s3://.../combined_600.vds'],
    use_genome_default_intervals=True,
    gvcf_reference_entry_fields_to_keep=['GQ', 'MIN_DP'],
    reference_genome='GRCh38',
    branch_factor=50,  # number of inputs combined in one VDS
    target_records=100000  # number of rows per partition
)
combiner.run()
We were able to join 1,800 gVCFs iteratively, each time adding 200 gVCFs to the previous VDS (which is the use case we expect in reality as we collect data).
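A rough sketch of that iterative pattern, using the placeholder paths and matched_files list from the snippet above:
# Sketch: grow the combined VDS by 200 gVCFs per iteration
step = 200
prev_vds = None
for start in range(0, 1800, step):
    out_vds = f"s3://.../combined_{start + step}.vds"  # placeholder path
    combiner = hl.vds.new_combiner(
        output_path=out_vds,
        temp_path=temp_path,
        gvcf_paths=matched_files[start:start + step],
        vds_paths=[prev_vds] if prev_vds else None,
        use_genome_default_intervals=True,
        gvcf_reference_entry_fields_to_keep=['GQ', 'MIN_DP'],
        reference_genome='GRCh38',
        branch_factor=50,
        target_records=100000,
    )
    combiner.run()
    prev_vds = out_vds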
As for the details of the EMR cluster, this is what worked for us:
Master: r5.4xlarge
Worker group 1: r5.4xlarge
Worker group 2: m5.4xlarge
Number of on-demand workers: 10
Number of worker group 1 spot workers: 12
Number of worker group 2 spot workers: 10
This is quite preliminary, and not optimized, but maybe it will provide you with a path forward.