Problem loading multiple csv files for annotation on DNAnexus

Hi all—

I’m having some trouble loading many phenotypes, stored as .csv files, on DNAnexus to annotate my matrixtable.

There are ~1500 phenotype (“phecode”) files, each of which looks like the sample below, with a total of ~400k rows that look like this:

,sample_id,age,has_phenotype,age_squared,phecode,phenotype
55396,3346357,56.7,1,3214.8900000000003,132.0,"Infestation (lice, mites)"
159240,3860334,70.9,1,5026.81,132.0,"Infestation (lice, mites)"
213665,3554529,60.2,1,3624.0400000000004,132.0,"Infestation (lice, mites)"
304542,2953691,59.3,1,3516.49,132.0,"Infestation (lice, mites)"
308211,2246007,59.0,1,3481.0,132.0,"Infestation (lice, mites)"
318349,5327743,73.5,1,5402.25,132.0,"Infestation (lice, mites)"
359018,5288207,70.7,1,4998.490000000001,132.0,"Infestation (lice, mites)"
425964,5014116,59.2,1,3504.6400000000003,132.0,"Infestation (lice, mites)"
473239,1358291,59.0,1,3481.0,132.0,"Infestation (lice, mites)"
486620,2590153,76.7,1,5882.89,132.0,"Infestation (lice, mites)"
495712,1968956,54.4,1,2959.3599999999997,132.0,"Infestation (lice, mites)"

I tried two different ways to load them, and they failed for different reasons. I suspect that neither of these methods is going to work at the full (1500 file) scale anyway, but I’m not sure how to do this more intelligently (perhaps with Spark?).

Using hl.import_table()

I created a dictionary mapping each filename to its Hail table:

types = {"": hl.tint, "sample_id": hl.tstr, "age": hl.tfloat, "has_phenotype": hl.tfloat, "age_squared": hl.tfloat, "phecode": hl.tfloat, "phenotype": hl.tstr}
phecode_tables = {phecode_csv_file : hl.import_table(f'path_to_/{phecode_csv_file}', delimiter=',', quote='"', types=types).key_by('sample_id') for phecode_csv_file in phecode_csv_files}

where phecode_csv_files is the list of the phecode csv files. I then annotate my matrixtable, mt, with the following:

mt = mt.annotate_cols(**{
    f'phecode_{clean_phecode_name(phecode_csv_file)}': phecode_tables[phecode_csv_file][mt.s]
    for phecode_csv_file in phecode_csv_files
})
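
where clean_phecode_name just turns a filename into a valid field suffix, roughly like the sketch below (the exact implementation doesn’t matter for the error):

def clean_phecode_name(phecode_csv_file):
    # sketch only; e.g. '132.0.csv' -> '132_0'
    return phecode_csv_file.replace('.csv', '').replace('.', '_')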

This is fine until I materialize the mt:

mt.checkpoint('path_to_mt/mt_1_with_phenos.mt/', overwrite=True)

This raises the error below. The problem seems to be with the import itself, however, because the error says Hail expected 7 fields but got 8. As you can see in the snippet of the CSV file above (which is the file named in the error), the “phenotype” column contains a comma within the field. Using quote='"' evidently didn’t help. Below is a snippet of the log file.

I wouldn’t be surprised if this error is fixed in later versions of Hail, but I’m using 0.2.78 on DNAnexus.

2022-12-20 12:06:11 BlockManagerInfo: INFO: Added broadcast_488_piece0 in memory on ip-10-60-1-1.eu-west-2.compute.internal:44000 (size: 11.5 KB, free: 23.1 GB)
2022-12-20 12:06:11 BlockManagerInfo: INFO: Added broadcast_487_piece0 in memory on ip-10-60-1-1.eu-west-2.compute.internal:44000 (size: 49.0 B, free: 23.1 GB)
2022-12-20 12:06:12 TaskSetManager: WARN: Lost task 0.0 in stage 110.0 (TID 1437, ip-10-60-1-1.eu-west-2.compute.internal, executor 4): is.hail.utils.HailException: Caught exception while reading file:/mnt/project/phecodes/132.0.csv: expected 7 fields, but found 8 fields
  offending line: 55396,3346357,56.7,1,3214.8900000000003,132.0,"Infestation (...
	at is.hail.utils.ErrorHandling$class.fatal(ErrorHandling.scala:24)
	at is.hail.utils.package$.fatal(package.scala:78)
	at is.hail.expr.ir.TextTableReader$$anonfun$23$$anonfun$apply$6$$anonfun$apply$7.apply(TextTableReader.scala:409)
	at is.hail.expr.ir.TextTableReader$$anonfun$23$$anonfun$apply$6$$anonfun$apply$7.apply(TextTableReader.scala:368)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:464)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at __C65935collect_distributed_array.__m65946split_ToArray(Unknown Source)
	at __C65935collect_distributed_array.apply(Unknown Source)
	at __C65935collect_distributed_array.apply(Unknown Source)
	at is.hail.backend.BackendUtils$$anonfun$collectDArray$1$$anonfun$apply$1.apply(BackendUtils.scala:31)
	at is.hail.backend.BackendUtils$$anonfun$collectDArray$1$$anonfun$apply$1.apply(BackendUtils.scala:30)
	at is.hail.utils.package$.using(package.scala:638)
	at is.hail.annotations.RegionPool.scopedRegion(RegionPool.scala:144)
	at is.hail.backend.BackendUtils$$anonfun$collectDArray$1.apply(BackendUtils.scala:30)
	at is.hail.backend.BackendUtils$$anonfun$collectDArray$1.apply(BackendUtils.scala:28)
	at is.hail.backend.spark.SparkBackendComputeRDD.compute(SparkBackend.scala:730)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: is.hail.utils.HailException: expected 7 fields, but found 8 fields
	at is.hail.utils.ErrorHandling$class.fatal(ErrorHandling.scala:11)
	at is.hail.utils.package$.fatal(package.scala:78)
	at is.hail.expr.ir.TextTableReader$$anonfun$23$$anonfun$apply$6$$anonfun$apply$7.apply(TextTableReader.scala:378)
	... 23 more

2022-12-20 12:06:12 TaskSetManager: INFO: Starting task 0.1 in stage 110.0 (TID 1438, ip-10-60-158-153.eu-west-2.compute.internal, executor 6, partition 0, PROCESS_LOCAL, 7955 bytes)
2022-12-20 12:06:12 BlockManagerInfo: INFO: Added broadcast_488_piece0 in memory on ip-10-60-158-153.eu-west-2.compute.internal:44000 (size: 11.5 KB, free: 23.1 GB)
2022-12-20 12:06:12 BlockManagerInfo: INFO: Added broadcast_487_piece0 in memory on ip-10-60-158-153.eu-west-2.compute.internal:44000 (size: 49.0 B, free: 23.1 GB)
2022-12-20 12:06:13 TaskSetManager: INFO: Lost task 0.1 in stage 110.0 (TID 1438) on ip-10-60-158-153.eu-west-2.compute.internal, executor 6: is.hail.utils.HailException (Caught exception while reading file:/mnt/project/phecodes/132.0.csv: expected 7 fields, but found 8 fields
  offending line: 55396,3346357,56.7,1,3214.8900000000003,132.0,"Infestation (...) [duplicate 1]
2022-12-20 12:06:13 TaskSetManager: INFO: Starting task 0.2 in stage 110.0 (TID 1439, ip-10-60-116-137.eu-west-2.compute.internal, executor 1, partition 0, PROCESS_LOCAL, 7955 bytes)
2022-12-20 12:06:13 BlockManagerInfo: INFO: Added broadcast_488_piece0 in memory on ip-10-60-116-137.eu-west-2.compute.internal:44000 (size: 11.5 KB, free: 23.1 GB)
2022-12-20 12:06:13 BlockManagerInfo: INFO: Added broadcast_487_piece0 in memory on ip-10-60-116-137.eu-west-2.compute.internal:44000 (size: 49.0 B, free: 23.1 GB)
2022-12-20 12:06:14 TaskSetManager: INFO: Lost task 0.2 in stage 110.0 (TID 1439) on ip-10-60-116-137.eu-west-2.compute.internal, executor 1: is.hail.utils.HailException (Caught exception while reading file:/mnt/project/phecodes/132.0.csv: expected 7 fields, but found 8 fields
  offending line: 55396,3346357,56.7,1,3214.8900000000003,132.0,"Infestation (...) [duplicate 2]
2022-12-20 12:06:14 TaskSetManager: INFO: Starting task 0.3 in stage 110.0 (TID 1440, ip-10-60-153-27.eu-west-2.compute.internal, executor 8, partition 0, PROCESS_LOCAL, 7955 bytes)
2022-12-20 12:06:14 BlockManagerInfo: INFO: Added broadcast_488_piece0 in memory on ip-10-60-153-27.eu-west-2.compute.internal:44000 (size: 11.5 KB, free: 23.1 GB)
2022-12-20 12:06:14 BlockManagerInfo: INFO: Added broadcast_487_piece0 in memory on ip-10-60-153-27.eu-west-2.compute.internal:44000 (size: 49.0 B, free: 23.1 GB)
2022-12-20 12:06:14 TaskSetManager: INFO: Lost task 0.3 in stage 110.0 (TID 1440) on ip-10-60-153-27.eu-west-2.compute.internal, executor 8: is.hail.utils.HailException (Caught exception while reading file:/mnt/project/phecodes/132.0.csv: expected 7 fields, but found 8 fields
  offending line: 55396,3346357,56.7,1,3214.8900000000003,132.0,"Infestation (...) [duplicate 3]
2022-12-20 12:06:14 TaskSetManager: ERROR: Task 0 in stage 110.0 failed 4 times; aborting job
2022-12-20 12:06:14 TaskSchedulerImpl: INFO: Removed TaskSet 110.0, whose tasks have all completed, from pool 
2022-12-20 12:06:14 TaskSchedulerImpl: INFO: Cancelling stage 110
2022-12-20 12:06:14 TaskSchedulerImpl: INFO: Killing all running tasks in stage 110: Stage cancelled
2022-12-20 12:06:14 DAGScheduler: INFO: ResultStage 110 (collect at SparkBackend.scala:286) failed in 2.827 s due to Job aborted due to stage failure: Task 0 in stage 110.0 failed 4 times, most recent failure: Lost task 0.3 in stage 110.0 (TID 1440, ip-10-60-153-27.eu-west-2.compute.internal, executor 8): is.hail.utils.HailException: Caught exception while reading file:/mnt/project/phecodes/132.0.csv: expected 7 fields, but found 8 fields
  offending line: 55396,3346357,56.7,1,3214.8900000000003,132.0,"Infestation (...
	at is.hail.utils.ErrorHandling$class.fatal(ErrorHandling.scala:24)
	at is.hail.utils.package$.fatal(package.scala:78)
	at is.hail.expr.ir.TextTableReader$$anonfun$23$$anonfun$apply$6$$anonfun$apply$7.apply(TextTableReader.scala:409)

... etc...

Caused by: is.hail.utils.HailException: expected 7 fields, but found 8 fields
	at is.hail.utils.ErrorHandling$class.fatal(ErrorHandling.scala:11)
	at is.hail.utils.package$.fatal(package.scala:78)
	at is.hail.expr.ir.TextTableReader$$anonfun$23$$anonfun$apply$6$$anonfun$apply$7.apply(TextTableReader.scala:378)
	... 23 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2001)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1984)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1983)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1983)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1033)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1033)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1033)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2223)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2172)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2161)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:823)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at is.hail.backend.spark.SparkBackend.parallelizeAndComputeWithIndex(SparkBackend.scala:286)
	at is.hail.backend.BackendUtils.collectDArray(BackendUtils.scala:28)
	at __C65922Compiled.__m65923split_ToArray(Emit.scala)
	at __C65922Compiled.apply(Emit.scala)
	at is.hail.expr.ir.CompileAndEvaluate$$anonfun$1.apply$mcJ$sp(CompileAndEvaluate.scala:68)
	at is.hail.expr.ir.CompileAndEvaluate$$anonfun$1.apply(CompileAndEvaluate.scala:68)
	at is.hail.expr.ir.CompileAndEvaluate$$anonfun$1.apply(CompileAndEvaluate.scala:68)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:68)
	at is.hail.expr.ir.CompileAndEvaluate$.evalToIR(CompileAndEvaluate.scala:30)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.evaluate$1(LowerOrInterpretNonCompilable.scala:30)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.is$hail$expr$ir$LowerOrInterpretNonCompilable$$rewrite$1(LowerOrInterpretNonCompilable.scala:67)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.is$hail$expr$ir$LowerOrInterpretNonCompilable$$rewrite$1(LowerOrInterpretNonCompilable.scala:53)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.apply(LowerOrInterpretNonCompilable.scala:72)
	at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.transform(LoweringPass.scala:69)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3$$anonfun$1.apply(LoweringPass.scala:16)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3$$anonfun$1.apply(LoweringPass.scala:16)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3.apply(LoweringPass.scala:16)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3.apply(LoweringPass.scala:14)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.lowering.LoweringPass$class.apply(LoweringPass.scala:14)
	at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.apply(LoweringPass.scala:64)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:15)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:13)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:13)
	at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:47)
	at is.hail.backend.spark.SparkBackend.is$hail$backend$spark$SparkBackend$$_execute(SparkBackend.scala:381)
	at is.hail.backend.spark.SparkBackend$$anonfun$8$$anonfun$apply$4.apply(SparkBackend.scala:417)
	at is.hail.backend.spark.SparkBackend$$anonfun$8$$anonfun$apply$4.apply(SparkBackend.scala:414)
	at is.hail.backend.ExecuteContext$$anonfun$scoped$1$$anonfun$apply$1.apply(ExecuteContext.scala:47)
	at is.hail.backend.ExecuteContext$$anonfun$scoped$1$$anonfun$apply$1.apply(ExecuteContext.scala:47)
	at is.hail.utils.package$.using(package.scala:638)
	at is.hail.backend.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:47)
	at is.hail.backend.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:46)
	at is.hail.utils.package$.using(package.scala:638)
	at is.hail.annotations.RegionPool$.scoped(RegionPool.scala:17)
	at is.hail.backend.ExecuteContext$.scoped(ExecuteContext.scala:46)
	at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:275)
	at is.hail.backend.spark.SparkBackend$$anonfun$8.apply(SparkBackend.scala:414)
	at is.hail.backend.spark.SparkBackend$$anonfun$8.apply(SparkBackend.scala:413)
	at is.hail.utils.ExecutionTimer$.time(ExecutionTimer.scala:52)
	at is.hail.backend.spark.SparkBackend.executeEncode(SparkBackend.scala:413)
	at sun.reflect.GeneratedMethodAccessor50.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)

is.hail.utils.HailException: Caught exception while reading file:/mnt/project/phecodes/132.0.csv: expected 7 fields, but found 8 fields
  offending line: 55396,3346357,56.7,1,3214.8900000000003,132.0,"Infestation (lice, mites)"
	at is.hail.utils.ErrorHandling$class.fatal(ErrorHandling.scala:24)
	at is.hail.utils.package$.fatal(package.scala:78)
	at is.hail.expr.ir.TextTableReader$$anonfun$23$$anonfun$apply$6$$anonfun$apply$7.apply(TextTableReader.scala:409)
	at is.hail.expr.ir.TextTableReader$$anonfun$23$$anonfun$apply$6$$anonfun$apply$7.apply(TextTableReader.scala:368)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:464)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at __C65935collect_distributed_array.__m65946split_ToArray(Unknown Source)
	at __C65935collect_distributed_array.apply(Unknown Source)
	at __C65935collect_distributed_array.apply(Unknown Source)
	at is.hail.backend.BackendUtils$$anonfun$collectDArray$1$$anonfun$apply$1.apply(BackendUtils.scala:31)
	at is.hail.backend.BackendUtils$$anonfun$collectDArray$1$$anonfun$apply$1.apply(BackendUtils.scala:30)
	at is.hail.utils.package$.using(package.scala:638)
	at is.hail.annotations.RegionPool.scopedRegion(RegionPool.scala:144)
	at is.hail.backend.BackendUtils$$anonfun$collectDArray$1.apply(BackendUtils.scala:30)
	at is.hail.backend.BackendUtils$$anonfun$collectDArray$1.apply(BackendUtils.scala:28)
	at is.hail.backend.spark.SparkBackendComputeRDD.compute(SparkBackend.scala:730)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

is.hail.utils.HailException: expected 7 fields, but found 8 fields
	at is.hail.utils.ErrorHandling$class.fatal(ErrorHandling.scala:11)
	at is.hail.utils.package$.fatal(package.scala:78)
	at is.hail.expr.ir.TextTableReader$$anonfun$23$$anonfun$apply$6$$anonfun$apply$7.apply(TextTableReader.scala:378)
	at is.hail.expr.ir.TextTableReader$$anonfun$23$$anonfun$apply$6$$anonfun$apply$7.apply(TextTableReader.scala:368)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:464)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at __C65935collect_distributed_array.__m65946split_ToArray(Unknown Source)
	at __C65935collect_distributed_array.apply(Unknown Source)
	at __C65935collect_distributed_array.apply(Unknown Source)
	at is.hail.backend.BackendUtils$$anonfun$collectDArray$1$$anonfun$apply$1.apply(BackendUtils.scala:31)
	at is.hail.backend.BackendUtils$$anonfun$collectDArray$1$$anonfun$apply$1.apply(BackendUtils.scala:30)
	at is.hail.utils.package$.using(package.scala:638)
	at is.hail.annotations.RegionPool.scopedRegion(RegionPool.scala:144)
	at is.hail.backend.BackendUtils$$anonfun$collectDArray$1.apply(BackendUtils.scala:30)
	at is.hail.backend.BackendUtils$$anonfun$collectDArray$1.apply(BackendUtils.scala:28)
	at is.hail.backend.spark.SparkBackendComputeRDD.compute(SparkBackend.scala:730)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Using from_pandas()

Similar to the above, I tried the following code to first import each CSV into a pandas DataFrame and then convert those to Hail tables:

dtypes = {"": int, "sample_id": str, "age": float, "has_phenotype": float, "age_squared": float, "phecode": float, "phenotype": str} 
csv_file_to_df = {phecode_csv_file : pd.read_csv(os.path.join(phecodes_directory,phecode_csv_file), quotechar = '"', dtype = dtypes) for phecode_csv_file in phecode_csv_files}
phecode_tables = {phecode_csv_file : hl.Table.from_pandas(csv_file_to_df[phecode_csv_file]).key_by('sample_id') for phecode_csv_file in phecode_csv_files}

I presume that, because this doesn’t throw an error, the CSV files are imported into DataFrames properly. Then, annotating and materializing the matrixtable as above, I get the following StackOverflowError. This happens regardless of how many Hail tables I annotate with (even a single table throws the error).

---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
<ipython-input-118-8d7d130396be> in <module>
----> 1 mt.checkpoint(f'dnax://{my_database}/CASR/phewas/jeremy_mt_1_with_phenos.mt/' class="ansi-blue-fg">, overwrite=True)

<decorator-gen-1273> in checkpoint(self, output, overwrite, stage_locally, _codec_spec, _read_if_exists, _intervals, _filter_intervals, _drop_cols, _drop_rows)

/opt/conda/lib/python3.6/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    575     def wrapper(__original_func, *args, **kwargs):
    576         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 577         return __original_func(*args_, **kwargs_)
    578 
    579     return wrapper

/opt/conda/lib/python3.6/site-packages/hail/matrixtable.py in checkpoint(self, output, overwrite, stage_locally, _codec_spec, _read_if_exists, _intervals, _filter_intervals, _drop_cols, _drop_rows)
   2500 }"""
   2501         if not _read_if_exists or not hl.hadoop_exists(f'{output}/_SUCCESS'):
-> 2502             self.write(output=output, overwrite=overwrite, stage_locally=stage_locally, _codec_spec=_codec_spec)
   2503         return hl.read_matrix_table(output, _intervals=_intervals, _filter_intervals=_filter_intervals,
   2504                                     _drop_cols=_drop_cols, _drop_rows=_drop_rows)

<decorator-gen-1275> in write(self, output, overwrite, stage_locally, _codec_spec, _partitions, _checkpoint_file)

/opt/conda/lib/python3.6/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    575     def wrapper(__original_func, *args, **kwargs):
    576         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 577         return __original_func(*args_, **kwargs_)
    578 
    579     return wrapper

/opt/conda/lib/python3.6/site-packages/hail/matrixtable.py in write(self, output, overwrite, stage_locally, _codec_spec, _partitions, _checkpoint_file)
   2542 
   2543         writer = ir.MatrixNativeWriter(output, overwrite, stage_locally, _codec_spec, _partitions, _partitions_type, _checkpoint_file)
-> 2544         Env.backend().execute(ir.MatrixWrite(self._mir, writer))
   2545 
   2546     class _Show:

/opt/conda/lib/python3.6/site-packages/hail/backend/py4j_backend.py in execute(self, ir, timed)
    108                 raise HailUserError(message_and_trace) from None
    109 
--> 110             raise e

/opt/conda/lib/python3.6/site-packages/hail/backend/py4j_backend.py in execute(self, ir, timed)
     84         # print(self._hail_package.expr.ir.Pretty.apply(jir, True, -1))
     85         try:
---> 86             result_tuple = self._jhc.backend().executeEncode(jir, stream_codec)
     87             (result, timings) = (result_tuple._1(), result_tuple._2())
     88             value = ir.typ._from_encoding(result)

/cluster/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/opt/conda/lib/python3.6/site-packages/hail/backend/py4j_backend.py in deco(*args, **kwargs)
     29                 raise FatalError('%s\n\nJava stack trace:\n%s\n'
     30                                  'Hail version: %s\n'
---> 31                                  'Error summary: %s' % (deepest, full, hail.__version__, deepest), error_id) from None
     32         except pyspark.sql.utils.CapturedException as e:
     33             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: StackOverflowError: null

Java stack trace:
is.hail.utils.HailException: error after applying LiftRelationalValuesToRelationalLets
	at is.hail.utils.ErrorHandling$class.fatal(ErrorHandling.scala:15)
	at is.hail.utils.package$.fatal(package.scala:78)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:25)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:13)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:13)
	at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:47)
	at is.hail.backend.spark.SparkBackend.is$hail$backend$spark$SparkBackend$$_execute(SparkBackend.scala:381)
	at is.hail.backend.spark.SparkBackend$$anonfun$8$$anonfun$apply$4.apply(SparkBackend.scala:417)
	at is.hail.backend.spark.SparkBackend$$anonfun$8$$anonfun$apply$4.apply(SparkBackend.scala:414)
	at is.hail.backend.ExecuteContext$$anonfun$scoped$1$$anonfun$apply$1.apply(ExecuteContext.scala:47)
	at is.hail.backend.ExecuteContext$$anonfun$scoped$1$$anonfun$apply$1.apply(ExecuteContext.scala:47)
	at is.hail.utils.package$.using(package.scala:638)
	at is.hail.backend.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:47)
	at is.hail.backend.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:46)
	at is.hail.utils.package$.using(package.scala:638)
	at is.hail.annotations.RegionPool$.scoped(RegionPool.scala:17)
	at is.hail.backend.ExecuteContext$.scoped(ExecuteContext.scala:46)
	at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:275)
	at is.hail.backend.spark.SparkBackend$$anonfun$8.apply(SparkBackend.scala:414)
	at is.hail.backend.spark.SparkBackend$$anonfun$8.apply(SparkBackend.scala:413)
	at is.hail.utils.ExecutionTimer$.time(ExecutionTimer.scala:52)
	at is.hail.backend.spark.SparkBackend.executeEncode(SparkBackend.scala:413)
	at sun.reflect.GeneratedMethodAccessor50.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)

is.hail.utils.HailException: Error while typechecking IR:
(RelationalLet __iruid_297570
  (TableCollect
    (TableRead
      Table{global:Struct{},key:[],row:Struct{sample_id:String}}
      False
      {\"files\":[\"file:////mnt/project/phecodes/10.0.csv\"],\"typeMapStr\":{\"\":\"Int32\",\"sample_id\":\"String\",\"age\":\"Float64\",\"age_squared\":\"Float64\",\"phecode\":\"Float64\",\"has_phenotype\":\"Float64\",\"phenotype\":\"String\"},\"comment\":[],\"separator\":\",\",\"missing\":[\"NA\"],\"hasHeader\":true,\"quoteStr\":null,\"skipBlankLines\":false,\"forceBGZ\":false,\"filterAndReplace\":{},\"forceGZ\":false,\"name\":\"TextTableReader\"}))
  (RelationalLet __iruid_297571
    (TableCollect
      (TableRead
        Table{global:Struct{},key:[],row:Struct{sample_id:String}}
        False
        {\"files\":[\"file:////mnt/project/phecodes/110.1.csv\"],\"typeMapStr\":{\"\":\"Int32\",\"sample_id\":\"String\",\"age\":\"Float64\",\"age_squared\":\"Float64\",\"phecode\":\"Float64\",\"has_phenotype\":\"Float64\",\"phenotype\":\"String\"},\"comment\":[],\"separator\":\",\",\"missing\":[\"NA\"],\"hasHeader\":true,\"quoteStr\":null,\"skipBlankLines\":false,\"forceBGZ\":false,\"filterAndReplace\":{},\"forceGZ\":false,\"name\":\"TextTableReader\"}))
    (RelationalLet __iruid_297572
      (TableCollect
        (TableRead
          Table{global:Struct{},key:[],row:Struct{sample_id:String}}
          False
      

... etc...



Hail version: 0.2.78-b17627756568
Error summary: StackOverflowError: null

Best,
Jeremy

Hmm. I copied your snippet into a file called /tmp/foo.csv. Indeed, without quote='"', I get an error. With that parameter it works:

ht = hl.import_table('/tmp/foo.csv', delimiter=',', quote='"')
ht.show()

Can you confirm that quote='"' fails on the 11 lines you shared?

Aside: do all these files have the exact same columns? Hail can import these files massively in parallel:

ht = hl.import_table('/path/to/*.csv', delimiter=',', quote='"')
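
If they do, a sketch along these lines (reusing the types mapping from your first post) should give you one long-format table covering every file:

import hail as hl

types = {"": hl.tint, "sample_id": hl.tstr, "age": hl.tfloat, "has_phenotype": hl.tfloat,
         "age_squared": hl.tfloat, "phecode": hl.tfloat, "phenotype": hl.tstr}

# one table with every row from every phecode file, keyed by sample
ht = hl.import_table('/path/to/*.csv', delimiter=',', quote='"', types=types)
ht = ht.key_by('sample_id')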

The latter issue is definitely a bug, but I think it’s been fixed since 0.2.78 (which is about a year old). The following works for me:

In [18]: import hail as hl
    ...: import pandas as pd
    ...: import os
    ...: 
    ...: dtypes = {"": int, "sample_id": str, "age": float, "has_phenotype": float, "age_squared": float, "phecode": float, "phenotype": str}
    ...: csv_file_to_df = {phecode_csv_file : pd.read_csv(phecode_csv_file, quotechar = '"', dtype = dtypes) for phecode_csv_file in ('/tmp/foo.csv',)}
    ...: phecode_tables = {phecode_csv_file : hl.Table.from_pandas(csv_file_to_df[phecode_csv_file]).key_by('sample_id') for phecode_csv_file in ('/tmp/foo.csv',)}

In [19]: list(phecode_tables.values())[0].show()
+------------+-----------+----------+---------------+-------------+----------+-----------------------------+
| Unnamed: 0 | sample_id |      age | has_phenotype | age_squared |  phecode | phenotype                   |
+------------+-----------+----------+---------------+-------------+----------+-----------------------------+
|      int32 | str       |  float64 |       float64 |     float64 |  float64 | str                         |
+------------+-----------+----------+---------------+-------------+----------+-----------------------------+
|     473239 | "1358291" | 5.90e+01 |      1.00e+00 |    3.48e+03 | 1.32e+02 | "Infestation (lice, mites)" |
|     495712 | "1968956" | 5.44e+01 |      1.00e+00 |    2.96e+03 | 1.32e+02 | "Infestation (lice, mites)" |
|     308211 | "2246007" | 5.90e+01 |      1.00e+00 |    3.48e+03 | 1.32e+02 | "Infestation (lice, mites)" |
|     486620 | "2590153" | 7.67e+01 |      1.00e+00 |    5.88e+03 | 1.32e+02 | "Infestation (lice, mites)" |
|     304542 | "2953691" | 5.93e+01 |      1.00e+00 |    3.52e+03 | 1.32e+02 | "Infestation (lice, mites)" |
|      55396 | "3346357" | 5.67e+01 |      1.00e+00 |    3.21e+03 | 1.32e+02 | "Infestation (lice, mites)" |
|     213665 | "3554529" | 6.02e+01 |      1.00e+00 |    3.62e+03 | 1.32e+02 | "Infestation (lice, mites)" |
|     159240 | "3860334" | 7.09e+01 |      1.00e+00 |    5.03e+03 | 1.32e+02 | "Infestation (lice, mites)" |
|     425964 | "5014116" | 5.92e+01 |      1.00e+00 |    3.50e+03 | 1.32e+02 | "Infestation (lice, mites)" |
|     359018 | "5288207" | 7.07e+01 |      1.00e+00 |    5.00e+03 | 1.32e+02 | "Infestation (lice, mites)" |
|     318349 | "5327743" | 7.35e+01 |      1.00e+00 |    5.40e+03 | 1.32e+02 | "Infestation (lice, mites)" |
+------------+-----------+----------+---------------+-------------+----------+-----------------------------+

Thank you @danking ! Everything is working today (!). I don’t know why, but the error from hl.import_table() is no longer reproducing when I pass quote='"'. I could have sworn it was failing yesterday… FWIW I am using a new cluster now, which sometimes fixes things.

I was able to import all the files very quickly with the glob pattern you suggested, too!

The pandas import actually works now too!

Table output: (screenshot)

Annotated matrixtable output: (screenshot)

And a checkpoint() command works too.

No idea what the reason is… new cluster, perhaps.

One additional question, though: I’m not sure how to annotate the matrixtable after importing all the csv files into a single ht with hl.import_table(). I wanted to reach for ht.group_by(), but I don’t think I want an aggregator. A series of ht.filter() calls could work, but that seems like it would require ~1500 passes (one per phenotype) through the table. Any other suggestions?

Best,
Jeremy

I presume you want to bring a vector / structure of phenotypes onto your MT?

I’d probably do this:

phecodes = ht.aggregate(hl.agg.collect_as_set(ht.phecode))
ht = ht.group_by(
    ht.sample_id
).aggregate(
    phenos = hl.dict(hl.agg.collect((ht.phecode, ht.row)))
)
ht = ht.annotate(**{
    phecode: ht.phenos.get(phecode)
    for phecode in phecodes
})
mt = mt.annotate_cols(**ht[mt.col_key])

That’ll get you a column field for each phecode. Then you can use them in regressions as you please.

EDIT: Fixed the aggregate to apply hl.dict
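
One caveat (an assumption on my part, since the naming scheme is up to you): phecode is a float64, and the names you splat into annotate with ** have to be strings (and valid Python identifiers if you want dot access like mt.phecode_132_0), so you’ll probably want to turn each phecode into a string field name first. A minimal sketch:

# hypothetical naming helper: 132.0 -> 'phecode_132_0'
def phecode_field_name(p):
    return 'phecode_' + str(p).replace('.', '_')

ht = ht.annotate(**{
    phecode_field_name(p): ht.phenos.get(p)  # p stays a float for the dict lookup
    for p in phecodes
})
mt = mt.annotate_cols(**ht[mt.col_key])

After that, a field like mt.phecode_132_0.has_phenotype is an ordinary float64 column field that you can pass straight to, e.g., hl.logistic_regression_rows.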

Thank you @danking ! Yes, I think the code you suggest does what I want. But to be sure, what I’d like is, e.g. for phecode 132_0, for mt.132_0 to be a column field containing a struct with the field has_phenotype (and preferably also age, age_squared, and phenotype).

I think I’m having trouble with two parts of the code you provided, though. First, the group_by() statement:

ht = ht.group_by(
     ht.sample_id
).aggregate(
    phenos = hl.agg.collect((ht.phecode, ht.row)) # phenos becomes an array of tuples of (ht.phecode, ht.row)
#     phenos = hl.agg.collect(hl.struct(phecode=ht.phecode, info=ht.row))   #phenos becomes an array of structs with fields phecode and info
#     phenos = hl.struct(phecode=hl.agg.collect(ht.phecode), 
#                        info=hl.agg.collect(ht.row))    # phenos becomes hierarchical fields pictured below
#     phenos = hl.agg.collect(hl.struct(ht.phecode=ht.row)) # doesn't run because keyword can't be expression
)

And second, I’m having trouble understanding how the **ht[mt.col_key] part works; maybe that solves the first problem…?

Best,
Jeremy

Yeah phenos becomes an array of structures. You can run ht.phenos.describe() to take a closer look.

This group-by-aggregate-collect is kind of Hail’s way of doing “pivot” from Pandas. The sample id becomes the table’s key and then phenos is an array of all the records with that sample id.
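
If the analogy helps, here’s the same pivot on a toy long-format pandas frame (purely for intuition; the Hail pipeline doesn’t go through pandas):

import pandas as pd

# toy long-format data standing in for the concatenated phecode files
long_df = pd.DataFrame({
    'sample_id': ['s1', 's1', 's2'],
    'phecode': [132.0, 250.2, 132.0],
    'has_phenotype': [1.0, 0.0, 1.0],
})

# one row per sample, one column per phecode
wide = long_df.pivot(index='sample_id', columns='phecode', values='has_phenotype')
print(wide)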

This line properly finishes the pivot by creating a column for each phecode. The ** syntax lets you programmatically provide keyword arguments. We could just as well have written this:

ht = ht.annotate(**{
    phecode: ht.phenos.get(phecode)
    for phecode in phecodes
})

as:

ht = ht.annotate(
    phe_0123 = ht.phenos.get('phe_0123'),
    phe_5678 = ht.phenos.get('phe_5678'),
    ...
)

Two things are happening here. First, ht[mt.col_key] is a “join expression”: it contains the matching record in ht for mt.col_key. Second, the ** part takes all the fields from that record and adds them as column fields in the MT.

mt = mt.annotate_cols(**ht[mt.col_key])

Again, we could have written this out laboriously by listing every phecode, but that’s annoying.
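
Spelled out field by field (with hypothetical phecode names), the equivalent would be:

# same join, written out explicitly -- fine for two phecodes, painful for 1500
joined = ht[mt.col_key]
mt = mt.annotate_cols(
    phe_0123=joined.phe_0123,
    phe_5678=joined.phe_5678,
    # ... and so on for every phecode
)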

Hmm… I might be missing something, but the code

phenos = hl.agg.collect((ht.phecode, ht.row))

creates an array of tuples for me. When I try to make them structs, using

phenos = hl.agg.collect(hl.struct(phecode_str=(ht.phecode_str, ht.row)))

they look like this:

'phenos': array<struct {
        phecode_str: tuple (
            str, 
            struct {
                ``: int32, 
                sample_id: str, 
                age: float64, 
                has_phenotype: float64, 
                age_squared: float64, 
                phecode: float64, 
                phenotype: str, 
                phecode_str: str
            }
        )
}>

where I can’t use dot notation to get each individual phecode. And when I try to use the expression itself as the field name in the struct, as follows:

phenos = hl.agg.collect(hl.struct(ht.phecode_str=ht.row))

I get a SyntaxError: keyword can't be an expression.

Am I missing something here?

Best,
Jeremy


Ahhhh, sorry I missed an hl.dict around the collect:

ht = ht.group_by(
    ht.sample_id
).aggregate(
    phenos = hl.dict(hl.agg.collect((ht.phecode, ht.row)))
)

That’ll convert the collected array of tuples into a dictionary.
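
Once it’s a dict, you can pull any single phecode back out with get(); a quick sketch using the 132.0 code from your example file (the field name infestation is just for illustration):

# the whole record for one phecode; missing for samples that don't have it
ht = ht.annotate(infestation=ht.phenos.get(132.0))
ht.infestation.has_phenotype.show(5)   # dot notation works again here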

It works!! : ) Saved me from potentially pursuing a 10-hour method to do the import using Pandas… thank you!!
