#!/usr/bin/env python3
"""
ESS-DIVE File Level Metadata Extractor

Jack McNelis, Michael Crow, Yaxing Wei, Daine Wright, Tammy Walker, and Ranjeet Devarakonda
"""
import os
import re
import sys
import json
import numpy as np
import pandas as pd
import argparse
from hashlib import md5
from io import StringIO
from datetime import datetime as dt

verbose = False
debug = False
trace = False

demo_configuration = {
    'columns': {
        'lat': {
            'regex': re.compile('.*lat.*'),              # minimal regex
            'common': ["y", "lt", "lat", "latitude"],    # common strings
        },
        'lon': {
            'regex': re.compile('.*lon.*'),
            'common': ["x", "ln", "lng", "lon", "long", "longitude"],
        },
        'date': {
            'regex': re.compile('.*date.*'),
            'common': ['date', 'datetime', 'timestamp'],
        },
        'time': {
            'regex': re.compile('.*time.*'),
            'common': ['time', 'datetime', 'timestamp'],
        },
    }
}
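
# A minimal sketch of how the configuration drives matching (hypothetical
# header fields): text_match() below pairs each column's regex with its
# 'common' names to pick the best candidate from a file's header.
#
#   >>> cfg = demo_configuration['columns']['lon']
#   >>> text_match(["Site", "Longitude", "Lat"], cfg['regex'], cfg['common'])
#   'Longitude'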



def file_hash(input_file: str):
    '''
    Computes the md5 checksum of a file, reading it in chunks.
    
    Parameters
    ----------
    input_file: Path to the input file.
    
    Returns
    -------
    The md5 hex digest string of the file's contents.
    
    '''
    hash_md5 = md5()
    
    # Open the file, loop over the chunks, and build hash.
    with open(input_file, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
            
    # Return hex digest.
    return hash_md5.hexdigest()
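
# A minimal usage sketch (the path is hypothetical); an empty file always
# hashes to the well-known md5 digest of empty input:
#
#   >>> open("empty.csv", "w").close()
#   >>> file_hash("empty.csv")
#   'd41d8cd98f00b204e9800998ecf8427e'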



def file_stats(input_file: str, 
               size_scalar: float=0.000001, 
               date_format: str="%Y-%m-%d %H:%M:%S"):
    '''
    Gets basic file system stats about a Record's data source.
    
    Parameters
    ----------
    input_file: Path to the input file.
    size_scalar: Multiplier applied to the size in bytes (the default
        converts to megabytes).
    date_format: strftime format for the last-modified timestamp.
    
    Returns
    -------
    A dict with the file's md5 checksum, scaled size, block size, and
    last-modified date.
    
    '''

    # Get the file's os stats.
    stat = os.stat(input_file)
    
    # Return a file metadata dictionary.
    return {

        # Compute an md5 checksum.
        'checksum': file_hash(input_file),
        
        # Get file size in scaled units (megabytes by default).
        'size': size_scalar * stat.st_size, 
        
        # Get the block size.
        'block_size': stat.st_blksize, 
        
        # Get the last modified date.
        'last_modified': dt.fromtimestamp(stat.st_mtime).strftime(date_format),

    }
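
# Sketch of the returned properties (hypothetical values; 'size' is in
# megabytes with the default size_scalar):
#
#   {'checksum': '9e107d9d...', 'size': 0.012, 'block_size': 4096,
#    'last_modified': '2021-06-01 12:00:00'}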



def text_match(fields: list, regex: re.Pattern, common: list):
    """
    Search a list (or dict) of fields for matches to an input regex.

    Parameters
    ----------
    fields: A list or dict of fields to try to match.
    regex: A compiled regular expression to match against.
    common: Common names and abbreviations used to break ties when the
        regex matches more than one field.

    Returns
    -------
    The best match among matches to regex, or None.
    """
    
    # NOTE: Improve this logic.

    # Try to find all of the regex matches from input header row fields.
    result = list(filter(regex.match, [f.lower() for f in fields]))
    
    # Return None if no matches.
    if len(result) == 0:
        return None
    elif len(result) == 1:
        
        # If result length is 1, select matching input and return.
        return [f for f in fields if f.lower() == result[0]][0]
    else:
        
        # Else try to match results to one of these common abbreviations.
        result = [r for r in result if r in common]
        
        # Return None if nothing survives the filter; otherwise take the
        # first remaining match (a guess when more than one is left).
        if len(result) == 0:
            return None
        return [f for f in fields if f.lower() == result[0]][0]
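
# A tie-breaking sketch (hypothetical fields): two fields match the 'lon'
# regex, and the 'common' list selects the conventional one.
#
#   >>> cfg = demo_configuration['columns']['lon']
#   >>> text_match(["lon", "lon_err"], cfg['regex'], cfg['common'])
#   'lon'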
            
    


def text_find_header(input_file: str):
    """
    Custom logic to find the header row of the input text file.
    
    Parameters
    ----------
    input_file: Path to the input text file.
    
    Returns
    -------
    The zero-based index of the header row, or None if no header is found.

    """
    
    # Open the file.
    with open(input_file, "r") as f:

        # Get the lines.
        try:
            lns = f.readlines()
        except Exception:
            print('failed to read:', input_file)
            if verbose:
                # Retry line by line, printing whatever can be read.
                try:
                    lns = []
                    for l in f:
                        print('l:', l)
                        lns.append(l)
                    print('----------------------')
                    print(lns)
                    print('----------------------')
                except Exception:
                    sys.exit(1)
            else:
                # Without a successful read there is nothing to parse.
                sys.exit(1)
        if verbose:
            n = len([l for l in lns if l.startswith('#')])
            print('lines ignored:', n)

    # Get the number of lines.
    ln_count = len(lns)

    # Get one quarter tail of the file; make pseudo file. The lines keep
    # their newlines, so concatenate them directly.
    tail_io = StringIO("".join(lns[-1*int(ln_count/4):]))
    # Use getvalue() so the buffer is not consumed before pandas reads it.
    if trace: print(tail_io.getvalue())

    # Read with pandas and count the number of columns.
    tail_cols = len(list(pd.read_csv(tail_io)))

    # Loop until we find the header row.
    for i, ln in enumerate(lns):

        # Get the line as stringio.
        ln_sio = StringIO(ln)

        # Read with pandas.
        ln_row = pd.read_csv(ln_sio, header=None)

        # Get length.
        sz_row = len(list(ln_row))

        # Get number of nans in row.
        na_count = ln_row.isnull().sum(axis=1).values[0]

        # Get difference of rowsize and nancount.
        na_diff = sz_row - na_count

        # Skip rows where NaNs fill more than half the expected columns.
        if int(tail_cols/2) < na_count:
            continue

        # If the row has no NaNs and its width matches the tail column
        # count, treat it as the header row and return its index.
        if sz_row == na_diff and sz_row == tail_cols:
            return i
        
    # If unsuccessful, return None.
    return None
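
# A minimal sketch (hypothetical file contents): for a csv whose first two
# lines are preamble text and whose third line is the real header, the
# function should return index 2.
#
#   preamble line one,,
#   preamble line two,,
#   site,lat,lon
#   A1,35.96,-84.29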


def text_parser(file: str, metadata: bool=False, dropna: bool=True):
    """
    A table parser for any input file that can be read as text.

    Parameters
    ----------
    file (str): Full path to an input file.
    metadata (bool): Currently unused placeholder flag.
    dropna (bool): If True, drop any rows that are entirely NaN.

    Returns
    -------
    A pandas.DataFrame parsed from the file.

    """
    # Find the header row.
    header_row = text_find_header(file)
    # Read the table.
    df = pd.read_csv(file, header=header_row)
    
    # Optionally, drop the rows that have no valid values.
    if dropna:
        
        # Get the size of the index.
        row_count = df.index.size
        
        # Drop rows that are entirely NaN from the table.
        df = df.dropna(axis=0, how='all')
        
        # Get the number of empty rows dropped.
        row_count_empty = row_count - df.index.size
        if verbose:
            print('empty rows dropped:', row_count_empty)

    return df
        
    

class NumpyEncoder(json.JSONEncoder):
    '''Numpy type JSON encoder.'''
    
    # numpy integer types (the abstract base covers all sized ints and
    # avoids aliases like np.int_ that were removed in NumPy 2.0).
    np_ints = (np.integer, )
    
    # numpy float types.
    np_flts = (np.floating, )
    
    # Conditional return.
    def default(self, obj):
        if isinstance(obj, self.np_ints):
            return int(obj)
        elif isinstance(obj, self.np_flts):
            return float(obj)
        elif isinstance(obj,(np.ndarray,)):
            return obj.tolist()
        
        return json.JSONEncoder.default(self, obj)
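
# A minimal sketch: json.dumps rejects numpy scalars by default, so the
# encoder is passed through the `cls` argument.
#
#   >>> json.dumps({'n': np.int64(3), 'x': np.float64(0.5)}, cls=NumpyEncoder)
#   '{"n": 3, "x": 0.5}'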

    

def data_frame_summary(data_frame):
    """
    Summarizes a data frame: descriptive statistics for the numeric
    columns, plus dtype and unique-value details for the rest.
    
    Parameters
    ----------
    data_frame: A pandas.DataFrame to summarize.
    
    Returns
    -------
    A dict keyed by column name with per-column summary details.

    """
    
    # Get a dictionary of statistics for the columns.
    stats = data_frame.describe().to_dict()

    # Get a dictionary of dtypes for the columns.
    dtypes = data_frame.dtypes.to_dict()
    
    # Merge the two dictionaries.
    for variable, dtype in dtypes.items():

        # If no stats exist for this variable, make a new item.
        if variable not in list(stats.keys()):

            # Collect a few more details.
            stats[variable] = {
                'dtype': str(dtype),
                'vtype': "categorical",
                'unique': data_frame[variable].unique()
            }
        
        # Else just merge the two.
        # else:
        #     stats[variable].update({
        #         'dtype': dtype.__str__(),
        #         'vtype': "continuous",
        #     })
    
    # Merge the two dictionaries by key.
    return stats
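
# Sketch of one categorical entry in the summary (hypothetical column):
#
#   {'site': {'dtype': 'object', 'vtype': 'categorical',
#             'unique': array(['A1', 'B2'], dtype=object)}}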
    


def coord_summary(data_frame, config: dict):
    """
    Finds coordinate-like columns (per the config) and summarizes the
    range of their values.
    
    Parameters
    ----------
    data_frame: A pandas.DataFrame parsed from the input file.
    config: A configuration dict in the demo_configuration format.
    
    Returns
    -------
    A dict mapping each matched coordinate name to its min and max.

    """
    
    # A dictionary output.
    coords = {}
    
    # Loop over the coordinate columns in the config.
    for coord, matchers in config['columns'].items():
        
        try:
            # Try to match to the list of columns.
            match = text_match(
                fields=list(data_frame),
                regex=matchers['regex'], 
                common=matchers.get('common', []),
            )
            
            # Grab the column.
            column = data_frame[match]

            # Get the min and max of the matched columns.
            coords[coord] = {'min': column.min(), 'max': column.max()}
        except Exception as e:
            # Surface exceptions for now; switch this to `continue` to
            # silently skip columns that fail to match.
            raise e
    
    return coords
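
# Sketch of the expected output shape (hypothetical values):
#
#   {'lat': {'min': 35.9, 'max': 36.0},
#    'lon': {'min': -84.3, 'max': -84.2}}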


def get_selected(data_frame, config: dict):
    """
    Finds the configured columns and collects their unique values.

    Parameters
    ----------
    data_frame: A pandas.DataFrame parsed from the input file.
    config: A configuration dict in the demo_configuration format.

    Returns
    -------
    A dict mapping each matched column name to its unique values.
    """
    # A dictionary output.
    coords = {}

    # Loop over the coordinate columns in the config.
    for coord, matchers in config['columns'].items():

        try:
            # Try to match to the list of columns.
            match = text_match(
                fields=list(data_frame),
                regex=matchers['regex'],
                common=matchers.get('common', []),
            )

            # Grab the column.
            column = data_frame[match]
            # print(column.values)
            coords[coord] = [ column.unique() ]

            # Get the min and max of the matched columns.
            # coords[coord] = {'min': column.min(), 'max': column.max()}

        except Exception as e:
            # Surface exceptions for now; switch this to `continue` to
            # silently skip columns that fail to match.
            raise e

    return coords


def main(input_file: str, config=None):

    # Make a dictionary of metadata.
    metadata = {'file': input_file}
    
    # Get a dictionary of file properties.
    metadata['properties'] = file_stats(input_file)

    # Parse the file.
    df = text_parser(input_file)
    
    # Add the columns metadata to the dictionary.
    if config == demo_configuration:
        metadata['summary'] = data_frame_summary(df)
    
    # Try to get metadata about coordinate columns.
    #metadata['flmd'] = get_selected(df, config)
    metadata['coordinates'] = coord_summary(df, config)
    # metadata['coordinates'] = coord_summary(df, demo_configuration)

    return metadata
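
# Sketch of the metadata record emitted per file (hypothetical values):
#
#   {
#     "file": "site_observations.csv",
#     "properties": {"checksum": "...", "size": 0.01, ...},
#     "summary": {...},
#     "coordinates": {"lat": {"min": 35.9, "max": 36.0}, ...}
#   }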

def buildConfiguration(fields):
    config = {
        'columns': {}
    }
    for field in fields:
        config['columns'][field] = {
            # Escape the field so regex metacharacters match literally.
            'regex': re.compile('.*' + re.escape(field) + '.*'),
            # 'common': ['']
        }
    return config
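
# A minimal sketch (hypothetical fields): buildConfiguration turns the
# single header row of the --config csv into a demo_configuration-style
# dict (without 'common' entries, so ties are not broken).
#
#   >>> cfg = buildConfiguration(['site', 'soil_depth'])
#   >>> sorted(cfg['columns'])
#   ['site', 'soil_depth']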


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true', help='turn on additional printing')
    parser.add_argument('-b', '--debug', action='store_true', help='turn on additional printing for debugging')
    parser.add_argument('-t', '--trace', action='store_true', help='turn on much more printing for debugging')
    parser.add_argument('-g', '--config', dest='config', type=argparse.FileType('r'), default=demo_configuration,
                        help='csv file with a single row containing items to retrieve from the files - Ex: site,plot,latitude,soil_depth')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-d', '--directory', dest='dir', type=str, help='directory containing csv files')
    group.add_argument('-f', '--file', dest='file', type=argparse.FileType('r'), help='csv file')
    args = parser.parse_args()

    if args.config != demo_configuration:
        # The config csv's single header row supplies the field names.
        fields = pd.read_csv(args.config)
        args.config = buildConfiguration(fields)

    verbose = args.verbose
    debug = args.debug
    trace = args.trace

    # if args.config != demo_configuration:
    #     items = list(pd.read_csv(args.config).keys())
    # else:
    #     items = None
    #     config = args.config

    if args.dir:
        if not os.path.isdir(args.dir):
            print('Must be a valid directory')
            parser.print_usage()
            sys.exit(1)

        for (root, dirs, files) in os.walk(args.dir):
            paths = [ os.path.join(root, file) for file in files if file.endswith('.csv') ]
            for path in paths:

                # Call main.
                output = main(path, config=args.config)

                # Dump the dict as a json to stdout.
                print(json.dumps(output, indent=2, cls=NumpyEncoder))

    elif args.file:
        output = main(args.file.name, config=args.config)
        print(json.dumps(output, indent=2, cls=NumpyEncoder))

    else:
        print('no input file or directory provided')
        sys.exit(1)

    # Exit demo.
    sys.exit()