Commit 929e0a64 authored by Crow, Michael C

Initial commit

.DS_Store
tmp/
bu/
venv/
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
# ESS-DIVE-META
Extract file-level metadata from csv files in a directory.
# Requirements
- Python 3.5+
# Installation
Use the package manager [pip](https://pip.pypa.io/en/stable/) to install the following packages.
```bash
python3 -m pip install numpy
python3 -m pip install pandas
```
# Setup
In the same directory as the script, create a folder and place your CSV files in it, for example:
```
.
├── ess-dive-meta.py
├── csv
│   └── NWT_wildfires_biophysical_2016.csv
└── config.csv
```
# Usage
```bash
# help documentation
python3 ess-dive-meta.py -h
# complete summary
python3 ess-dive-meta.py --directory example/
# extract only the columns specified in config.csv
python3 ess-dive-meta.py --directory example/ --config config.csv
```
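For each CSV file found, a JSON summary is printed to stdout. The exact fields depend on whether a config file is supplied; the structure below is only illustrative (the values are made up):
```json
{
  "file": "example/NWT_wildfires_biophysical_2016.csv",
  "properties": {
    "checksum": "md5-hex-digest",
    "size": 0.35,
    "block_size": 4096,
    "last_modified": "2019-07-01 12:00:00"
  },
  "summary": {"latitude": {"count": 120, "mean": 64.2, "min": 60.1, "max": 68.9}},
  "coordinates": {
    "lat": {"min": 60.1, "max": 68.9},
    "lon": {"min": -135.2, "max": -110.4}
  }
}
```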
# Config File Format
A simple CSV file containing only the names of the column headers you wish to extract.
Example:
```
site,latitude,longitude,time,ecosystem
```
site,latitude,longitude,time,ecosystem
#!/usr/bin/env python3
"""
# ESS-Dive Metadata
# Jack McNelis, Yaxing Wei, Daine Wright, Michael Crow, Ranjeet Devarakonda
# mcnelisjj@ornl.gov
"""
import os
import re
import sys
import json
import numpy as np
import pandas as pd
import argparse
from hashlib import md5
from io import StringIO
from datetime import datetime as dt
demo_configuration = {
    'columns': {
        'lat': {
            'regex': re.compile('.*lat.*'),  # minimal regex
            'common': ["y", "lt", "lat", "latitude"],  # common strings
        },
        'lon': {
            'regex': re.compile('.*lon.*'),
            'common': ["x", "ln", "lng", "lon", "long", "longitude"],
        },
        'date': {
            'regex': re.compile('.*date.*'),
            'common': ['date', 'datetime', 'timestamp'],
        },
        'time': {
            'regex': re.compile('.*time.*'),
            'common': ['time', 'datetime', 'timestamp'],
        },
    }
}
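# NOTE: Each entry above pairs a permissive regex with a list of common
# abbreviations; text_match() below tries the regex first and uses the common
# names only to break ties when several header fields match.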
def file_hash(input_file: str):
    '''
    Computes the md5 checksum of a file by reading it in chunks.
    Parameters
    ----------
    input_file (str): full path to an input file.
    Returns
    -------
    The md5 hex digest of the file contents as a string.
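    Example (illustrative; the digest depends on the file contents)
    -------
    file_hash("csv/NWT_wildfires_biophysical_2016.csv")
    # -> '9e107d9d372bb6826bd81d3542a419d6'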
    '''
    hash_md5 = md5()
    # Open the file, loop over the chunks, and build hash.
    with open(input_file, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    # Return hex digest.
    return hash_md5.hexdigest()
def file_stats(input_file: str,
               size_scalar: float=0.000001,
               date_format: str="%Y-%m-%d %H:%M:%S"):
    '''
    Gets basic file system stats about a Record's data source.
    Parameters
    ----------
    input_file (str): full path to an input file.
    size_scalar (float): factor applied to the size in bytes (the default converts bytes to MB).
    date_format (str): strftime format used for the last-modified timestamp.
    Returns
    -------
    A dict with the file's md5 checksum, size, block size, and last-modified date.
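    Example (illustrative values)
    -------
    file_stats("csv/NWT_wildfires_biophysical_2016.csv")
    # -> {'checksum': '...', 'size': 0.35, 'block_size': 4096,
    #     'last_modified': '2019-07-01 12:00:00'}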
    '''
    # Get the file's os stats.
    stat = os.stat(input_file)
    # Return a file metadata dictionary.
    return {
        # Compute an md5 checksum.
        'checksum': file_hash(input_file),
        # Get file size in human readable units.
        'size': size_scalar * stat.st_size,
        # Get the block size.
        'block_size': stat.st_blksize,
        # Get the last modified date.
        'last_modified': dt.fromtimestamp(stat.st_mtime).strftime(date_format),
    }
def text_match(fields: list, regex: str, common: list):
"""
Search a list (or dict) of fields for matches to an input regex.
Parameters
----------
regex: A regular expression to match.
fields: A list or dict of fields to try to match.
Returns
-------
The best match among matches to regex, or None.
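    Example
    -------
    >>> text_match(['Site', 'Latitude', 'Longitude'], re.compile('.*lat.*'), ['lat', 'latitude'])
    'Latitude'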
"""
# NOTE: Improve this logic.
# Try to find all of the regex matches from input header row fields.
result = list(filter(regex.match, [f.lower() for f in fields]))
# Return None if no matches.
if len(result) == 0:
return None
elif len(result) == 1:
# If result length is 1, select matching input and return.
return [f for f in fields if f.lower() == result[0]][0]
else:
# Else try to match results to one of these common abbreviations.
result = [r for r in result if r in common]
# Return according to the same conditions as before.
if len(result) == 0:
return None
elif len(result) == 1:
return [f for f in fields if f.lower() == result[0]][0]
else:
# If there are still more than one, just guess the first one.
return [f for f in fields if f.lower() == result[0]][0]
def text_find_header(input_file: str):
"""
Is some custom logic to find the header row of the input text file.
Parameters
----------
Returns
-------
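    Example (illustrative; the row number depends on the file layout)
    -------
    text_find_header("csv/NWT_wildfires_biophysical_2016.csv")
    # -> 0 for a file whose first line is the header row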
"""
# Open the file.
with open(input_file, "r") as f:
# Get the lines.
lns = f.readlines()
# Get the number of lines.
ln_count = len(lns)
# Get one quarter tail of the file; make pseudo file.
tail_io = StringIO("\n".join(lns[-1*int(ln_count/4):]))
# Read with pandas and count the number of columns.
tail_cols = len(list(pd.read_csv(tail_io)))
# Loop until we find the header row.
for i, ln in enumerate(lns):
# Get the line as stringio.
ln_sio = StringIO(ln)
# Read with pandas.
ln_row = pd.read_csv(ln_sio, header=None)
# Get length.
sz_row = len(list(ln_row))
# Get number of nans in row.
na_count = ln_row.isnull().sum(axis=1).values[0]
# Get difference of rowsize and nancount.
na_diff = sz_row - na_count
# If the number of NaNs is greater than one half the total columns.
if int(tail_cols/2) < na_count:
pass
# If sz_row = na_diff, return the header row number.
elif sz_row == na_diff and sz_row == tail_cols:
return i
# If unsuccessful, return None.
return None
def text_parser(file: str, metadata: bool=False, dropna: bool=True):
"""
A table parser for any input file that can be read as text.
Parameters
----------
input_file (str): full path to an input file.
drop_nans (bool): If True, drop any rows that are completely nan.
Returns:
-------
A pandas.DataFrame parsed from the file (if successful) and
an integer indicating the header row number (if hdrline is True).
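    Example (illustrative)
    -------
    df = text_parser("csv/NWT_wildfires_biophysical_2016.csv")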
"""
# Find the header row.
header_row = text_find_header(file)
# Read the table.
df = pd.read_csv(file, header=header_row)
# Optionally, drop the rows that have no valid values.
if dropna:
# Get the size of the index.
row_count = df.index.size
# Drop empty rows from the table.
df = df.dropna(0)
# Get the number of empty rows.
row_count_empty = row_count-df.index.size
return df
class NumpyEncoder(json.JSONEncoder):
    '''Numpy type JSON encoder.'''
    # numpy integer types.
    np_ints = (np.int_, np.intc, np.intp, np.int8, np.int16, np.int32,
               np.int64, np.uint8, np.uint16, np.uint32, np.uint64, )
    # numpy float types.
    np_flts = (np.float_, np.float16, np.float32, np.float64, )
    # Conditional return.
    def default(self, obj):
        if isinstance(obj, self.np_ints):
            return int(obj)
        elif isinstance(obj, self.np_flts):
            return float(obj)
        elif isinstance(obj, (np.ndarray,)):
            return obj.tolist()
        return json.JSONEncoder.default(self, obj)
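# Example (illustrative): json.dumps({'n': np.int64(3)}, cls=NumpyEncoder) -> '{"n": 3}'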
def data_frame_summary(data_frame):
"""
Parameters
----------
Returns
-------
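    Example (illustrative; 'latitude' stands in for any numeric column)
    -------
    data_frame_summary(df)['latitude']
    # -> {'count': 120.0, 'mean': 64.2, 'min': 60.1, 'max': 68.9, ...}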
"""
# Get a dictionary of statistics for the columns.
stats = data_frame.describe().to_dict()
# Get a dictionary of dtypes for the columns.
dtypes = data_frame.dtypes.to_dict()
# Merge the two dictionaries.
for variable, dtype in dtypes.items():
# If no stats exist for this variable, make a new item.
if variable not in list(stats.keys()):
# Collect a few more details.
stats[variable] = {
'dtype': dtype.__str__(),
'vtype': "categorical",
'unique': data_frame[variable].unique()
}
# Else just merge the two.
# else:
# stats[variable].update({
# 'dtype': dtype.__str__(),
# 'vtype': "continuous",
# })
# Merge the two dictionaries by key.
return stats
def coord_summary(data_frame, config: dict):
"""
Parameters
----------
Returns
-------
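    Example (illustrative values)
    -------
    coord_summary(df, demo_configuration)
    # -> {'lat': {'min': 60.1, 'max': 68.9}, 'lon': {'min': -135.2, 'max': -110.4}, ...}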
"""
# A dictionary output.
coords = {}
# Loop over the coordinate columns in the config.
for coord, matchers in config['columns'].items():
try:
# Try to match to the list of columns.
match = text_match(
fields=list(data_frame),
regex=matchers['regex'],
common=matchers['regex'],
)
# Grab the column.
column = data_frame[match]
# Get the min and max of the matched columns.
coords[coord] = {'min': column.min(), 'max': column.max()}
except Exception as e:
raise e
# Ignore exceptions (for now).
pass
return coords
def get_selected(data_frame, config: dict):
"""
Parameters
----------
Returns
-------
"""
# A dictionary output.
coords = {}
# Loop over the coordinate columns in the config.
for coord, matchers in config['columns'].items():
try:
# Try to match to the list of columns.
match = text_match(
fields=list(data_frame),
regex=matchers['regex'],
common=matchers['regex'],
)
# Grab the column.
column = data_frame[match]
# print(column.values)
coords[coord] = [ column.unique() ]
# Get the min and max of the matched columns.
# coords[coord] = {'min': column.min(), 'max': column.max()}
except Exception as e:
raise e
# Ignore exceptions (for now).
pass
return coords
def main(input_file: str, config=None):
    # Make a dictionary of metadata.
    metadata = {'file': input_file}
    # Get a dictionary of file properties.
    metadata['properties'] = file_stats(input_file)
    # Parse the file.
    df = text_parser(input_file)
    # Add the columns metadata to the dictionary.
    if config == demo_configuration:
        metadata['summary'] = data_frame_summary(df)
    # Try to get metadata about coordinate columns.
    # metadata['flmd'] = get_selected(df, config)
    metadata['coordinates'] = coord_summary(df, config)
    # metadata['coordinates'] = coord_summary(df, demo_configuration)
    return metadata
def buildConfiguration(fields):
    config = {
        'columns': {}
    }
    for field in fields:
        config['columns'][field] = {
            'regex': re.compile('.*' + field + '.*'),
            # 'common': ['']
        }
    return config
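# Example (illustrative): buildConfiguration(['site', 'latitude']) returns
# {'columns': {'site': {'regex': re.compile('.*site.*')},
#              'latitude': {'regex': re.compile('.*latitude.*')}}}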
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('-d', '--directory', dest='dir', type=str, help='directory containing csv files', required=True)
    parser.add_argument('-g', '--config', dest='config', type=argparse.FileType('r'), default=demo_configuration,
                        help='csv file with a single row containing items to retrieve from the files - Ex: site,plot,latitude,soil_depth')
    args = parser.parse_args()
    if args.config != demo_configuration:
        fields = pd.read_csv(args.config)
        cfg = buildConfiguration(fields)
    else:
        cfg = args.config
    # if args.config != demo_configuration:
    #     items = list(pd.read_csv(args.config).keys())
    # else:
    #     items = None
    # config = args.config
    if os.path.isdir(args.dir):
        for (root, dirs, files) in os.walk(args.dir):
            paths = [os.path.join(root, file) for file in files if file.endswith('.csv')]
            for path in paths:
                # Call main.
                output = main(path, config=cfg)
                # Dump the dict as a json to stdout.
                print(json.dumps(output, indent=2, cls=NumpyEncoder))
    else:
        print('Must be a valid directory')
        parser.print_usage()
    # Exit demo.
    sys.exit()