Loading pysen/inout/__init__.py +1 −0 Original line number Diff line number Diff line Loading @@ -7,5 +7,6 @@ from .reader import ( read_echo, read_magnetic, read_xyz, # NOQA read_datreat , read_transmission ) # NOQA from .writer import EchoWriter, make_echofilename # NOQA from .scans import EchoScan, XYZScan # NOQA from .utils import get_nsefiletype # NOQA # from .legacy_writer import write_csv, generate_echo # NOQA # from .legacy_history import extract_data, list_variables, parse_date # NOQA pysen/inout/hdf.py +47 −15 Original line number Diff line number Diff line Loading @@ -10,7 +10,8 @@ import numpy as np from ..revision import version as version_info from ..iostrings import encode from .reader import read_echo, read_magnetic, read_xyz from .writer import EchoWriter, make_echofilename from .utils import get_nsefiletype #FIXME: we need to work on the idea of a schema (or not???) SCHEMA_DEFAULT = { Loading Loading @@ -175,17 +176,43 @@ class HdfConverter: return subgroup, subkey def convert_to_hdf(filename, outdir, data_type='echo', **kwargs): """convert .echo, xyz analysis .xyz or (old) magnetic .dat file into an HDF file def read_nexus_echo(filename, outdir='.'): "read nexus echo file" echo = EchoWriter(make_echofilename(filename)) if not echo.read_nexus(filename): raise RuntimeError("Error reading NeXus file {filename}") echo.to_echo(outdir) return read_echo(echo.echofile) def _make_outfilename(filename, filetype=None, outdir='.'): """ """ outfile, _ = os.path.splitext(filename) if 'nexus' in filetype: outfile, _ = os.path.splitext(outfile) outfile = os.path.join(outdir, outfile + ".h5") #print(f"\noutfile={outfile}") return outfile def convert_to_hdf(filename, outdir, **kwargs): """convert .echo, xyz analysis .xyz or (old) magnetic .dat file into an HDF5 file @note: will not overwrite the .h5 if it exists and it is newer than the .echo file """ data_reader = { 'echo' : read_echo, 'xyz' : read_xyz, 'magnetic' : read_magnetic} 'magnetic' : read_magnetic, 'nexus_echo' : read_nexus_echo, # 'application/nse-echo' : read_echo, 'application/nse-xyz' : read_xyz, 'application/nexus-nse-echo' : read_nexus_echo, } data_type = kwargs.pop('data_type', None) mode = kwargs.pop('mode', 'w') schema = kwargs.pop('schema', None) compression = kwargs.pop('compression', None) Loading @@ -193,9 +220,17 @@ def convert_to_hdf(filename, outdir, data_type='echo', **kwargs): _log = logging.getLogger() # outfile filetype = get_nsefiletype(filename) basename = os.path.basename(filename) outfile, _ = os.path.splitext(basename) outfile = os.path.join(outdir, outfile + ".h5") if data_type is None: # guess data type based on file reader = data_reader.get(filetype) else: # data type given reader = data_reader.get(data_type) data = reader(filename) outfile = _make_outfilename(basename, outdir=outdir, filetype=filetype) _log.info('converting %s to HDF5', basename) if os.path.exists(outfile) and not overwrite: t_out = os.path.getctime(outfile) Loading @@ -203,9 +238,6 @@ def convert_to_hdf(filename, outdir, data_type='echo', **kwargs): if t_out > t_src: _log.info('file %s is newer than %s', outfile, filename) return outfile reader = data_reader.get(data_type, None) data = reader(filename) #print(data) converter = HdfConverter(outfile, mode=mode, version=version_info()) converter.write(data, compression=compression, schema=schema) return outfile Loading pysen/inout/scans.py +1 −1 Original line number Diff line number Diff line Loading @@ -141,7 +141,7 @@ class EchoScan(BaseScan): def read_nexus(self, nxsfile, **kwargs): """read nexus file""" phase_step = kwargs.pop('phase_step') phase_step = kwargs.pop('phase_step', None) if not super().read_nexus(nxsfile, scan_type='echo', **kwargs): return False ntaus, nphases = self.get_taus_and_phases() Loading pysen/inout/utils.py +28 −11 Original line number Diff line number Diff line Loading @@ -30,16 +30,20 @@ def get_open_file(filename): return open def get_nsefiletype(filename): "get filetype" base, ext = os.path.splitext(os.path.basename(filename)) if ext=='.echo': return "application/nse-echo" if ext=='.h5': _, ext2 = os.path.splitext(base) if not ext2: return "application/nse-echo-hdf" nxsfile = h5py.File(filename, 'r') NSE_FILE_TYPES = { "application/nse-echo" : '.echo' , "application/nse-xyz" : '.xyz' , "application/nse-echo-hdf" : '.h5' , "application/nexus-nse-echo" : '.nxs.h5', "application/nexus-transmission" : '.nxs.h5', "application/nexus-diffraction" : '.nxs.h5', } def get_nexusfiletype(filename): "get nexus filetype" # read /entry/notes to determine the filetype with h5py.File(filename, 'r') as nxsfile: try: notes = nxsfile['/entry/notes'] notes = bytes(notes[0]).decode('ascii').lower() Loading @@ -49,7 +53,20 @@ def get_nsefiletype(filename): return "application/nexus-diffraction" if 'scan=echo' in notes: return "application/nexus-nse-echo" except KeyError: pass return mimetypes.guess_type(filename)[0] def get_nsefiletype(filename): "get filetype" base, ext1 = os.path.splitext(os.path.basename(filename)) if ext1=='.echo': return "application/nse-echo" if ext1=='.xyz': return "application/nse-xyz" if ext1=='.h5': base, ext2 = os.path.splitext(base) if not ext2: return "application/nse-echo-hdf" return get_nexusfiletype(filename) return mimetypes.guess_type(filename)[0] test/test_inout_hdf.py +68 −27 Original line number Diff line number Diff line Loading @@ -7,7 +7,7 @@ import tempfile import h5py import numpy as np from pysen.inout.hdf import HdfConverter from pysen.inout.hdf import HdfConverter, convert_to_hdf from pysen.inout.reader import read_echo from pysen.iostrings import encode, decode Loading Loading @@ -36,20 +36,13 @@ class IOHDF5TestCase(unittest.TestCase): [b'a string', 42.0, b'K']], encode([value, value])) def test_write_hdf(self): "test HDF writer" inpfile = os.path.join(TestDataDir, 's4863.echo') outfile = tempfile.NamedTemporaryFile(suffix='.h5') converter = HdfConverter(outfile.name) data = read_echo(inpfile) converter.write(data) hdf = h5py.File(outfile.name, mode='r') def verify_hdf_file1(self, filename): "verify hdf file (s4863.echo)" with h5py.File(filename, mode='r') as hdf: self.assertEqual(decode(hdf.attrs['type'] ), 'sns-nse') self.assertEqual(decode(hdf.attrs['master_comment']), 'Grafoil_290p0K Sat Jan 19 11:19:28 2013') self.assertEqual(decode(hdf.attrs['master_comment']), 'Grafoil_290p0K Sat Jan 19 11:19:28 2013') data = hdf['/data'] self.assertEqual(len(data), 2) Loading @@ -66,6 +59,54 @@ class IOHDF5TestCase(unittest.TestCase): self.assertEqual(cnt[14, 1, 12] , exp_cnt[iecho]) self.assertEqual(det[26, 15, 16, 37], exp_det[iecho]) def verify_hdf_file2(self, filename): "verify hdf file (NSE_15617.nxs.h5)" with h5py.File(filename, mode='r') as hdf: self.assertEqual(decode(hdf.attrs['type'] ), 'sns-nse') self.assertEqual(decode(hdf.attrs['master_comment']), 'POPC_Chol40_PP5_11A__q_0p053 Thu Oct 3 07:52:09 2024') data = hdf['/data'] self.assertEqual(len(data), 4) exp_det = [3, 1, 1, 3] for iecho, echo in enumerate(data.values()): self.assertEqual(len(echo), 4) cnt = echo['phase/counter'][...] det = echo['phase/detector'][...] # self.assertEqual(cnt.shape, (21, 4, 42)) self.assertEqual(det.shape, (21, 32, 32, 42)) # self.assertEqual(det[20, 15, 16, 21], exp_det[iecho]) def test_write_hdf(self): "test HDF writer" inpfile = os.path.join(TestDataDir, 's4863.echo') with tempfile.NamedTemporaryFile(suffix='.h5') as out: outfile = out.name converter = HdfConverter(outfile) data = read_echo(inpfile) converter.write(data) self.verify_hdf_file1(outfile) def test_convert_to_hdf_echo(self): "test convert to HDF (plain echo)" inpfile = os.path.join(TestDataDir, 's4863.echo') with tempfile.TemporaryDirectory() as tmpdir: outfile = convert_to_hdf(inpfile, tmpdir, compression='gzip') self.verify_hdf_file1(outfile) def test_convert_to_hdf_nexus_echo(self): "test convert to HDF (nexus_echo)" inpfile = os.path.join(TestDataDir, 'NSE_15617.nxs.h5') with tempfile.TemporaryDirectory() as tmpdir: outfile = convert_to_hdf(inpfile, tmpdir, compression='gzip') self.verify_hdf_file2(outfile) # ########################################################################################## def xtest_plot(plot, inpfile): 'test with some plots' Loading Loading
pysen/inout/__init__.py +1 −0 Original line number Diff line number Diff line Loading @@ -7,5 +7,6 @@ from .reader import ( read_echo, read_magnetic, read_xyz, # NOQA read_datreat , read_transmission ) # NOQA from .writer import EchoWriter, make_echofilename # NOQA from .scans import EchoScan, XYZScan # NOQA from .utils import get_nsefiletype # NOQA # from .legacy_writer import write_csv, generate_echo # NOQA # from .legacy_history import extract_data, list_variables, parse_date # NOQA
pysen/inout/hdf.py +47 −15 Original line number Diff line number Diff line Loading @@ -10,7 +10,8 @@ import numpy as np from ..revision import version as version_info from ..iostrings import encode from .reader import read_echo, read_magnetic, read_xyz from .writer import EchoWriter, make_echofilename from .utils import get_nsefiletype #FIXME: we need to work on the idea of a schema (or not???) SCHEMA_DEFAULT = { Loading Loading @@ -175,17 +176,43 @@ class HdfConverter: return subgroup, subkey def convert_to_hdf(filename, outdir, data_type='echo', **kwargs): """convert .echo, xyz analysis .xyz or (old) magnetic .dat file into an HDF file def read_nexus_echo(filename, outdir='.'): "read nexus echo file" echo = EchoWriter(make_echofilename(filename)) if not echo.read_nexus(filename): raise RuntimeError("Error reading NeXus file {filename}") echo.to_echo(outdir) return read_echo(echo.echofile) def _make_outfilename(filename, filetype=None, outdir='.'): """ """ outfile, _ = os.path.splitext(filename) if 'nexus' in filetype: outfile, _ = os.path.splitext(outfile) outfile = os.path.join(outdir, outfile + ".h5") #print(f"\noutfile={outfile}") return outfile def convert_to_hdf(filename, outdir, **kwargs): """convert .echo, xyz analysis .xyz or (old) magnetic .dat file into an HDF5 file @note: will not overwrite the .h5 if it exists and it is newer than the .echo file """ data_reader = { 'echo' : read_echo, 'xyz' : read_xyz, 'magnetic' : read_magnetic} 'magnetic' : read_magnetic, 'nexus_echo' : read_nexus_echo, # 'application/nse-echo' : read_echo, 'application/nse-xyz' : read_xyz, 'application/nexus-nse-echo' : read_nexus_echo, } data_type = kwargs.pop('data_type', None) mode = kwargs.pop('mode', 'w') schema = kwargs.pop('schema', None) compression = kwargs.pop('compression', None) Loading @@ -193,9 +220,17 @@ def convert_to_hdf(filename, outdir, data_type='echo', **kwargs): _log = logging.getLogger() # outfile filetype = get_nsefiletype(filename) basename = os.path.basename(filename) outfile, _ = os.path.splitext(basename) outfile = os.path.join(outdir, outfile + ".h5") if data_type is None: # guess data type based on file reader = data_reader.get(filetype) else: # data type given reader = data_reader.get(data_type) data = reader(filename) outfile = _make_outfilename(basename, outdir=outdir, filetype=filetype) _log.info('converting %s to HDF5', basename) if os.path.exists(outfile) and not overwrite: t_out = os.path.getctime(outfile) Loading @@ -203,9 +238,6 @@ def convert_to_hdf(filename, outdir, data_type='echo', **kwargs): if t_out > t_src: _log.info('file %s is newer than %s', outfile, filename) return outfile reader = data_reader.get(data_type, None) data = reader(filename) #print(data) converter = HdfConverter(outfile, mode=mode, version=version_info()) converter.write(data, compression=compression, schema=schema) return outfile Loading
pysen/inout/scans.py +1 −1 Original line number Diff line number Diff line Loading @@ -141,7 +141,7 @@ class EchoScan(BaseScan): def read_nexus(self, nxsfile, **kwargs): """read nexus file""" phase_step = kwargs.pop('phase_step') phase_step = kwargs.pop('phase_step', None) if not super().read_nexus(nxsfile, scan_type='echo', **kwargs): return False ntaus, nphases = self.get_taus_and_phases() Loading
pysen/inout/utils.py +28 −11 Original line number Diff line number Diff line Loading @@ -30,16 +30,20 @@ def get_open_file(filename): return open def get_nsefiletype(filename): "get filetype" base, ext = os.path.splitext(os.path.basename(filename)) if ext=='.echo': return "application/nse-echo" if ext=='.h5': _, ext2 = os.path.splitext(base) if not ext2: return "application/nse-echo-hdf" nxsfile = h5py.File(filename, 'r') NSE_FILE_TYPES = { "application/nse-echo" : '.echo' , "application/nse-xyz" : '.xyz' , "application/nse-echo-hdf" : '.h5' , "application/nexus-nse-echo" : '.nxs.h5', "application/nexus-transmission" : '.nxs.h5', "application/nexus-diffraction" : '.nxs.h5', } def get_nexusfiletype(filename): "get nexus filetype" # read /entry/notes to determine the filetype with h5py.File(filename, 'r') as nxsfile: try: notes = nxsfile['/entry/notes'] notes = bytes(notes[0]).decode('ascii').lower() Loading @@ -49,7 +53,20 @@ def get_nsefiletype(filename): return "application/nexus-diffraction" if 'scan=echo' in notes: return "application/nexus-nse-echo" except KeyError: pass return mimetypes.guess_type(filename)[0] def get_nsefiletype(filename): "get filetype" base, ext1 = os.path.splitext(os.path.basename(filename)) if ext1=='.echo': return "application/nse-echo" if ext1=='.xyz': return "application/nse-xyz" if ext1=='.h5': base, ext2 = os.path.splitext(base) if not ext2: return "application/nse-echo-hdf" return get_nexusfiletype(filename) return mimetypes.guess_type(filename)[0]
test/test_inout_hdf.py +68 −27 Original line number Diff line number Diff line Loading @@ -7,7 +7,7 @@ import tempfile import h5py import numpy as np from pysen.inout.hdf import HdfConverter from pysen.inout.hdf import HdfConverter, convert_to_hdf from pysen.inout.reader import read_echo from pysen.iostrings import encode, decode Loading Loading @@ -36,20 +36,13 @@ class IOHDF5TestCase(unittest.TestCase): [b'a string', 42.0, b'K']], encode([value, value])) def test_write_hdf(self): "test HDF writer" inpfile = os.path.join(TestDataDir, 's4863.echo') outfile = tempfile.NamedTemporaryFile(suffix='.h5') converter = HdfConverter(outfile.name) data = read_echo(inpfile) converter.write(data) hdf = h5py.File(outfile.name, mode='r') def verify_hdf_file1(self, filename): "verify hdf file (s4863.echo)" with h5py.File(filename, mode='r') as hdf: self.assertEqual(decode(hdf.attrs['type'] ), 'sns-nse') self.assertEqual(decode(hdf.attrs['master_comment']), 'Grafoil_290p0K Sat Jan 19 11:19:28 2013') self.assertEqual(decode(hdf.attrs['master_comment']), 'Grafoil_290p0K Sat Jan 19 11:19:28 2013') data = hdf['/data'] self.assertEqual(len(data), 2) Loading @@ -66,6 +59,54 @@ class IOHDF5TestCase(unittest.TestCase): self.assertEqual(cnt[14, 1, 12] , exp_cnt[iecho]) self.assertEqual(det[26, 15, 16, 37], exp_det[iecho]) def verify_hdf_file2(self, filename): "verify hdf file (NSE_15617.nxs.h5)" with h5py.File(filename, mode='r') as hdf: self.assertEqual(decode(hdf.attrs['type'] ), 'sns-nse') self.assertEqual(decode(hdf.attrs['master_comment']), 'POPC_Chol40_PP5_11A__q_0p053 Thu Oct 3 07:52:09 2024') data = hdf['/data'] self.assertEqual(len(data), 4) exp_det = [3, 1, 1, 3] for iecho, echo in enumerate(data.values()): self.assertEqual(len(echo), 4) cnt = echo['phase/counter'][...] det = echo['phase/detector'][...] # self.assertEqual(cnt.shape, (21, 4, 42)) self.assertEqual(det.shape, (21, 32, 32, 42)) # self.assertEqual(det[20, 15, 16, 21], exp_det[iecho]) def test_write_hdf(self): "test HDF writer" inpfile = os.path.join(TestDataDir, 's4863.echo') with tempfile.NamedTemporaryFile(suffix='.h5') as out: outfile = out.name converter = HdfConverter(outfile) data = read_echo(inpfile) converter.write(data) self.verify_hdf_file1(outfile) def test_convert_to_hdf_echo(self): "test convert to HDF (plain echo)" inpfile = os.path.join(TestDataDir, 's4863.echo') with tempfile.TemporaryDirectory() as tmpdir: outfile = convert_to_hdf(inpfile, tmpdir, compression='gzip') self.verify_hdf_file1(outfile) def test_convert_to_hdf_nexus_echo(self): "test convert to HDF (nexus_echo)" inpfile = os.path.join(TestDataDir, 'NSE_15617.nxs.h5') with tempfile.TemporaryDirectory() as tmpdir: outfile = convert_to_hdf(inpfile, tmpdir, compression='gzip') self.verify_hdf_file2(outfile) # ########################################################################################## def xtest_plot(plot, inpfile): 'test with some plots' Loading