Large gVCF into VDS

Hi team!

Asking for some insight / tips

I have a large collection of gVCF files. I am testing on the first 10,000, with a view to scaling up to 50,000 and then 100,000 participants in due time. I wish to load the single-sample gVCFs into a VDS. I wonder if there is a guideline in terms of an optimal batch size.

My first approach would be to create VDSs in batches of 1,000 participants, then combine the VDSs in pairs, following a binary-tree approach.
Some rationale:

  • The batches of 1,000 worked well in other processing (IGG joint calling)
  • The binary-tree merging was efficient for creating a 10,000-participant dense matrix table from gVCFs using an old version of Hail

In terms of snippets:

Import gVCF into VDS

# From a list of 10,000 gVCF S3 paths
ls_gvcf

# Params
batch = 0
step = 1000

# Combine gVCF
combiner = hl.vds.new_combiner(
    output_path=f'{VDS_PREFIX}/SG10K_Health_{batch}.vds',
    temp_path=f'{VDS_TMP}/',
    gvcf_paths=ls_gvcf[batch:(batch+step)],
    use_genome_default_intervals=True,
)

combiner.run()

# Eventually looping in steps of 1,000 until all the batch VDSs are created

Rolling union
Here is a snippet I previously used for the union of matrix tables; I would update the code to combine VDSs instead. The idea is, from a list of VDSs, to combine 1+2, 3+4, 5+6… in a rolling union until the last step merges all the data into one file.

# Create a manifest
ht_manifest.show()
+-------------------+-----------------------------------------+-------+
| bucket            | key                                     | batch |
+-------------------+-----------------------------------------+-------+
| str               | str                                     | int32 |
+-------------------+-----------------------------------------+-------+
| "precise-scratch" | "hebrardms/SG100K_r2.0/igg/batch-1.mt"  |     1 |
| "precise-scratch" | "hebrardms/SG100K_r2.0/igg/batch-2.mt"  |     2 |
| "precise-scratch" | "hebrardms/SG100K_r2.0/igg/batch-3.mt"  |     3 |
| "precise-scratch" | "hebrardms/SG100K_r2.0/igg/batch-4.mt"  |     4 |
...

###
# List of path
manifest = ht_manifest.collect()
# Get last index, add 1 to init the loop
last_idx = manifest[-1].batch + 1
n = last_idx

while len(manifest) > 1:
    try:
        # Pop A
        m_A = manifest.pop(0)
        mt_A = hl.read_matrix_table(f's3://{m_A.bucket}/{m_A.key}')
        idx_A = m_A.batch

        try:
            # Pop B
            m_B = manifest.pop(0)
            mt_B = hl.read_matrix_table(f's3://{m_B.bucket}/{m_B.key}')
            idx_B = m_B.batch

            # Union
            mt_U = mt_A.union_cols(mt_B, 'outer')
            idx_U = n
            path_U = union_prefix + '/union-' + str(idx_U) + '_' + str(idx_A) + '-' + str(idx_B) + '.mt'
            m_U = hl.Struct(**{'bucket': bucket, 'key': path_U, 'batch': idx_U})

            # Save path
            manifest.append(m_U)
            # Write
            mt_U.write(f's3://{m_U.bucket}/{m_U.key}', overwrite=True)

            # Delete source files if they are not in the initial manifest
            # (bt_bucket is a boto3 Bucket resource for the same bucket)
            if idx_A >= last_idx:
                bt_bucket.objects.filter(Prefix=m_A.key).delete()
            if idx_B >= last_idx:
                bt_bucket.objects.filter(Prefix=m_B.key).delete()

            # Next
            n += 1
        except Exception:
            print('Cannot read ' + m_B.key)
            break
    except Exception:
        print('Cannot read ' + m_A.key)
        break

print('input: '+f'{last_idx:,}'+' - iterations: '+f'{n-1:,}')

I am curious to know if that approach makes sense, or if there are optimisations specific to VDS.

You should run all of your GVCFs into a single combiner job. It’s designed to handle very large numbers of GVCFs at a time. It appropriately subdivides the work.

The following should work for your data.

ls_gvcf = ...  # the full list of gVCF paths
combiner = hl.vds.new_combiner(
    output_path=f'{VDS_PREFIX}/SG10K_Health.vds',
    temp_path=f'{VDS_TMP}/',
    gvcf_paths=ls_gvcf,
    use_genome_default_intervals=True,
)
combiner.run()

Thanks Chris for the insight.

I encountered an issue:

AmazonS3Exception: The operation is not valid for the object's storage class (Service: Amazon S3; Status Code: 403; Error Code: InvalidObjectState;)

Note that the gVCFs are stored in cold storage. I restored them prior to the compute.

/!\ I did not restore the indexes; the *.gvcf.gz.tbi files are still in cold storage. I hope the issue is due to this and will be solved as soon as I restore the .tbi files.

I can confirm that restoring the *.tbi files solved the InvalidObjectState issue.
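
For reference, a minimal sketch of how both the gVCFs and their indexes can be restored with boto3 (the bucket name, keys, restore duration and tier below are illustrative assumptions, not my actual values):

import boto3

s3 = boto3.client('s3')

# Hypothetical keys: restore each gVCF together with its .tbi index
keys = ['cohort/sample1.gvcf.gz', 'cohort/sample1.gvcf.gz.tbi']

for key in keys:
    # Ask S3 to bring the archived object back online for 7 days (Bulk tier)
    s3.restore_object(
        Bucket='my-cold-bucket',
        Key=key,
        RestoreRequest={'Days': 7, 'GlacierJobParameters': {'Tier': 'Bulk'}},
    )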

Using the snippet below, I managed to create 1 VDS of 10 samples:

combiner = hl.vds.new_combiner(
    output_path=f'{vds_prefix}/SG10K_Health_10.vds',
    temp_path=f'{vds_prefix}/checkpoints/',
    gvcf_paths=ls_gvcf[0:10],
    use_genome_default_intervals=True,
    reference_genome='GRCh38' 
)
combiner.run()

But as I try more samples, the combiner crashes.

Testing with 500 gVCFs on Hail 0.2.134, on an AWS cluster of 497 on-demand CPUs: 62 x r6g.2xlarge (8 CPUs, 64 GB each) with 62 x 150 GB (9,450 GB) of disk space.

  • I can see 2 jobs of 5 x 2,586 tasks (12,930 tasks) completed in ~1h per job.
  • I can see 5 x dataset.vds written as checkpoints
  • I can see a few short jobs of 2,586 tasks completed in a few seconds
  • But the next big job of 118,755 tasks failed after ~1h of compute.

(see the task records in the Spark UI)


The error is a mix of

  • java.io.IOException: Unexpected end of stream pos=0, contentLength=68553440
  • com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool

I increased memory and disk space, but it does not seem to impact the process.

Do you have any insight on what is happening and what could be the issue?

Do you have stack traces for these errors?

I’m not really sure what could be causing them, all of your parameters seem normal.

Just to be clear, the vds_prefix is an object storage (S3) path?

Any monitoring you have isn’t showing excessive disk/memory usage?

vds_prefix is an s3 path s3://precise/hebrardms/VDS. This seems to work fine as the intermediate files are written within that path.

I tried increasing disk and memory, and it did not improve the job.

Let me test again and see if I can get a complete stack trace (not easy to find on a cluster).

I did a smaller test with 100 gVCFs and a branch factor of 50:

combiner = hl.vds.new_combiner(
    output_path=f'{vds_prefix}/SG10K_Health_100-bf20.vds',
    temp_path=f'{vds_prefix}/checkpoints/',
    gvcf_paths=ls_gvcf[0:100],
    use_genome_default_intervals=True,
    reference_genome='GRCh38',
    branch_factor=50
)
combiner.run()

Again, it seems to have performed the gVCF-to-intermediate-VDS step properly but failed at the next big job.


Do you have any insight into what happens in this big job of >100,000 tasks?

Exploring the resources, I do not see much difference between the tasks that completed and the ones that failed. Note that the nodes are still alive and the Spark session is still alive, so it is unlikely to be an issue of OOM or disk space. If it were a resource issue, I would expect the Spark session to die. Instead, it seems that a few tasks are failing, and as soon as one task has failed 4 times, Spark kills the job (gracefully).
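
As a side note on that hypothesis, raising Spark's per-task retry limit might let the job survive a handful of flaky tasks. This is just a sketch on my side, and the value 20 is arbitrary:

import hail as hl

# Allow more retries per task before Spark aborts the stage (the default is 4);
# 20 is an arbitrary choice for testing.
hl.init(spark_conf={'spark.task.maxFailures': '20'})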


I attach the trace here:
gVCF_to_VDS.task_trace.txt (68.7 KB)

Do you have any insight on the expected memory / disk usage?

An interesting test: if I combine the same 100 gVCFs but with a branch factor of 100, it creates the VDS successfully:

combiner = hl.vds.new_combiner(
    output_path=f'{vds_prefix}/SG10K_Health_100-bf100.vds',
    temp_path=f'{vds_prefix}/checkpoints/',
    gvcf_paths=ls_gvcf[0:100],
    use_genome_default_intervals=True,
    reference_genome='GRCh38',
    branch_factor=100
)
combiner.run() 
2025-04-24 06:13:11.128 Hail: INFO: Running VDS combiner:
    VDS arguments: 0 datasets with 0 samples
    GVCF arguments: 100 inputs/samples
    Branch factor: 100
    GVCF merge batch size: 50
2025-04-24 06:13:11.346 Hail: INFO: GVCF combine (job 1): merging 100 GVCFs into 1 datasets
2025-04-24 06:26:45.431 Hail: INFO: wrote matrix table with 2850111452 rows and 100 columns in 2586 partitions to s3://precise-scratch/hebrardms/SG10K_Health/VDS/SG10K_Health_100-bf100.vds/reference_data
2025-04-24 06:37:10.265 Hail: INFO: wrote matrix table with 37755085 rows and 100 columns in 2586 partitions to s3://precise-scratch/hebrardms/SG10K_Health/VDS/SG10K_Health_100-bf100.vds/variant_data
2025-04-24 06:37:10.518 Hail: INFO: VDS combiner: computing reference block max length...
2025-04-24 06:37:41.156 Hail: INFO: Finished VDS combiner!

Then I tested combining the 2 intermediate VDSs of 50 samples each:

# List of VDS
ls_vds = [
    "s3://.../checkpoints/combiner-intermediates/f1797d74-95df-4f34-9a98-3bd3c1ae0438_gvcf-combine_job1/dataset_0.vds",
    "s3://.../checkpoints/combiner-intermediates/f1797d74-95df-4f34-9a98-3bd3c1ae0438_gvcf-combine_job1/dataset_1.vds"
]

# Combine gVCF
combiner = hl.vds.new_combiner(
    output_path=f'{vds_prefix}/SG10K_Health_100-2x50.vds',
    temp_path=f'{vds_prefix}/checkpoints/',
    vds_paths=ls_vds,
    use_genome_default_intervals=True,
    reference_genome='GRCh38'
)
combiner.run()

This also failed on the same job, with the same error as before.

Tasks: 30211 / 117291 (12 failed) in 1.5h

The only thing I can think of is that we are overloading the S3 hadoop library.

There are a few somewhat hidden parameters that may be helpful here. In the vds combiner, you can set a larger value for target_records, like so:

combiner = hl.vds.new_combiner(
    output_path=f'{vds_prefix}/SG10K_Health_100-bf20.vds',
    temp_path=f'{vds_prefix}/checkpoints/',
    gvcf_paths=ls_gvcf[0:100],
    use_genome_default_intervals=True,
    reference_genome='GRCh38',
    branch_factor=50,
    target_records=1_000_000,
)
combiner.run()

This will reduce the number of tasks, as we won’t be partitioning the data as finely.

The other potential option is to try to break the job into pieces, in the hope(?) that the issue is caused by having the spark job run for so long and cycling the executors will help. We have several flags to control mostly internal behavior and you can set the relevant one to limit the number of tasks that we’ll run at once by doing the following at the start of your pipeline:

# we seem to having issues around 30k tasks, so 25k may be appropriate.
# NB: this argument needs to be a string as we are setting a string value on the backend.
hl._set_flags(spark_max_stage_parallelism='25000')

I managed to combine the 2 intermediate VDSs of 50 samples each after changing target_records=100000 and spark_max_stage_parallelism='20000'.

The compute completed after a few jobs.

Next step is increasing the number of samples.

I do get some strange warnings:

2025-04-28 06:32:52.348 Hail: WARN: Mismatch between 'call_fields' and VDS call fields. Overwriting with call fields from supplied VDS.
    VDS call fields      : []
    requested call fields: ['PGT']

2025-04-28 06:32:52.433 Hail: WARN: found existing combiner plan at s3://.../VDS/checkpoints/combiner-plans/vds-combiner-plan_7dda56b011b22121ebabcb4a0e5d160e640e6f9e7e154194ed397ad40b11ba3c_0.2.134.json, using it
2025-04-28 06:32:52.434 Hail: INFO: Running VDS combiner:
    VDS arguments: 2 datasets with 100 samples
    GVCF arguments: 0 inputs/samples
    Branch factor: 100
    GVCF merge batch size: 50
2025-04-28 06:32:52.610 Hail: INFO: VDS Combine (job 1): merging 2 datasets with 100 samples
2025-04-28 06:33:27.353 Hail: INFO: wrote table with 2814952570 rows in 2586 partitions to s3://.../VDS/checkpoints/combiner-intermediates/c39ad9e8-4dc2-4327-b566-e51613516c1d_vds-combine_job1/interval_checkpoint.ht
2025-04-28 07:07:11.290 Hail: INFO: wrote matrix table with 2850111452 rows and 100 columns in 28151 partitions to s3://.../VDS/SG10K_Health_100-2x50.vds/reference_data
2025-04-28 07:37:37.451 Hail: INFO: wrote matrix table with 37755085 rows and 100 columns in 28151 partitions to s3://.../VDS/SG10K_Health_100-2x50.vds/variant_data
2025-04-28 07:37:37.757 Hail: INFO: VDS combiner: computing reference block max length...
2025-04-28 07:51:03.621 Hail: INFO: Finished VDS combiner!

Should I be worried about the warnings?

Glad to hear it worked. I don’t believe you should be concerned about the warnings.

I’m not entirely certain what’s up with the first warning, but it’s probably happening because your GVCFs don’t have PGT.

The combiner creates a plan that is updated after every step and can be used to resume the work without starting over if a job fails. If the save_path argument to new_combiner is None, we generate a stable plan path from the other parameters; when you call new_combiner, if a plan already exists at that path and the force argument is False, the existing plan will be loaded and used.
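
As an illustration of resuming from a saved plan (a sketch only; the explicit save_path below is an assumption, not taken from the posts above):

# Store the plan at an explicit path so a failed run can be resumed later
combiner = hl.vds.new_combiner(
    output_path=f'{vds_prefix}/SG10K_Health_1000.vds',
    temp_path=f'{vds_prefix}/checkpoints/',
    gvcf_paths=ls_gvcf[0:1000],
    use_genome_default_intervals=True,
    reference_genome='GRCh38',
    save_path=f'{vds_prefix}/plans/SG10K_Health_1000.plan.json',  # plan location
    force=False,  # if a plan already exists here, load it and resume
)
combiner.run()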


Hi @chrisvittal - I am now testing combining 1,000 samples. The first step, gVCF to VDS, worked with the setup below:

# input: 1,000 samples / branch factor 50
# output: 20 VDS of 50 samples each

# Change the number of tasks hail is launching in parallel
hl._set_flags(spark_max_stage_parallelism='20000')

# Combine gVCF
combiner = hl.vds.new_combiner(
    output_path=f'{vds_prefix}/SG10K_Health_1k-bf50-tr100k-sp20k.vds',
    temp_path=f'{vds_prefix}/checkpoints/',
    gvcf_paths=ls_gvcf[0:1000],
    use_genome_default_intervals=True,
    reference_genome='GRCh38',
    branch_factor=50, # number of inputs combined in one VDS
    target_records=100000 # number of rows per partition
)

combiner.run()

But the 2nd step failed with the same error as before.

I was wondering how many WGS samples your team has imported into a VDS, and what the geometry was (branch_factor, target_records)? Could you share the Spark environment? (There are a bunch of connection, YARN, etc. settings that could be set differently between Google and AWS.)

I’m sorry to hear that.

WGS, we’ve done 150k+. WES, almost 1m.

By default, our cluster initialization script sets the following properties:

spark:spark.task.maxFailures=20
spark:spark.driver.extraJavaOptions=-Xss4M
spark:spark.executor.extraJavaOptions=-Xss4M
spark:spark.speculation=true
hdfs:dfs.replication=1
spark:spark.driver.memory=36g   # this is machine specific

I don’t know a lot about configuring spark/yarn/etc. clusters, I’ve done what’s worked on dataproc as that’s the only environment I’ve ever really used. I’ll send you the configuration of a small dataproc cluster in the hopes that it will help. I can’t attach zip/tar files here.
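
For what it's worth, a rough sketch of how those properties might map onto an EMR configurations list (the classification names are an assumption on my part and untested on EMR):

# Hypothetical EMR "Configurations" equivalent of the Dataproc properties above;
# pass it when creating the cluster (console, CLI --configurations, or boto3).
emr_configurations = [
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.task.maxFailures": "20",
            "spark.driver.extraJavaOptions": "-Xss4M",
            "spark.executor.extraJavaOptions": "-Xss4M",
            "spark.speculation": "true",
            "spark.driver.memory": "36g",  # machine specific
        },
    },
    {
        "Classification": "hdfs-site",
        "Properties": {"dfs.replication": "1"},
    },
]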

@chrisvittal - Thanks for the info. I got the configuration.

Another consideration: what was your cluster geometry to combine the 150k+ WGS?

  • How many nodes?
  • Did you set up worker vs data nodes?
  • What is the instance type of each node? (CPU, RAM, architecture, disk space)
  • Were you using a special network?

That might impact the jobs

Hi @mhebrard,
I am also trying to set up an EMR cluster to run Hail.
I was able to run ~200 samples, but I am still having difficulties running the 3,200 samples of the 1KG project. If you were able to run thousands of samples, can you share the configuration of the EMR cluster you used?

@gil - I am planning to share my current EMR configuration very soon. Will update you here.
I successfully created 4 VDSs of 250 samples each. I am currently testing combining them into 1 VDS of 1,000.


@gil - I just published the current version of the Hail-on-AWS stack that I am using. Basic Hail processing works fine (read matrix table, compute, write). I need further tests around plotting.

On the VDS front, I managed to create intermediate VDSs of 250 samples, but I am still having issues when combining those. The main error is:

com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool

Not sure how to solve that…
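
One thing I am considering (an assumption on my part based on AWS's general guidance for this error, not something I have validated) is raising the EMRFS connection pool size via the emrfs-site classification at cluster creation:

# Hypothetical EMR configuration enlarging the EMRFS S3 connection pool;
# the value 1000 is arbitrary.
emrfs_configuration = [
    {
        "Classification": "emrfs-site",
        "Properties": {"fs.s3.maxConnections": "1000"},
    },
]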

Hi @mhebrard,
I think that what worked for us for this type of error is modifying the spark_max_stage_parallelism, branch_factor and target_records. See code snippet below:

# Hail initialization
hl.init(tmp_dir=temp_path)
hl._set_flags(spark_max_stage_parallelism='20000')

combiner = hl.vds.new_combiner(output_path=f"s3://.../combine_800.vds",
                               temp_path=temp_path,
                               gvcf_paths=matched_files[600:800],  # 200 gvcfs
                               vds_paths=['s3://.../combined_600.vds'],
                               use_genome_default_intervals=True,
                               gvcf_reference_entry_fields_to_keep=['GQ', 'MIN_DP'],
                               reference_genome='GRCh38',
                               branch_factor=50,  # number of inputs combined in one VDS
                               target_records=100000  # number of rows per partition
                               )
combiner.run()

We were able to join 1,800 gVCFs iteratively, each time adding 200 gVCFs to a previous VDS (which is the use case we expect in reality as we collect data); see the sketch below.
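
Roughly, the iterative loop looks like this (a sketch; the paths, variable names and the 200-sample step are illustrative):

import hail as hl

batch_size = 200
previous_vds = None  # no VDS yet on the first iteration

for start in range(0, len(matched_files), batch_size):
    output_vds = f"s3://.../combined_{start + batch_size}.vds"
    combiner = hl.vds.new_combiner(
        output_path=output_vds,
        temp_path=temp_path,
        gvcf_paths=matched_files[start:start + batch_size],
        vds_paths=[previous_vds] if previous_vds else None,
        use_genome_default_intervals=True,
        gvcf_reference_entry_fields_to_keep=['GQ', 'MIN_DP'],
        reference_genome='GRCh38',
        branch_factor=50,
        target_records=100000,
    )
    combiner.run()
    previous_vds = output_vds  # feed this VDS into the next batch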

As for the details of the EMR, this is what worked for us:
Master: “r5.4xlarge”
Worker group 1: r5.4xlarge
Worker group 2: m5.4xlarge
Number of worker group on-demand: 10
Number of worker 1 group spot: 12
Number of worker 2 group spot: 10

This is quite preliminary, and not optimized, but maybe it will provide you with a path forward.