Accessing BlockMatrix data in parallel

Hi, I have a custom Python generator that’s pulling data out of a Hail BlockMatrix to use for deep learning, as previously discussed here.

This has been working pretty well by itself, but when I try to parallelise the data generator (by running multiple workers), I get the following error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-14-4cdb9d3fb71c> in <module>
----> 1 for X,Y in train_loader:
      2     print(X)

~/anaconda3/envs/hailpyt/lib/python3.6/site-packages/torch/utils/data/dataloader.py in __next__(self)
    817             else:
    818                 del self._task_info[idx]
--> 819                 return self._process_data(data)
    820 
    821     next = __next__  # Python 2 compatibility

~/anaconda3/envs/hailpyt/lib/python3.6/site-packages/torch/utils/data/dataloader.py in _process_data(self, data)
    844         self._try_put_index()
    845         if isinstance(data, ExceptionWrapper):
--> 846             data.reraise()
    847         return data
    848 

~/anaconda3/envs/hailpyt/lib/python3.6/site-packages/torch/_utils.py in reraise(self)
    383             # (https://bugs.python.org/issue2651), so we work around it.
    384             msg = KeyErrorMessage(msg)
--> 385         raise self.exc_type(msg)

TypeError: Caught TypeError in DataLoader worker process 0.
Original Traceback (most recent call last):
  File "/home/ch283/anaconda3/envs/hailpyt/lib/python3.6/site-packages/torch/utils/data/_utils/worker.py", line 178, in _worker_loop
    data = fetcher.fetch(index)
  File "/home/ch283/anaconda3/envs/hailpyt/lib/python3.6/site-packages/torch/utils/data/_utils/fetch.py", line 34, in fetch
    data = next(self.dataset_iter)
  File "/data/ch283/MyIterableDataset.py", line 62, in __iter__
    yield self.__get_data(batch)
  File "/data/ch283/MyIterableDataset.py", line 36, in __get_data
    X = self.bm.filter_cols(batch).to_numpy()
  File "/data/ch283/modelfit_norm.py", line 36, in to_numpy
    bm.tofile(uri)
  File "<decorator-gen-1467>", line 2, in tofile
  File "/home/ch283/anaconda3/envs/hailpyt/lib/python3.6/site-packages/hail/typecheck/check.py", line 614, in wrapper
    return __original_func(*args_, **kwargs_)
  File "/home/ch283/anaconda3/envs/hailpyt/lib/python3.6/site-packages/hail/linalg/blockmatrix.py", line 1170, in tofile
    Env.backend().execute(BlockMatrixWrite(self._bmir, writer))
  File "/home/ch283/anaconda3/envs/hailpyt/lib/python3.6/site-packages/hail/backend/spark_backend.py", line 296, in execute
    result = json.loads(self._jhc.backend().executeJSON(jir))
  File "/home/ch283/anaconda3/envs/hailpyt/lib/python3.6/json/__init__.py", line 348, in loads
    'not {!r}'.format(s.__class__.__name__))
TypeError: the JSON object must be str, bytes or bytearray, not 'JavaMap'
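
For reference, train_loader is built along these lines (a rough sketch of my setup; the import, constructor arguments, batch size, and worker count are illustrative):

from torch.utils.data import DataLoader
from MyIterableDataset import MyIterableDataset

dataset = MyIterableDataset(bm, labels, batch_size=64)  # wraps the Hail BlockMatrix
train_loader = DataLoader(dataset, batch_size=None, num_workers=4)  # fails once num_workers > 0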

Wondering if anyone can shed some light on this. Is it due to multiple workers trying to interact with the BlockMatrix at once?

This is a Hail bug; I’ll put a fix in ASAP and respond here.

Can you share what version of Hail you are using? hl.version() is sufficient.

Thanks!

hl.version() gives me 0.2.50-32fc1de02d32. I may be due for an update.

Can you try the latest version of Hail, 0.2.63? I’m having trouble reproducing this error on that version.

Yes, I just upgraded and tried it - same error.

I’m having trouble replicating this. This works for me; does it work for you?

import hail as hl

mt = hl.balding_nichols_model(1, 2, 2)
bm = hl.linalg.BlockMatrix.from_entry_expr(mt.GT.n_alt_alleles())
bm = bm.filter_cols([0])
bm.tofile('/tmp/foo')

Can you create a small example that also fails?

That works for me too. I’m trying to come up with a small example, but it’s a bit challenging as I don’t get that error until parallelisation is introduced - I can get a NumPy array from a BlockMatrix no problem with only one worker. I’ll see if I can come up with something.

Ahh, yes, Hail currently relies on Py4J, which is not thread-safe. I actually have a fix in progress for this. Let me get back to you.


Are the worker “processes” actual Python processes or are they threads?

I believe they’re Python processes. I’ve subclassed PyTorch’s IterableDataset class.
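
Roughly, it looks like this (a stripped-down sketch; the constructor arguments and label handling are placeholders, and the real class does more):

from torch.utils.data import IterableDataset, get_worker_info

class MyIterableDataset(IterableDataset):
    def __init__(self, bm, labels, batch_size):
        self.bm = bm                  # hail.linalg.BlockMatrix of features
        self.labels = labels          # per-column targets (placeholder)
        self.batch_size = batch_size

    def __get_data(self, batch):
        X = self.bm.filter_cols(batch).to_numpy()  # each worker process calls back into Hail here
        Y = self.labels[batch]
        return X, Y

    def __iter__(self):
        n_cols = self.bm.shape[1]
        batches = [list(range(i, min(i + self.batch_size, n_cols)))
                   for i in range(0, n_cols, self.batch_size)]
        info = get_worker_info()
        if info is not None:          # shard the batches across DataLoader workers
            batches = batches[info.id::info.num_workers]
        for batch in batches:
            yield self.__get_data(batch)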

Hail’s backend isn’t thread-safe right now. We’ll fix that by synchronizing calls to the evaluation method, but this will probably mean that you won’t benefit much from the parallelism of PyTorch.

Hi @ch283,

Because you are working with PyTorch, maybe this Feature Request could be of interest.

Regards,
Andrés.

I am running Hail locally on a big machine with 94 cores. I am planning to run some Hail code inside a ThreadPoolExecutor, with each thread working on independent .mt files (reading non-block gzip files and converting each one to a .mt file). Would that work?
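
The pattern would look roughly like this (a sketch; hl.import_vcf with force=True stands in for my actual conversion step, and the paths are placeholders):

import hail as hl
from concurrent.futures import ThreadPoolExecutor

def convert(path):
    mt = hl.import_vcf(path, force=True)  # force=True reads non-block gzip on a single core
    mt.write(path.replace('.gz', '.mt'), overwrite=True)

paths = [...]  # the ~100 .gz files
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(convert, paths))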

How many matrix tables do you want to generate? I don’t think this is going to work with a ThreadPoolExecutor in Python, but Hail does have some utilities for co-scheduling lots of writes at the same time.

It’s about 100 .gz files that I need to read and convert to .mt files. Is there a better way to do it?

It seems that ThreadPoolExecutor speeds it up quite a bit, and I can see multiple jobs running in parallel in the Spark dashboard. Without it, only one job runs at a time.

Would you be able to share some of these utilities for scheduling multiple writes together?

Through an oversight, it appears not to be documented, but you can use:

hl.experimental.write_matrix_tables(mts, path_prefix)
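
For example (a sketch, assuming mts is a list of matrix tables built up front; the import call and prefix are placeholders):

import hail as hl

mts = [hl.import_vcf(p, force=True) for p in paths]           # build the lazy matrix tables
hl.experimental.write_matrix_tables(mts, '/data/converted/')  # co-schedules all of the writes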

If TPE is working for you, though, no need to break it!

TPE works about 90% of the time; sometimes it crashes randomly. Maybe something that Hail is using is not thread-safe. It usually crashes with an error saying it could not unify some type X with some type Y. I hope that the times it works, it does not somehow produce readable but incorrect results.