RemoteDisconnected error when combining matrix tables with union_rows

Hi. We are trying to extract data for a couple of million variants (listed in a 54.7 MiB text file), located at distinct regions of the genome, from a large matrix table. We tried a single `hl.filter_intervals` step, but could only pass about 1,000 SNP intervals per call, so we broke the extraction up into a loop that processes 1,000 intervals at a time.
We tried to combine the matrix tables from each step using `union_rows`, but repeatedly ran into a `RemoteDisconnected` error raised from urllib3's `connectionpool` module. We are running Hail in the cloud through the All of Us Researcher Workbench, on a cluster with 2 workers with 16 CPUs and 104 GB RAM. Here is the code for the loop:

import math

# Extract the rows of `mt` that fall inside the locus intervals listed in
# `test_intervals`, passing `step_size` intervals to each hl.filter_intervals
# call, and collect one hl.variant_qc-annotated MatrixTable per chunk in the
# list `mt_eur` (later combined with MatrixTable.union_rows).
#
# Names defined earlier in the notebook (not visible here):
#   hl             -- the hail module
#   mt             -- the source MatrixTable
#   test_intervals -- list of interval strings accepted by hl.parse_locus_interval
#   datetime       -- presumably `from datetime import datetime`; verify
#
# NOTE(review): each chunk becomes its own query plan, and union_rows later has
# to combine ~2,000 of them for a couple of million variants. Filtering once
# against a Table of intervals (e.g. hl.import_locus_intervals + semi_join_rows)
# and calling hl.variant_qc a single time would likely scale much better.
# NOTE(review): hl.parse_locus_interval uses the session's default reference
# genome; confirm it matches mt's locus reference (e.g. GRCh38 on All of Us).
step_size = 1000
# Always run at least one chunk, so an empty `test_intervals` still yields one
# (empty) table and union_rows receives an argument, as in the original script.
n_steps = max(1, math.ceil(len(test_intervals) / step_size))

mt_eur = []
for step in range(1, n_steps + 1):
    print(f'starting step {step}')
    start = datetime.now()  # record start time
    # Half-open chunk [a, b): Python slices exclude `b`, so `b` must be the
    # first index of the NEXT chunk. (The previous `step_size*step - 1` /
    # `len - 1` bounds silently dropped one interval from every chunk.)
    a = (step - 1) * step_size
    b = min(step * step_size, len(test_intervals))
    mt_temp = hl.filter_intervals(
        mt, [hl.parse_locus_interval(x) for x in test_intervals[a:b]]
    )
    mt_eur.append(hl.variant_qc(mt_temp))
    stop = datetime.now()  # record end time
    total_time = str(stop - start)
    print(total_time)

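The loop appears to work and does not produce any errors. However, an error occurs when the filtered data are combined into one matrix table. Because Hail evaluates lazily, the loop mostly just builds query plans; the matrix table data are not processed until a later action such as this combination step. The data are combined as follows: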

# Combine the per-chunk MatrixTables in `mt_eur` into a single MatrixTable.
#
# _check_cols=False skips union_rows' up-front column check. That check runs
# one hl.eval that collects the column keys of every dataset in the list, and
# it is the call whose backend connection was dropped (RemoteDisconnected) in
# the reported traceback.
#
# Skipping the check is safe here ONLY because every element of mt_eur was
# produced from the same source `mt` by filter_intervals + variant_qc. Those
# change rows, not columns, so all datasets share identical columns in the
# same order. Do not reuse this with independently loaded datasets.
#
# NOTE(review): union_rows is presumably lazy apart from that check, so this
# mostly times plan construction. The real work happens when mt_eur1 is
# written or aggregated; if that step also fails, prefer a single filter
# against an interval Table instead of thousands of chunks.
start = datetime.now()  # record start time
mt_eur1 = hl.MatrixTable.union_rows(*mt_eur, _check_cols=False)  # union of all mt's in list
stop = datetime.now()  # record end time
total_time = str(stop - start)
print(f'Joined MatrixTables in {total_time}')

The following error is returned:

RemoteDisconnected Traceback (most recent call last)
File /opt/conda/lib/python3.10/site-packages/urllib3/connectionpool.py:703, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
702 # Make the request on the httplib connection object.
→ 703 httplib_response = self._make_request(
704 conn,
705 method,
706 url,
707 timeout=timeout_obj,
708 body=body,
709 headers=headers,
710 chunked=chunked,
711 )
713 # If we’re going to release the connection in finally:, then
714 # the response doesn’t need to know about the connection. Otherwise
715 # it will also try to release it and we’ll have a double-release
716 # mess.

File /opt/conda/lib/python3.10/site-packages/urllib3/connectionpool.py:449, in HTTPConnectionPool._make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
445 except BaseException as e:
446 # Remove the TypeError from the exception chain in
447 # Python 3 (including for exceptions like SystemExit).
448 # Otherwise it looks like a bug in the code.
→ 449 six.raise_from(e, None)
450 except (SocketTimeout, BaseSSLError, SocketError) as e:

File :3, in raise_from(value, from_value)

File /opt/conda/lib/python3.10/site-packages/urllib3/connectionpool.py:444, in HTTPConnectionPool._make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
443 try:
→ 444 httplib_response = conn.getresponse()
445 except BaseException as e:
446 # Remove the TypeError from the exception chain in
447 # Python 3 (including for exceptions like SystemExit).
448 # Otherwise it looks like a bug in the code.

File /opt/conda/lib/python3.10/http/client.py:1375, in HTTPConnection.getresponse(self)
1374 try:
→ 1375 response.begin()
1376 except ConnectionError:

File /opt/conda/lib/python3.10/http/client.py:318, in HTTPResponse.begin(self)
317 while True:
→ 318 version, status, reason = self._read_status()
319 if status != CONTINUE:

File /opt/conda/lib/python3.10/http/client.py:287, in HTTPResponse._read_status(self)
284 if not line:
285 # Presumably, the server closed the connection before
286 # sending a valid response.
287 raise RemoteDisconnected("Remote end closed connection without"
288 " response")
289 try:

RemoteDisconnected: Remote end closed connection without response

During handling of the above exception, another exception occurred:

ProtocolError Traceback (most recent call last)
File /opt/conda/lib/python3.10/site-packages/requests/adapters.py:667, in HTTPAdapter.send(self, request, stream, timeout, verify, cert, proxies)
666 try:
→ 667 resp = conn.urlopen(
668 method=request.method,
669 url=url,
670 body=request.body,
671 headers=request.headers,
672 redirect=False,
673 assert_same_host=False,
674 preload_content=False,
675 decode_content=False,
676 retries=self.max_retries,
677 timeout=timeout,
678 chunked=chunked,
679 )
681 except (ProtocolError, OSError) as err:

File /opt/conda/lib/python3.10/site-packages/urllib3/connectionpool.py:787, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
785 e = ProtocolError("Connection aborted.", e)
→ 787 retries = retries.increment(
788 method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
789 )
790 retries.sleep()

File /opt/conda/lib/python3.10/site-packages/urllib3/util/retry.py:550, in Retry.increment(self, method, url, response, error, _pool, _stacktrace)
549 if read is False or not self._is_method_retryable(method):
→ 550 raise six.reraise(type(error), error, _stacktrace)
551 elif read is not None:

File /opt/conda/lib/python3.10/site-packages/urllib3/packages/six.py:769, in reraise(tp, value, tb)
768 if value.traceback is not tb:
→ 769 raise value.with_traceback(tb)
770 raise value

File /opt/conda/lib/python3.10/site-packages/urllib3/connectionpool.py:703, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)
702 # Make the request on the httplib connection object.
→ 703 httplib_response = self._make_request(
704 conn,
705 method,
706 url,
707 timeout=timeout_obj,
708 body=body,
709 headers=headers,
710 chunked=chunked,
711 )
713 # If we’re going to release the connection in finally:, then
714 # the response doesn’t need to know about the connection. Otherwise
715 # it will also try to release it and we’ll have a double-release
716 # mess.

File /opt/conda/lib/python3.10/site-packages/urllib3/connectionpool.py:449, in HTTPConnectionPool._make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
445 except BaseException as e:
446 # Remove the TypeError from the exception chain in
447 # Python 3 (including for exceptions like SystemExit).
448 # Otherwise it looks like a bug in the code.
→ 449 six.raise_from(e, None)
450 except (SocketTimeout, BaseSSLError, SocketError) as e:

File :3, in raise_from(value, from_value)

File /opt/conda/lib/python3.10/site-packages/urllib3/connectionpool.py:444, in HTTPConnectionPool._make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw)
443 try:
→ 444 httplib_response = conn.getresponse()
445 except BaseException as e:
446 # Remove the TypeError from the exception chain in
447 # Python 3 (including for exceptions like SystemExit).
448 # Otherwise it looks like a bug in the code.

File /opt/conda/lib/python3.10/http/client.py:1375, in HTTPConnection.getresponse(self)
1374 try:
→ 1375 response.begin()
1376 except ConnectionError:

File /opt/conda/lib/python3.10/http/client.py:318, in HTTPResponse.begin(self)
317 while True:
→ 318 version, status, reason = self._read_status()
319 if status != CONTINUE:

File /opt/conda/lib/python3.10/http/client.py:287, in HTTPResponse._read_status(self)
284 if not line:
285 # Presumably, the server closed the connection before
286 # sending a valid response.
287 raise RemoteDisconnected("Remote end closed connection without"
288 " response")
289 try:

ProtocolError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))

During handling of the above exception, another exception occurred:

ConnectionError Traceback (most recent call last)
Cell In[21], line 2
1 start = datetime.now() # record start time
----> 2 mt_eur1 = hl.MatrixTable.union_rows(*mt_eur) # union of all mt’s in list
3 # mt_eur1 = hl.MatrixTable.union_rows(*mt_eur[0:20]) # union of all mt’s in list
4 stop = datetime.now()

File :2, in union_rows(_check_cols, *datasets)

File /opt/conda/lib/python3.10/site-packages/hail/typecheck/check.py:585, in _make_dec..wrapper(__original_func, *args, **kwargs)
582 @decorator
583 def wrapper(original_func: Callable[…, T], *args, **kwargs) → T:
584 args
, kwargs
= check_all(__original_func, args, kwargs, checkers, is_method=is_method)
→ 585 return original_func(*args, **kwargs)

File /opt/conda/lib/python3.10/site-packages/hail/matrixtable.py:3958, in MatrixTable.union_rows(_check_cols, *datasets)
3954 raise ValueError(
3955 error_msg.format(“col key types”, 0, first.col_key.dtype, i + 1, next.col_key.dtype)
3956 )
3957 if _check_cols:
→ 3958 wrong_keys = hl.eval(
3959 hl.rbind(
3960 first.col_key.collect(_localize=False),
3961 lambda first_keys: (
3962 hl.enumerate([mt.col_key.collect(_localize=False) for mt in datasets[1:]]).find(
3963 lambda x: ~(x[1] == first_keys)
3964 )[0]
3965 ),
3966 )
3967 )
3968 if wrong_keys is not None:
3969 raise ValueError(
3970 f"‘MatrixTable.union_rows’ expects all datasets to have the same columns. "
3971 f"Datasets 0 and {wrong_keys + 1} have different columns (or possibly different order)."
3972 )

File :2, in eval(expression)

File /opt/conda/lib/python3.10/site-packages/hail/typecheck/check.py:585, in _make_dec..wrapper(__original_func, *args, **kwargs)
582 @decorator
583 def wrapper(original_func: Callable[…, T], *args, **kwargs) → T:
584 args
, kwargs
= check_all(__original_func, args, kwargs, checkers, is_method=is_method)
→ 585 return original_func(*args, **kwargs)

File /opt/conda/lib/python3.10/site-packages/hail/expr/expressions/expression_utils.py:194, in eval(expression)
167 @typecheck(expression=expr_any)
168 def eval(expression):
169 “”“Evaluate a Hail expression, returning the result.
170
171 This method is extremely useful for learning about Hail expressions and
(…)
192 Any
193 “””
→ 194 return eval_timed(expression)[0]

File :2, in eval_timed(expression)

File /opt/conda/lib/python3.10/site-packages/hail/typecheck/check.py:585, in _make_dec..wrapper(__original_func, *args, **kwargs)
582 @decorator
583 def wrapper(original_func: Callable[…, T], *args, **kwargs) → T:
584 args
, kwargs
= check_all(__original_func, args, kwargs, checkers, is_method=is_method)
→ 585 return original_func(*args, **kwargs)

File /opt/conda/lib/python3.10/site-packages/hail/expr/expressions/expression_utils.py:164, in eval_timed(expression)
161 uid = Env.get_uid()
162 ir = expression._indices.source.select_globals(**{uid: expression}).index_globals()[uid]._ir
→ 164 return Env.backend().execute(MakeTuple([ir]), timed=True)[0]

File /opt/conda/lib/python3.10/site-packages/hail/backend/spark_backend.py:226, in SparkBackend.execute(self, ir, timed)
223 except Exception as fatal:
224 raise err from fatal
→ 226 raise err

File /opt/conda/lib/python3.10/site-packages/hail/backend/spark_backend.py:218, in SparkBackend.execute(self, ir, timed)
216 def execute(self, ir: BaseIR, timed: bool = False) → Any:
217 try:
→ 218 return super().execute(ir, timed)
219 except Exception as err:
220 if self._copy_log_on_error:

File /opt/conda/lib/python3.10/site-packages/hail/backend/backend.py:188, in Backend.execute(self, ir, timed)
186 payload = ExecutePayload(self._render_ir(ir), ‘{“name”:“StreamBufferSpec”}’, timed)
187 try:
→ 188 result, timings = self._rpc(ActionTag.EXECUTE, payload)
189 except FatalError as e:
190 raise e.maybe_user_error(ir) from None

File /opt/conda/lib/python3.10/site-packages/hail/backend/py4j_backend.py:218, in Py4JBackend._rpc(self, action, payload)
216 path = action_routes[action]
217 port = self._backend_server_port
→ 218 resp = self._requests_session.post(f’http://localhost:{port}{path}', data=data)
219 if resp.status_code >= 400:
220 error_json = orjson.loads(resp.content)

File /opt/conda/lib/python3.10/site-packages/requests/sessions.py:637, in Session.post(self, url, data, json, **kwargs)
626 def post(self, url, data=None, json=None, **kwargs):
627 r""“Sends a POST request. Returns :class:Response object.
628
629 :param url: URL for the new :class:Request object.
(…)
634 :rtype: requests.Response
635 “””
→ 637 return self.request(“POST”, url, data=data, json=json, **kwargs)

File /opt/conda/lib/python3.10/site-packages/requests/sessions.py:589, in Session.request(self, method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, json)
584 send_kwargs = {
585 “timeout”: timeout,
586 “allow_redirects”: allow_redirects,
587 }
588 send_kwargs.update(settings)
→ 589 resp = self.send(prep, **send_kwargs)
591 return resp

File /opt/conda/lib/python3.10/site-packages/requests/sessions.py:703, in Session.send(self, request, **kwargs)
700 start = preferred_clock()
702 # Send the request
→ 703 r = adapter.send(request, **kwargs)
705 # Total elapsed time of the request (approximately)
706 elapsed = preferred_clock() - start

File /opt/conda/lib/python3.10/site-packages/requests/adapters.py:682, in HTTPAdapter.send(self, request, stream, timeout, verify, cert, proxies)
667 resp = conn.urlopen(
668 method=request.method,
669 url=url,
(…)
678 chunked=chunked,
679 )
681 except (ProtocolError, OSError) as err:
→ 682 raise ConnectionError(err, request=request)
684 except MaxRetryError as e:
685 if isinstance(e.reason, ConnectTimeoutError):
686 # TODO: Remove this in 3.0.0: see #2811

ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))

Are there any ways to get around this error? Our goal is an efficient way to extract all of these variants into a single matrix table.