How should I use Hail on the DNANexus RAP?

How should I use Hail on the DNANexus RAP? I understand that there are at least two different file systems: the Project File System or “File API” and the Database File System or “Database API”.

importing data

The project file system is used for files not produced by Hail. Examples include VCF or BGEN files. You should always access this data through the DNANexus Project File System FUSE mount: file:///mnt/project/.... Do not download this data onto HDFS.
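
For example, here is a minimal sketch of importing a VCF from the FUSE mount (the path is purely illustrative; substitute a file that actually exists in your project):

import hail as hl

# Import a block-gzipped VCF directly from the project FUSE mount.
# force_bgz tells Hail the .gz file is block-gzipped, enabling parallel reads.
mt = hl.import_vcf(
    'file:///mnt/project/path/to/my_cohort.vcf.gz',
    force_bgz=True,
    reference_genome='GRCh38',
)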

The project file system can handle neither Hail Tables nor Hail Matrix Tables; however, DNANexus provides the database file system for storing Table and Matrix Table files. A “database” in the database file system is roughly analogous to a “bucket” in Amazon S3 or Google Cloud Storage. You can create a “database” by executing this Python code:

import pyspark
import dxpy

# Start Spark, then create a database backed by the dnax:// file system.
# The database acts like a storage bucket for Hail Tables and Matrix Tables.
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)
spark.sql("CREATE DATABASE my_database LOCATION 'dnax://'")

You only need to do this once. Every time you initialize Hail, you must use this database as the temporary directory, and you must start a Spark session before you initialize Hail.

import pyspark
import dxpy
import hail as hl

# Look up the database's ID; passing the project ID ensures we match the
# database in this project rather than a same-named database elsewhere.
my_database = dxpy.find_one_data_object(
    name="my_database",
    project=dxpy.find_one_project()["id"]
)["id"]
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)
# Use the database as Hail's temporary directory.
hl.init(sc=sc, tmp_dir=f'dnax://{my_database}/tmp/')

Moreover, any time you want to write a Table or Matrix Table you need to use a path that starts with f'dnax://{my_database}'.
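
For example, a minimal sketch assuming the initialization code above has already run and you have a Matrix Table named results_mt (a hypothetical name):

# Write a Matrix Table into the database file system; the path must start
# with dnax:// followed by the database ID looked up above.
results_mt.write(f'dnax://{my_database}/results.mt')

# Reading it back uses the same path.
results_mt = hl.read_matrix_table(f'dnax://{my_database}/results.mt')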

NB: You cannot inspect the contents of a DNANexus database from the web UI, so be certain to remember all the file paths you’ve used!

exporting data

If you export a merged TSV or VCF and want to move it from the database file system to the project file system, execute this shell command:

hadoop fs -cat dnax://my_database/path | dx upload -o /path/in/project -
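
To produce such a merged file in the first place, a sketch (assuming a Hail Table named results_ht, a hypothetical name) might look like this; Table.export writes a single merged, bgzipped TSV when the path ends in .bgz:

# Export a single merged TSV into the database file system, then move it to
# the project file system with the hadoop fs -cat | dx upload pipeline above.
results_ht.export(f'dnax://{my_database}/results.tsv.bgz')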

If you need to export a results Table or Matrix Table (which consist of many individual files), configure your DNANexus Spark cluster to access an S3 bucket you own and write directly into that bucket:

results_mt.write('s3a://my-s3-bucket/results.mt')
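
One way to configure that access, sketched under the assumption that you authenticate with static AWS keys (the configuration keys are the standard Hadoop s3a settings; the placeholder values are yours to fill in):

# Point the Hadoop s3a connector at your AWS credentials before writing.
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "YOUR_AWS_ACCESS_KEY_ID")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_AWS_SECRET_ACCESS_KEY")

# Then write the Matrix Table straight into your bucket.
results_mt.write('s3a://my-s3-bucket/results.mt')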

Hi Dan, this solution works when I'm using a pre-configured Hail installation in JupyterLab, but the CREATE DATABASE line throws an error when I try it on a Cloud Workstation instance where I have manually installed Hail.

This is the error I get:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/dnanexus/miniconda3/envs/hail/lib/python3.7/site-packages/pyspark/sql/session.py", line 723, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/home/dnanexus/miniconda3/envs/hail/lib/python3.7/site-packages/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/dnanexus/miniconda3/envs/hail/lib/python3.7/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/dnanexus/miniconda3/envs/hail/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o21.sql.
: org.apache.spark.SparkException: Unable to create database my_database as failed to create its directory dnax:/

        at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.liftedTree1$1(InMemoryCatalog.scala:125)
        at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.createDatabase(InMemoryCatalog.scala:118)
        at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.createDatabase(ExternalCatalogWithListener.scala:47)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:233)
        at org.apache.spark.sql.execution.command.CreateDatabaseCommand.run(ddl.scala:82)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
        at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3700)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3698)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "dnax"
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3281)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
        at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.liftedTree1$1(InMemoryCatalog.scala:120)
        ... 33 more

:pensive:

In all likelihood, this means that you’re missing the “Hadoop connector” for DNANexus’ dnax protocol. I sent them an email back in 2021 requesting they publish their Hadoop connector and they demurred. I think your best bet is to email DNANexus support directly and request they make their dnax Hadoop connector publicly available or, at least, available to you in the cloud workstation.


Hi,
I had a hard time dealing with a similar error and ended up using their provided version of Hail (0.2.78).
My custom installation of Hail failed due to lack of support from the DNAnexus team.

This can cause issues if you create databases with the same name across multiple projects: called without a project argument, find_one_data_object will return the first database matching that name, regardless of which project it lives in.

The error I was getting was

PermissionDenied: UPLOAD permission required in project-B5adl3Zz31982iuyJDKJ to perform this action

The solution is just to pass the project ID to find_one_data_object:

my_database = dxpy.find_one_data_object(
    name="my_database", 
    project=dxpy.find_one_project()["id"]
)["id"]

Alternatively, I guess you could always put the project ID in the database name.
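
A rough sketch of that alternative (purely illustrative: SQL database names cannot contain dashes, so the project ID is sanitized before use):

import dxpy

# Build a per-project database name so lookups can never collide across projects.
project_id = dxpy.find_one_project()["id"]
db_name = f"my_database_{project_id.replace('-', '_')}"
spark.sql(f"CREATE DATABASE {db_name} LOCATION 'dnax://'")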


Thanks Nik! I’ve updated the post above to reflect this.


Hi folks—I think this question is related enough for me to ask it here. Could someone explain the difference between Dan’s initialization code above and the following code? I’ve seen people use it as well but don’t know why I’d use one method over another.

import pyspark
import hail as hl
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)
hl.init(sc=sc)

Best,
Jeremy

Are you referring to this part?

my_database = dxpy.find_one_data_object(
    name="my_database", 
    project=dxpy.find_one_project()["id"]
)["id"]
# ...
hl.init(sc=sc, tmp_dir=f'dnax://{my_database}/tmp/')

Without that I think your temporary directory is in HDFS. That means the performance of your temporary storage is controlled by the number of non-spot (aka expensive) instances you’re using. We strongly recommend using a blob storage directory (e.g. S3, GCS) for temporary storage and spot (aka cheap) instances for compute. This is the fundamental insight of the cloud: disaggregation of compute and storage.

EDIT:

Just to be clear, you need to run this:

import pyspark
import dxpy

sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)
spark.sql("CREATE DATABASE my_database LOCATION  'dnax://'")

only once, to initialize the my_database blob storage bucket. After that, all your code can just use this to start Hail and configure the temporary directory:

import pyspark
import dxpy
import hail as hl

my_database = dxpy.find_one_data_object(
    name="my_database", 
    project=dxpy.find_one_project()["id"]
)["id"]
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)
hl.init(sc=sc, tmp_dir=f'dnax://{my_database}/tmp/')

Got it! I’ve been able to get Hail up and running—thank you Dan!