from __future__ import (absolute_import, division, print_function)
import h5py
import numpy as np
import six
import subprocess
import shutil
import hashlib
import io
import os
import AbinsModules
from mantid.kernel import logger
# noinspection PyMethodMayBeStatic
class IOmodule(object):
"""
Class for Abins I/O HDF file operations.
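
    Example usage (a minimal sketch, assuming a phonon file "benzene.phonon" exists in the
    working directory; the group and dataset names below are illustrative, and in practice
    this class is used as a base class by the Abins loaders and calculators):

        io = IOmodule(input_filename="benzene.phonon", group_name="PhononAB")
        io.add_file_attributes()
        io.add_data("frequencies", np.array([100.0, 200.0, 300.0]))
        io.save()

        data = io.load(list_of_attributes=["filename"], list_of_datasets=["frequencies"])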
"""
def __init__(self, input_filename=None, group_name=None):
if isinstance(input_filename, str):
self._input_filename = input_filename
try:
self._hash_input_filename = self.calculate_dft_file_hash()
except (IOError, ValueError) as err:
logger.error(str(err))
            # extract the name of the file from the full path in a platform-independent way
filename = os.path.basename(self._input_filename)
if filename.strip() == "":
raise ValueError("Name of the file cannot be an empty string.")
else:
raise ValueError("Invalid name of input file. String was expected.")
if isinstance(group_name, str):
self._group_name = group_name
else:
raise ValueError("Invalid name of the group. String was expected.")
core_name = filename[0:filename.find(".")]
self._hdf_filename = core_name + ".hdf5" # name of hdf file
try:
self._advanced_parameters = self._get_advanced_parameters()
except (IOError, ValueError) as err:
logger.error(str(err))
self._attributes = {} # attributes for group
# data for group; they are expected to be numpy arrays or
# complex data sets which have the form of Python dictionaries or list of Python
# dictionaries
self._data = {}
# Fields which have a form of empty dictionaries have to be set by an inheriting class.
def _valid_hash(self):
"""
Checks if input DFT file and content of HDF file are consistent.
@return: True if consistent, otherwise False.
"""
saved_hash = self.load(list_of_attributes=["hash"])
return self._hash_input_filename == saved_hash["attributes"]["hash"]
def _valid_advanced_parameters(self):
"""
In case of rerun checks if advanced parameters haven't changed.
Returns: True if they are the same, otherwise False
"""
previous_advanced_parameters = self.load(list_of_attributes=["advanced_parameters"])
return self._advanced_parameters == previous_advanced_parameters["attributes"]["advanced_parameters"]
def get_previous_dft_program(self):
"""
        @return: name of the DFT program which was used in the previous calculation
"""
return self.load(list_of_attributes=["DFT_program"])["attributes"]["DFT_program"]
def check_previous_data(self):
"""
        Checks whether the currently used DFT file and the advanced parameters from AbinsParameters
        are the same as in the previous calculation.
"""
if not self._valid_hash():
raise ValueError("Different DFT file was used in the previous calculations.")
if not self._valid_advanced_parameters():
raise ValueError("Different advanced parameters were used in the previous calculations.")
def erase_hdf_file(self):
"""
Erases content of hdf file.
"""
        with h5py.File(self._hdf_filename, 'w'):
            pass  # opening the file in "w" mode truncates its content
def add_attribute(self, name=None, value=None):
"""
Adds attribute to the dictionary with other attributes.
@param name: name of the attribute
@param value: value of the attribute. More about attributes at: http://docs.h5py.org/en/latest/high/attr.html
"""
self._attributes[name] = value
def add_file_attributes(self):
"""
Adds file attributes: filename and hash of file to the collection of all attributes.
@return:
"""
self.add_attribute("hash", self._hash_input_filename)
self.add_attribute("filename", self._input_filename)
self.add_attribute("advanced_parameters", self._advanced_parameters)
def add_data(self, name=None, value=None):
"""
Adds data to the dictionary with the collection of other datasets.
@param name: name of dataset
        @param value: value of dataset. A numpy array is expected, or a complex dataset which has the form of
                      a Python dictionary or a list of Python dictionaries. More about datasets at:
http://docs.h5py.org/en/latest/high/dataset.html
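        Example of a valid structured dataset (key names are illustrative):
            {"symbol": "C", "coordinates": np.array([0.0, 0.0, 0.0])}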
"""
self._data[name] = value
def _save_attributes(self, group=None):
"""
Saves attributes to an hdf file.
@param group: group to which attributes should be saved.
"""
for name in self._attributes:
if isinstance(self._attributes[name], (np.int64, int, np.float64, float, str, bytes)):
group.attrs[name] = self._attributes[name]
else:
raise ValueError("Invalid value of attribute. String, "
"int or bytes was expected! (invalid type : %s)" % type(self._attributes[name]))
def _recursively_save_structured_data_to_group(self, hdf_file=None, path=None, dic=None):
"""
Helper function for saving structured data into an hdf file.
@param hdf_file: hdf file object
@param path: absolute name of the group
@param dic: dictionary to be added
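        Example (illustrative): for path="/PhononAB/atoms/" and dic={"symbol": "C", "mass": 12.0}
        the datasets "/PhononAB/atoms/symbol" and "/PhononAB/atoms/mass" are created.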
"""
for key, item in dic.items():
folder = path + key
if isinstance(item, (np.int64, int, np.float64, float, str, bytes)):
if folder in hdf_file:
del hdf_file[folder]
hdf_file[folder] = item
elif isinstance(item, np.ndarray):
if folder in hdf_file:
del hdf_file[folder]
hdf_file.create_dataset(name=folder, data=item, compression="gzip", compression_opts=9)
elif isinstance(item, dict):
self._recursively_save_structured_data_to_group(hdf_file=hdf_file, path=folder + '/', dic=item)
else:
raise ValueError('Cannot save %s type' % type(item))
def _save_data(self, hdf_file=None, group=None):
"""
        Saves data in the form of a numpy array, a dictionary or a list of dictionaries. If data already exist
        in the group, they are overwritten.
@param hdf_file: hdf file object to which data should be saved
@param group: group to which data should be saved.
"""
for item in self._data:
# case data to save is a simple numpy array
if isinstance(self._data[item], np.ndarray):
if item in group:
del group[item]
group.create_dataset(name=item, data=self._data[item], compression="gzip", compression_opts=9)
# case data to save has form of list
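            # (each list element is saved as a numbered subgroup: <group>/<item>/0/, <group>/<item>/1/, ...;
            #  the same consecutive numbering is assumed when the data are loaded back by _load_dataset)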
elif isinstance(self._data[item], list):
num_el = len(self._data[item])
for el in range(num_el):
self._recursively_save_structured_data_to_group(hdf_file=hdf_file,
path=group.name + "/" + item + "/%s/" % el,
dic=self._data[item][el])
# case data has a form of dictionary
elif isinstance(self._data[item], dict):
self._recursively_save_structured_data_to_group(hdf_file=hdf_file,
path=group.name + "/" + item + "/",
dic=self._data[item])
            else:
                raise ValueError("Invalid structured dataset. Cannot save %s type" % type(self._data[item]))
def save(self):
"""
Saves datasets and attributes to an hdf file.
"""
with h5py.File(self._hdf_filename, 'a') as hdf_file:
if self._group_name not in hdf_file:
hdf_file.create_group(self._group_name)
group = hdf_file[self._group_name]
if len(self._attributes.keys()) > 0:
self._save_attributes(group=group)
if len(self._data.keys()) > 0:
self._save_data(hdf_file=hdf_file, group=group)
# Repack if possible to reclaim disk space
        try:
            path = os.getcwd()
            temp_file = self._hdf_filename[0:self._hdf_filename.find(".")] + "temphgfrt.hdf5"
            subprocess.check_call(["h5repack", "-i", os.path.join(path, self._hdf_filename),
                                   "-o", os.path.join(path, temp_file)])
            shutil.move(os.path.join(path, temp_file), os.path.join(path, self._hdf_filename))
        except (OSError, IOError, RuntimeError, subprocess.CalledProcessError):
            pass  # repacking failed, e.g. h5repack is not installed on the system, but we can proceed
# noinspection PyMethodMayBeStatic
def _list_of_str(self, list_str=None):
"""
Checks if all elements of the list are strings.
@param list_str: list to check
        @return: True if each entry in the list is a string, False if list_str is None;
                 a ValueError is raised for any other input
"""
if list_str is None:
return False
        if not (isinstance(list_str, list) and all(isinstance(entry, str) for entry in list_str)):
raise ValueError("Invalid list of items to load!")
return True
def _load_attributes(self, list_of_attributes=None, group=None):
"""
Loads collection of attributes from the given group.
        @param list_of_attributes: list of names of attributes to load
        @param group: group in the hdf file from which attributes should be loaded
@return: dictionary with attributes
"""
results = {}
for item in list_of_attributes:
results[item] = self._load_attribute(name=item, group=group)
return results
def _load_attribute(self, name=None, group=None):
"""
Loads attribute.
@param group: group in hdf file
@param name: name of attribute
@return: value of attribute
"""
if name not in group.attrs:
raise ValueError("Attribute %s in not present in %s file." % (name, self._hdf_filename))
else:
return group.attrs[name]
def _load_datasets(self, hdf_file=None, list_of_datasets=None, group=None):
"""
        Loads structured datasets directly from an hdf file.
        @param hdf_file: hdf file object from which data should be loaded
        @param list_of_datasets: list with names of datasets to be loaded
        @param group: group in the hdf file from which datasets should be loaded
        @return: dictionary which maps dataset names to the loaded data
"""
results = {}
for item in list_of_datasets:
results[item] = self._load_dataset(hdf_file=hdf_file, name=item, group=group)
return results
# noinspection PyMethodMayBeStatic
def _get_subgrp_name(self, path=None):
"""
Extracts name of the particular subgroup from the absolute name.
@param path: absolute name of subgroup
@return: name of subgroup
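        Example (illustrative): for path "/PhononAB/atoms/3" the returned name is "3".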
"""
        # take the part of the absolute name after the last "/", i.e. the name of the innermost subgroup
        reversed_path = path[::-1]
        end = reversed_path.find("/")
        return reversed_path[:end][::-1]
# noinspection PyMethodMayBeStatic
def _convert_unicode_to_string_core(self, item=None):
"""
Convert atom element from unicode to str
but only in Python 2 where unicode handling is a mess
@param item: converts unicode to item
@return: converted element
"""
assert isinstance(item, unicode)
def _convert_unicode_to_str(self, object_to_check=None):
"""
Converts unicode to Python str, works for nested dicts and lists (recursive algorithm). Only required
for Python 2 where a mismatch with unicode/str objects is a problem for dictionary lookup
@param object_to_check: dictionary, or list with names which should be converted from unicode to string.
"""
if six.PY2:
if isinstance(object_to_check, list):
for i in range(len(object_to_check)):
object_to_check[i] = self._convert_unicode_to_str(object_to_check[i])
elif isinstance(object_to_check, dict):
                # iterate over a snapshot of the keys because entries may be replaced inside the loop
                for item in list(object_to_check.keys()):
if isinstance(item, unicode):
decoded_item = self._convert_unicode_to_string_core(item)
item_dict = object_to_check[item]
del object_to_check[item]
object_to_check[decoded_item] = item_dict
item = decoded_item
object_to_check[item] = self._convert_unicode_to_str(object_to_check[item])
# unicode element
elif isinstance(object_to_check, unicode):
object_to_check = self._convert_unicode_to_string_core(object_to_check)
return object_to_check
def _load_dataset(self, hdf_file=None, name=None, group=None):
"""
Loads one structured dataset.
        @param hdf_file: hdf file object from which the structured dataset should be loaded
        @param name: name of dataset
        @param group: main group (h5py group object) which contains the dataset
        @return: loaded dataset in the form of a numpy array, a dictionary or a list of dictionaries
"""
if not isinstance(name, str):
raise ValueError("Invalid name of the dataset.")
if name in group:
hdf_group = group[name]
        else:
            raise ValueError("Dataset %s is not present in group %s." % (name, group.name))
        if isinstance(hdf_group, h5py.Dataset):
return hdf_group.value
elif all([self._get_subgrp_name(path=hdf_group[el].name).isdigit() for el in hdf_group.keys()]):
structured_dataset_list = []
            # keys of this group are assumed to be consecutive non-negative integers: 0, 1, 2, ..., num_keys - 1
num_keys = len(hdf_group.keys())
for item in range(num_keys):
structured_dataset_list.append(
self._recursively_load_dict_contents_from_group(hdf_file=hdf_file,
path=hdf_group.name + "/%s" % item))
return self._convert_unicode_to_str(object_to_check=structured_dataset_list)
else:
return self._convert_unicode_to_str(
object_to_check=self._recursively_load_dict_contents_from_group(hdf_file=hdf_file,
path=hdf_group.name + "/"))
def _recursively_load_dict_contents_from_group(self, hdf_file=None, path=None):
"""
        Loads a structured dataset which has the form of a Python dictionary.
@param hdf_file: hdf file object from which dataset is loaded
@param path: path to dataset in hdf file
@return: dictionary which was loaded from hdf file
"""
ans = {}
for key, item in hdf_file[path].items():
            if isinstance(item, h5py.Dataset):
                ans[key] = item.value
            elif isinstance(item, h5py.Group):
ans[key] = self._recursively_load_dict_contents_from_group(hdf_file, path + key + '/')
return ans
def load(self, list_of_attributes=None, list_of_datasets=None):
"""
Loads all necessary data.
@param list_of_attributes: list of attributes to load (list of strings with names of attributes)
@param list_of_datasets: list of datasets to load. It is a list of strings with names of datasets.
Datasets have a form of numpy arrays. Datasets can also have a form of Python
dictionary or list of Python dictionaries.
@return: dictionary with both datasets and attributes
"""
results = {}
with h5py.File(self._hdf_filename, 'r') as hdf_file:
if self._group_name not in hdf_file:
raise ValueError("No group %s in hdf file." % self._group_name)
group = hdf_file[self._group_name]
if self._list_of_str(list_str=list_of_attributes):
results["attributes"] = self._load_attributes(list_of_attributes=list_of_attributes, group=group)
if self._list_of_str(list_str=list_of_datasets):
results["datasets"] = self._load_datasets(hdf_file=hdf_file,
list_of_datasets=list_of_datasets,
group=group)
return results
# noinspection PyMethodMayBeStatic
    def _calculate_hash(self, filename=None):
        """
        Calculates the hash of a file using the SHA-512 algorithm.
        @param filename: name of the file for which the hash should be calculated
        @return: string representation of the hash which contains only hexadecimal digits
        """
        buf = 65536  # read the file in 64 kB chunks to minimize memory consumption during hash creation
        sha = hashlib.sha512()
        # universal-newline text mode so that the hash does not depend on the platform's line endings
        with io.open(filename, "rt", newline=None, encoding="utf-8") as f:
            while True:
                data = f.read(buf)
                if not data:
                    break
                sha.update(data.encode("utf-8"))
        return sha.hexdigest()
def _get_advanced_parameters(self):
"""
        Calculates the hash of the file with advanced parameters (AbinsParameters).
        @return: string representation of the hash for the file with advanced parameters,
                 which contains only hexadecimal digits
"""
return self._calculate_hash(filename=AbinsModules.AbinsParameters.__file__.replace(".pyc", ".py"))
    def get_input_filename(self):
        """@return: name of the input DFT file"""
        return self._input_filename
def calculate_dft_file_hash(self):
"""
        Calculates the hash of the phonon file using the SHA-512 algorithm from the hashlib library.
@return: string representation of hash for phonon file which contains only hexadecimal digits
"""
return self._calculate_hash(filename=self._input_filename)