# Source code for nsdf.nsdfreader

# nsdfreader.py --- 
# 
# Filename: nsdfreader.py
# Description: 
# Author: Subhasis Ray
# Maintainer: 
# Created: Sat Aug  9 14:49:04 2014 (+0530)
# Version: 
# Last-Updated: 
#           By: 
#     Update #: 0
# URL: 
# Keywords: 
# Compatibility: 
# 
# 

# Commentary: 
# 
# 
# 
# 

# Change log:
# 
# 
# 
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3, or
# (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program; see the file COPYING.  If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street, Fifth
# Floor, Boston, MA 02110-1301, USA.
# 
# 

# Code:

"""Reader for NSDF format"""

import h5py as h5
import numpy as np

from .model import ModelComponent, common_prefix
from .constants import *
from .util import *
from .nsdfdata import *
from datetime import datetime


class NSDFReader(object):
    """Reader for NSDF files.

    This class encapsulates an NSDF file and provides utility
    functions to read the data in an organized manner.

    The file is opened read-only. It is closed when `close()` is
    called, when the reader is used as a context manager and the
    ``with`` block exits, or when the reader is garbage collected.

    Attributes:
        data: the `data` group of the file.

        model: the `model` group of the file.

        mapping: the `map` group of the file.

        dialect (str): value of the `dialect` attribute of the file.

    """

    def __init__(self, filename):
        """Open `filename` read-only and look up the top level groups.

        Args:
            filename (str): path of the NSDF file.

        Raises:
            KeyError: if one of the `data`, `model`, `map` groups or
                the `dialect` attribute is missing. The file is closed
                before the exception propagates.

        """
        self._fd = h5.File(filename, 'r')
        try:
            self.data = self._fd['data']
            self.model = self._fd['model']
            self.mapping = self._fd['map']
            self.dialect = self._as_text(self._fd.attrs['dialect'])
        except Exception:
            # Do not leave the file open when it turns out to be
            # unusable.
            self.close()
            raise

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    @staticmethod
    def _as_text(value):
        """Return `value` as a native string.

        On Python 3 `str()` of a bytes object gives ``"b'...'"``,
        which would never compare equal to a dialect name, so bytes
        are decoded first.

        """
        if isinstance(value, bytes) and not isinstance(value, str):
            value = value.decode('utf-8')
        return str(value)

    def close(self):
        """Close the underlying file.

        Safe to call more than once, and safe to call on a reader
        whose `__init__` failed before the file was opened.

        """
        fd = getattr(self, '_fd', None)
        if fd is None:
            return
        self._fd = None
        fd.close()

    @property
    def title(self):
        """Title of the file, or None if the file has no `title`
        attribute."""
        try:
            return self._fd.attrs['title']
        except KeyError:
            return None

    @property
    def creator(self):
        """Value of the `creator` attribute of the file."""
        return self._fd.attrs['creator']

    @property
    def license(self):
        """License information about the file. This is text string."""
        return self._fd.attrs['license']

    @property
    def software(self):
        """Software (one or more) used to generate the data in the
        file.

        """
        return self._fd.attrs['software']

    @property
    def method(self):
        """(numerical) methods applied in generating the data."""
        return self._fd.attrs['method']

    @property
    def description(self):
        """Description of the file. A text string."""
        return self._fd.attrs['description']

    @property
    def rights(self):
        """The rights of the file contents."""
        return self._fd.attrs['rights']

    @property
    def tstart(self):
        """Start time of the simulation / data recording. A string
        representation of the timestamp in ISO format

        """
        return self._fd.attrs['tstart']

    @property
    def tend(self):
        """End time of the simulation/recording."""
        return self._fd.attrs['tend']

    @property
    def contributor(self):
        """List of contributors to the content of this file."""
        return self._fd.attrs['contributor']

    @property
    def uniform_populations(self):
        """Names of the populations for which variables have been
        recorded with uniform sampling.

        """
        return list(self.data[UNIFORM].keys())

    @property
    def nonuniform_populations(self):
        """Names of the populations for which variables have been
        recorded with nonuniform sampling.

        """
        return list(self.data['nonuniform'].keys())

    @property
    def event_populations(self):
        """Names of the populations for which event variables have
        been recorded.

        """
        return list(self.data['event'].keys())

    def get_uniform_vars(self, population):
        """Returns the names of uniform variables recorded for `population`.

        Args:
            population (str): name of the population.

        Returns:
            list of str: names of the datasets storing uniform variables.

        """
        return list(self.data[UNIFORM][population].keys())

    def get_nonuniform_vars(self, population):
        """Returns the names of nonuniform variables recorded for `population`.

        Args:
            population (str): name of the population.

        Returns:
            list of str: names of the groups storing nonuniform variables.

        """
        return list(self.data['nonuniform'][population].keys())

    def get_event_vars(self, population):
        """Returns the names of event variables recorded for `population`.

        Args:
            population (str): name of the population.

        Returns:
            list of str: names of the groups storing event variables.

        """
        return list(self.data['event'][population].keys())

    def get_uniform_dataset(self, population, varname):
        """Returns the data sources and data contents for recorded
        variable `varname` from `population`.

        Args:
            population (str): name of the population.

            varname (str): name of the variable.

        Returns:
            (sources, data): `sources` is an dataset containing the
            source identifiers and data is a 2D dataset whose i-th row
            is the data from the i-th entry in `sources`.

        """
        # NOTE(review): get_uniform_data() treats
        # mapping[UNIFORM][population] itself as the sequence of
        # sources, whereas here it is indexed further by `varname`.
        # Confirm which layout the files actually use.
        return (self.mapping[UNIFORM][population][varname],
                self.data[UNIFORM][population][varname])

    def _get_or_create_uniform_ts(self, dataset):
        """Return (times, unit) for a uniformly sampled `dataset`.

        When the dataset carries `tstart`, `dt` and `tunit`
        attributes the sampling times are computed, one per column.
        Otherwise the `time` dimension scale attached to the second
        dimension is returned together with its `unit` attribute.

        """
        try:
            tstart = dataset.attrs['tstart']
            dt = dataset.attrs['dt']
            tunit = dataset.attrs['tunit']
        except KeyError:
            ts = dataset.dims[1]['time']
            return (ts, ts.attrs['unit'])
        ts = np.arange(dataset.shape[1], dtype=np.double) * dt + tstart
        return (ts, tunit)

    def _get_dt_and_tunit(self, dataset):
        """Return (dt, unit) for a uniformly sampled `dataset`.

        Uses the `dt` and `tunit` attributes when both are present,
        otherwise the difference of the first two entries of the
        `time` dimension scale and the `unit` attribute of that scale.

        """
        try:
            return (dataset.attrs['dt'], dataset.attrs['tunit'])
        except KeyError:
            ts = dataset.dims[1]['time']
            return (ts[1] - ts[0], ts.attrs['unit'])

    def get_uniform_ts(self, population, varname):
        """Returns an array of sampling times and time-unit for the
        uniform dataset `varname` recorded from `population`.

        Args:
            population (str): name of the population of sources.

            varname (str): name of the recorded variable.

        Returns:
            (times, unit) : times is an array of doubles containing
            the sampling time for each column of the dataset and unit
            is a string representing the unit of time.

        """
        data = self.data[UNIFORM][population][varname]
        return self._get_or_create_uniform_ts(data)

    def get_uniform_dt(self, population, varname):
        """Returns sampling interval and time-unit for the uniform
        dataset `varname` recorded from `population`.

        Args:
            population (str): name of the population of sources.

            varname (str): name of the recorded variable.

        Returns:
            (dt, unit) : `dt` is the sampling interval for this
            dataset and unit is a string representing the unit of
            time.

        """
        data = self.data[UNIFORM][population][varname]
        return self._get_dt_and_tunit(data)

    def get_uniform_row(self, srcid, field):
        """Get the data for `field` variable recorded from source with
        unique id `srcid`.

        Args:
            srcid (str): unique id of the source.

            field (str): name of the recorded field.

        Returns:
            (data, unit, times, timeunit), or None when no uniform
            dataset records `field` for `srcid`.

        """
        # Iterate over the member objects: iterating the group itself
        # yields only the member names.
        for srcmap in self.mapping[UNIFORM].values():
            sources = np.asarray(srcmap, dtype=str)
            indices = np.where(sources == srcid)[0]
            # Test the size explicitly: the truth value of a
            # one-element array is that of its element, so a match at
            # row 0 would otherwise be treated as "not found".
            if indices.size == 0:
                continue
            index = indices[0]
            # The attribute lives on the stored dataset, not on the
            # NumPy copy made above.
            # NOTE(review): the unpacking of each REFERENCE_LIST entry
            # is kept as in the original; confirm it matches what the
            # installed h5py returns for this attribute.
            for refinfo, _dim in srcmap.attrs['REFERENCE_LIST']:
                ref = refinfo[0]
                dataset = self._fd[ref]
                if dataset.attrs['field'] == field:
                    data = np.asarray(dataset[index])
                    unit = dataset.attrs['unit']
                    ts, tunit = self._get_or_create_uniform_ts(dataset)
                    return (data, unit, ts, tunit)
        return None

    def get_uniform_data(self, population, variable):
        """Returns a UniformData object contents for recorded
        `variable` from `population`.

        Args:
            population (str): name of the population.

            variable (str): name of the variable.

        Returns:
            dataobject (nsdf.UniformData): data container filled with
            source, data, dt and units.

        """
        data = self.data[UNIFORM][population][variable]
        mapping = self.mapping[UNIFORM][population]
        # Same fallback as get_uniform_dt(), so datasets that store
        # their sampling times as a dimension scale can be read too.
        dt, tunit = self._get_dt_and_tunit(data)
        ret = UniformData(data.name.rpartition('/')[-1],
                          unit=data.attrs['unit'],
                          field=data.attrs['field'],
                          dt=dt,
                          tunit=tunit,
                          dtype=data.dtype)
        for src, row in zip(mapping, data):
            ret.put_data(src, row)
        return ret

    def _get_nonuniform_1d_data(self, data):
        """Build a NonuniformData from a group holding one dataset per
        source, each with its own `time` dimension scale."""
        ret = NonuniformData(data.name.rpartition('/')[-1],
                             unit=data.attrs['unit'],
                             field=data.attrs['field'])
        for dset in data.values():
            times = dset.dims[0]['time']
            ret.put_data(dset.attrs['source'],
                         (np.asarray(dset), np.asarray(times)))
            ret.tunit = times.attrs['unit']
            ret.dtype = dset.dtype
        return ret

    def _get_nonuniform_regular_data(self, data):
        """Build a NonuniformRegularData from a 2D dataset whose rows
        share one `time` dimension scale."""
        mapping = data.dims[0]['source']
        times = data.dims[1]['time']
        ret = NonuniformRegularData(data.name.rpartition('/')[-1],
                                    unit=data.attrs['unit'],
                                    field=data.attrs['field'],
                                    tunit=times.attrs['unit'],
                                    dtype=data.dtype)
        # NOTE(review): set_times() is called both before and after
        # the rows are added, as in the original; the first call may
        # be redundant but put_data() is not visible from here.
        ret.set_times(times)
        for ii in range(data.shape[0]):
            ret.put_data(mapping[ii], data[ii])
        ret.set_times(np.asarray(times), tunit=times.attrs['unit'])
        return ret

    def _get_nonuniform_vlen_data(self, data):
        """Build a NonuniformData from a dataset whose `source` and
        `time` dimension scales are both attached to dimension 0."""
        mapping = data.dims[0]['source']
        times = data.dims[0]['time']
        # h5 only supports vlen with 32 bit float, we convert it to
        # float64
        ret = NonuniformData(data.name.rpartition('/')[-1],
                             unit=data.attrs['unit'],
                             field=data.attrs['field'],
                             tunit=times.attrs['unit'],
                             dtype=np.float64)
        for ii in range(data.shape[0]):
            ret.put_data(mapping[ii],
                         (np.asarray(data[ii]), np.asarray(times[ii])))
        return ret

    def _get_nonuniform_nan_data(self, data):
        """Build a NonuniformData from a 2D dataset, keeping for each
        row only the entries before the first position reported by
        `find(row, np.isnan)`."""
        mapping = data.dims[0]['source']
        times = data.dims[1]['time']
        ret = NonuniformData(data.name.rpartition('/')[-1],
                             unit=data.attrs['unit'],
                             field=data.attrs['field'],
                             tunit=times.attrs['unit'])
        for iii in range(data.shape[0]):
            # Read the row from the file once and reuse it.
            row = data[iii]
            try:
                starts = next(find(row, np.isnan))[0][0]
            except StopIteration:
                starts = len(row)
            # np.array copies, so the full padded row can be released.
            cleaned_data = np.array(row[:starts])
            cleaned_times = np.asarray(times[iii, :starts])
            ret.put_data(mapping[iii], (cleaned_data, cleaned_times))
        return ret

    def get_nonuniform_data(self, population, variable):
        """Get nonuniform data `variable` under `population`.

        In NSDF a variable is recorded from a population of sources
        and data is organized as `population/variable`. This function
        retrieve this dataset and creates NonuniformData object
        containing (source, data) pairs. In case all the sources share
        the same sampling times, it is the NonuniformRegularData, a
        subclass of NonuniformData and contains the sampling times as
        a separate array. Otherwise, `data` is tuple of variable
        values and sampling times.

        Args:
            population (str): name of the population from which this
                data was recorded.

            variable (str): name of the variable this data represents.

        Returns:
            nsdf.NonuniformRegularData if dialect of the file is
            NUREGULAR. nsdf.NonuniformData otherwise.

        Note:
            Data is converted to float64 for VLEN dialect.

        """
        data = self.data[NONUNIFORM][population][variable]
        if self.dialect == dialect.NUREGULAR:
            return self._get_nonuniform_regular_data(data)
        elif self.dialect == dialect.VLEN:
            return self._get_nonuniform_vlen_data(data)
        elif self.dialect == dialect.NANPADDED:
            return self._get_nonuniform_nan_data(data)
        else:
            return self._get_nonuniform_1d_data(data)

    def _get_event_1d_data(self, datagroup):
        """Build an EventData from a group holding one dataset per
        source."""
        ret = EventData(datagroup.name.rpartition('/')[-1],
                        unit=datagroup.attrs['unit'],
                        field=datagroup.attrs['field'])
        for dataset in datagroup.values():
            ret.put_data(dataset.attrs['source'], np.asarray(dataset))
            ret.dtype = dataset.dtype
        return ret

    def _get_event_vlen_data(self, data):
        """Build an EventData from a dataset with a `source` dimension
        scale, one row per source."""
        # h5 only supports vlen with 32 bit float, we convert it to
        # float64
        ret = EventData(data.name.rpartition('/')[-1],
                        unit=data.attrs['unit'],
                        field=data.attrs['field'],
                        dtype=np.float64)
        mapping = data.dims[0]['source']
        for iii in range(data.shape[0]):
            row = np.asarray(data[iii])
            ret.put_data(mapping[iii], row)
        return ret

    def _get_event_nan_data(self, data):
        """Build an EventData from a 2D dataset, keeping for each row
        only the entries before the first position reported by
        `find(row, np.isnan)`."""
        ret = EventData(data.name.rpartition('/')[-1],
                        unit=data.attrs['unit'],
                        field=data.attrs['field'],
                        dtype=data.dtype)
        mapping = data.dims[0]['source']
        for iii in range(data.shape[0]):
            # Read the row from the file once and reuse it.
            row = data[iii]
            try:
                starts = next(find(row, np.isnan))[0][0]
            except StopIteration:
                starts = len(row)
            # np.array copies, so the full padded row can be released.
            cleaned_data = np.array(row[:starts])
            ret.put_data(mapping[iii], cleaned_data)
        return ret

    def get_event_data(self, population, variable):
        """Get event variable recorded from population.

        In NSDF a variable is recorded from a population of sources
        and data is organized as `population/variable`. This function
        retrieve this dataset and creates EventData object containing
        (source, data) pairs.

        Args:
            population (str): name of the population from which this
                data was recorded.

            variable (str): name of the variable this data represents.

        Returns:
            nsdf.EventData

        Note:
            Data is converted to float64 for VLEN dialect.

        """
        data = self.data[EVENT][population][variable]
        if self.dialect == dialect.VLEN:
            return self._get_event_vlen_data(data)
        elif self.dialect == dialect.NANPADDED:
            return self._get_event_nan_data(data)
        else:
            return self._get_event_1d_data(data)


# 
# nsdfreader.py ends here