Commit f8431d79 authored by Price, Zach's avatar Price, Zach
Browse files

Clean up after globus failures

parent abd338a0
Loading
Loading
Loading
Loading
Loading
+5 −2
Original line number Diff line number Diff line
@@ -182,7 +182,7 @@ class GUCFileSystem(AsyncFileSystem):

    protocol = 'guc'

    async def globus_url_copy(self, *args):
    async def globus_url_copy(self, source, destination, *args):
        cmd_args = [
            '-create-dest',
            '-concurrency', '5',
@@ -195,6 +195,8 @@ class GUCFileSystem(AsyncFileSystem):
        if config.debug:
            cmd_args.append('-dbg')
        cmd_args += args
        cmd_args += source
        cmd_args += destination.as_posix()
        log.debug('Executing %s %s', self.guc_bin, cmd_args)

        p = await create_subprocess_exec(self.guc_bin, *cmd_args, stdout=PIPE, stderr=STDOUT)
@@ -208,6 +210,7 @@ class GUCFileSystem(AsyncFileSystem):

        rc = p.returncode
        if not rc == 0:
            destination.unlink()
            raise HPSSError(f'globus_url_copy returned {rc}')

        return rc, stdout, stderr
@@ -237,7 +240,7 @@ class GUCFileSystem(AsyncFileSystem):
        rpath = f'{self.guc_endpoint}{self._strip_protocol(rpath)}'
        lpath = PosixPath(lpath)
        os.makedirs(lpath.parent, exist_ok=True)
        rc, stdout, stderr = await self.globus_url_copy(rpath, lpath.as_posix())
        rc, stdout, stderr = await self.globus_url_copy(rpath, lpath)
        if stdout:
            log.info(stdout)
        if stderr:
+12 −5
Original line number Diff line number Diff line
@@ -10,7 +10,7 @@ from pprint import pprint
import pandas as pd
import requests
import xarray as xr
from tqdm import tqdm
from tqdm.auto import tqdm

from . import config
from logging import getLogger
@@ -43,10 +43,17 @@ def download_file(file, dest, token):

    with requests.get(file['url'], stream=True, auth=BearerAuth(token)) as r:
        r.raise_for_status()
        with open(final_path, mode='wb') as f:
            with tqdm(desc=final_path.as_posix(), total=file['size'], unit='iB', unit_scale=True, unit_divisor=1024) as bar:
        tqdm_args = {
            'desc': final_path.as_posix(),
            'total': file['size'],
            'leave': False,
            'unit': 'iB',
            'unit_scale': True,
            'unit_divisor': 1024,
        }
        with tqdm.wrapattr(open(final_path, mode='wb'), 'write', *tqdm_args) as f:
            for chunk in r.iter_content(chunk_size=config.download_chunk_size):
                    bar.update(f.write(chunk))
                f.write(chunk)


class Archive(object):