Source code for fastwater.otm_gpl.selafin
"""
Contains the class Selafin
"""
# _____ ___________________________________________________
# ____/ Imports /__________________________________________________/
#
# ~~> dependencies towards standard python
from struct import unpack, pack, error
import numpy as np
from fastwater.otm_gpl.parser_selafin import get_endian_from_char, \
get_float_type_from_float
from fastwater.otm_gpl.progressbar import ProgressBar
from scipy.spatial import cKDTree
from matplotlib.tri import Triangulation
[docs]class Selafin(object):
""" (DOXYGEN parsing)
Class Selafin
@brief
Read and create Selafin files with python
@details
The idea is to be able to set float_type and float_type_size from
outside when we start to write a selafinfiles.
This will make it possible to do some kind of converters
@history
- switch between big/little endian
- switch to double precission only for float
@TODO
- changes where only tested for simple reading, writing must still be
done
- get_series was not tested
- all appen and put functions must be tested
- needs some intensive testing
"""
datetime = np.asarray([1972, 7, 13, 17, 24, 27])
# ... needed here because optional in SLF (static)
def __init__(self, file_name):
"""
Intialisation of the class
@param file_name (string) Name of the file
"""
self.file = {}
self.file.update({'name': file_name})
# "<" means little-endian, ">" means big-endian
self.file.update({'endian': ">"})
self.file.update({'float': ('f', 4)}) # 'f' size 4, 'd' = size 8
if file_name != '':
self.file.update({'hook': open(file_name, 'rb')})
# ~~> checks endian encoding
self.file['endian'] = get_endian_from_char(self.file['hook'], 80)
# ~~> header parameters
self.tags = {'meta': self.file['hook'].tell()}
self.get_header_metadata_slf()
# ~~> sizes and connectivity
self.get_header_integers_slf()
# ~~> checks float encoding
self.file['float'] = get_float_type_from_float(
self.file['hook'], self.file['endian'], self.npoin3)
# ~~> xy mesh
self.get_header_floats_slf()
# ~~> time series
self.tags = {'cores': [], 'times': []}
self.get_time_history_slf()
else:
self.title = ''
self.nbv1 = 0
self.nbv2 = 0
self.nvar = self.nbv1 + self.nbv2
self.varindex = range(self.nvar)
self.iparam = []
self.nelem3 = 0
self.npoin3 = 0
self.ndp3 = 0
self.nplan = 1
self.nelem2 = 0
self.npoin2 = 0
self.ndp2 = 0
self.varnames = []
self.varunits = []
self.cldnames = []
self.cldunits = []
self.ikle3 = None
self.ikle2 = []
self.ipob2 = []
self.ipob3 = []
self.meshx = []
self.meshy = []
self.tags = {'cores': [], 'times': []}
self.datetime = np.asarray([1972, 7, 13, 17, 15, 13])
self.fole = {}
self.fole.update({'name': ''})
self.fole.update({'endian': self.file['endian']})
self.fole.update({'float': self.file['float']})
self.tree = None
self.neighbours = None
self.edges = None
self.alter_z_names = []
self.alter_zm = None
self.alter_zp = None
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Parsing the Big- and Little-Endian binary file
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_header_metadata_slf(self):
"""
Reads title, variable names and units, date and time
"""
f = self.file['hook']
endian = self.file['endian']
# ~~ Read title ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
_, tmp, _ = unpack(endian+'i80si', f.read(4+80+4))
self.title = tmp.decode('utf-8')
# ~~ Read nbv(1) and nbv(2) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
_, self.nbv1, self.nbv2, _ = unpack(endian+'iiii', f.read(4+8+4))
self.nvar = self.nbv1 + self.nbv2
self.varindex = range(self.nvar)
# ~~ Read variable names and units ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
self.varnames = []
self.varunits = []
for _ in range(self.nbv1):
_, name, unit, _ = unpack(endian+'i16s16si', f.read(4+16+16+4))
self.varnames.append(name.decode('utf8'))
self.varunits.append(unit.decode('utf8'))
self.cldnames = []
self.cldunits = []
for _ in range(self.nbv2):
_, name, unit, _ = unpack(endian+'i16s16si', f.read(4+16+16+4))
self.cldnames.append(name.decode('utf8'))
self.cldunits.append(unit.decode('utf8'))
# ~~ Read iparam array ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
dummy = unpack(endian+'12i', f.read(4+40+4))
self.iparam = np.asarray(dummy[1:11])
# ~~ Read DATE/TIME array ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
self.datetime = np.asarray([1972, 7, 13, 17, 15, 13])
if self.iparam[9] == 1:
dummy = unpack(endian+'8i', f.read(4+24+4))
self.datetime = np.asarray(dummy[1:9])
[docs] def get_header_integers_slf(self):
"""
Reads nelem, npoin, ndp3, nplan, ikle and ipobo
"""
f = self.file['hook']
endian = self.file['endian']
# ~~ Read nelem3, npoin3, ndp3, nplan ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
_, self.nelem3, self.npoin3, self.ndp3, self.nplan, _ = \
unpack(endian+'6i', f.read(4+16+4))
self.nelem2 = self.nelem3
self.npoin2 = self.npoin3
self.ndp2 = self.ndp3
self.nplan = max(1, self.nplan)
if self.iparam[6] > 1:
self.nplan = self.iparam[6] # /!\ How strange is that ?
self.nelem2 = self.nelem3 // (self.nplan - 1)
self.npoin2 = self.npoin3 // self.nplan
self.ndp2 = self.ndp3 // 2
# ~~ Read the ikle array ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
f.seek(4, 1)
self.ikle3 = np.array(unpack(endian+str(self.nelem3*self.ndp3)+'I',
f.read(4*self.nelem3*self.ndp3))) - 1
f.seek(4, 1)
self.ikle3 = self.ikle3.reshape((self.nelem3, self.ndp3))
if self.nplan > 1:
self.ikle2 = np.compress(np.repeat([True, False], self.ndp2),
self.ikle3[0:self.nelem2], axis=1)
else:
self.ikle2 = self.ikle3
# ~~ Read the ipobo array ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
f.seek(4, 1)
self.ipob3 = np.asarray(unpack(endian+str(self.npoin3)+'i',
f.read(4*self.npoin3)))
f.seek(4, 1)
self.ipob2 = self.ipob3[0:self.npoin2]
[docs] def get_header_floats_slf(self):
"""
Reads the mesh coordinates
"""
f = self.file['hook']
endian = self.file['endian']
# ~~ Read the x-coordinates of the nodes ~~~~~~~~~~~~~~~~~~
ftype, fsize = self.file['float']
f.seek(4, 1)
self.meshx = np.asarray(
unpack(endian+str(self.npoin3)+ftype,
f.read(fsize*self.npoin3))[0:self.npoin2])
f.seek(4, 1)
# ~~ Read the y-coordinates of the nodes ~~~~~~~~~~~~~~~~~~
f.seek(4, 1)
self.meshy = np.asarray(
unpack(endian+str(self.npoin3)+ftype,
f.read(fsize*self.npoin3))[0:self.npoin2])
f.seek(4, 1)
[docs] def get_time_history_slf(self):
"""
Reads all result values
"""
f = self.file['hook']
endian = self.file['endian']
ftype, fsize = self.file['float']
ats = []
att = []
while True:
try:
att.append(f.tell())
# ~~ Read AT ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
f.seek(4, 1)
ats.append(unpack(endian+ftype, f.read(fsize))[0])
f.seek(4, 1)
# ~~ Skip Values ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
f.seek(self.nvar*(4+fsize*self.npoin3+4), 1)
except error:
att.pop(len(att)-1) # since the last record failed the try
break
self.tags.update({'cores': att})
self.tags.update({'times': np.asarray(ats)})
[docs] def get_variables_at(self, frame, vars_indexes):
"""
Get values for a given time step and a list of variables
@param frame (int) Time step to extract
@param vars_indexes (list) List of variable indices
@return (np.array) array containg the values for each variable
"""
f = self.file['hook']
endian = self.file['endian']
ftype, fsize = self.file['float']
if fsize == 4:
z = np.zeros((len(vars_indexes), self.npoin3), dtype=np.float32)
else:
z = np.zeros((len(vars_indexes), self.npoin3), dtype=np.float64)
# if tags has 31 frames, len(tags)=31 from 0 to 30,
# then frame should be >= 0 and < len(tags)
if frame < len(self.tags['cores']) and frame >= 0:
f.seek(self.tags['cores'][frame])
f.seek(4+fsize+4, 1)
for ivar in range(self.nvar):
f.seek(4, 1)
if ivar in vars_indexes:
z[vars_indexes.index(ivar)] = \
unpack(endian+str(self.npoin3)+ftype,
f.read(fsize*self.npoin3))
else:
f.seek(fsize*self.npoin3, 1)
f.seek(4, 1)
return z
[docs] def alter_endian(self):
"""
Alter Endian for the file
"""
if self.fole['endian'] == ">":
self.fole['endian'] = "<"
else:
self.fole['endian'] = ">"
[docs] def alter_float(self):
"""
Alter the precision for float
"""
if self.fole['float'] == ('f', 4):
self.fole['float'] = ('d', 8)
else:
self.fole['float'] = ('f', 4)
[docs] def alter_values(self, vrs=None, m_z=1, p_z=0):
"""
Set alter values
"""
if vrs is not None:
self.alter_zm = m_z
self.alter_zp = p_z
self.alter_z_names = vrs.split(':')
[docs] def append_header_slf(self):
"""
Write the header part of the file
"""
f = self.fole['hook']
endian = self.fole['endian']
ftype, fsize = self.fole['float']
# ~~ Write title ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
f.write(pack(endian+'i80si', 80, self.title.encode('utf8'), 80))
# ~~ Write nbv(1) and nbv(2) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
f.write(pack(endian+'iiii', 4+4, self.nbv1, self.nbv2, 4+4))
# ~~ Write variable names and units ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
for i in range(self.nbv1):
f.write(pack(endian+'i', 32))
f.write(pack(endian+'16s', self.varnames[i].encode('utf8')))
f.write(pack(endian+'16s', self.varunits[i].encode('utf8')))
f.write(pack(endian+'i', 32))
for i in range(self.nbv2):
f.write(pack(endian+'i', 32))
f.write(pack(endian+'16s', self.cldnames[i]))
f.write(pack(endian+'16s', self.cldunits[i]))
f.write(pack(endian+'i', 32))
# ~~ Write iparam array ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
f.write(pack(endian+'i', 4*10))
for i in range(len(self.iparam)):
f.write(pack(endian+'i', self.iparam[i]))
f.write(pack(endian+'i', 4*10))
# ~~ Write DATE/TIME array ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
if self.iparam[9] == 1:
f.write(pack(endian+'i', 4*6))
for i in range(6):
f.write(pack(endian+'i', self.datetime[i]))
f.write(pack(endian+'i', 4*6))
# ~~ Write nelem3, npoin3, ndp3, nplan ~~~~~~~~~~~~~~~~~~~~~~~~~~~
f.write(pack(endian+'6i', 4*4, self.nelem3, self.npoin3,
self.ndp3, 1, 4*4))
# ~~ Write the ikle array ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
f.write(pack(endian+'I', 4*self.nelem3*self.ndp3))
f.write(pack(endian+str(self.nelem3*self.ndp3)+'I',
*(self.ikle3.ravel()+1)))
f.write(pack(endian+'I', 4*self.nelem3*self.ndp3))
# ~~ Write the ipobo array ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
f.write(pack(endian+'i', 4*self.npoin3))
f.write(pack(endian+str(self.npoin3)+'i', *(self.ipob3)))
f.write(pack(endian+'i', 4*self.npoin3))
# ~~ Write the x-coordinates of the nodes ~~~~~~~~~~~~~~~~~~~~~~~
f.write(pack(endian+'i', fsize*self.npoin3))
for i in range(self.nplan):
f.write(pack(endian+str(self.npoin2)+ftype, *(self.meshx)))
f.write(pack(endian+'i', fsize*self.npoin3))
# ~~ Write the y-coordinates of the nodes ~~~~~~~~~~~~~~~~~~~~~~~
f.write(pack(endian+'i', fsize*self.npoin3))
for i in range(self.nplan):
f.write(pack(endian+str(self.npoin2)+ftype, *(self.meshy)))
f.write(pack(endian+'i', fsize*self.npoin3))
[docs] def append_core_time_slf(self, time):
"""
Write time value
@param time (float) Time value
"""
f = self.fole['hook']
endian = self.fole['endian']
ftype, fsize = self.fole['float']
# Print time record
if isinstance(time, type(0.0)):
f.write(pack(endian+'i'+ftype+'i', fsize, time, fsize))
else:
f.write(pack(endian+'i'+ftype+'i', fsize, self.tags['times'][time],
fsize))
[docs] def append_core_vars_slf(self, varsor):
"""
Write variable informations
@param varsor (list) List of value for each variable
"""
f = self.fole['hook']
endian = self.fole['endian']
ftype, fsize = self.fole['float']
# Print variable records
for var in varsor:
f.write(pack(endian+'i', fsize*self.npoin3))
f.write(pack(endian+str(self.npoin3)+ftype, *(var)))
f.write(pack(endian+'i', fsize*self.npoin3))
[docs] def put_content(self, file_name, showbar=True):
"""
Write content of the object into a Serafin file
@param file_name (string) Name of the serafin file
@param showbar (boolean) If True displays a showbar
"""
self.fole.update({'name': file_name})
self.fole.update({'hook': open(file_name, 'wb')})
ibar = 0
if showbar:
pbar = ProgressBar(maxval=len(self.tags['times'])).start()
self.append_header_slf()
for time in range(len(self.tags['times'])):
ibar += 1
self.append_core_time_slf(time)
self.append_core_vars_slf(self.get_values(time))
if showbar:
pbar.update(ibar)
self.fole['hook'].close()
if showbar:
pbar.finish()
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Tool Box
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def get_values(self, time):
"""
Get values for all variables for a give time step
If alter_values were set it applies modification
"""
varsor = self.get_variables_at(time, self.varindex)
for alter_var in self.alter_z_names:
for var in range(len(self.varnames)):
if alter_var.lower() in self.varnames[var].lower():
varsor[var] = self.alter_zm * varsor[var] + self.alter_zp
for var in range(len(self.cldnames)):
if alter_var.lower() in self.cldnames[var].lower():
varsor[var+self.nbv1] = \
self.alter_zm * varsor[var+self.nbv1] +\
self.alter_zp
return varsor
[docs] def get_series(self, nodes, vars_indexes=None, showbar=True):
"""
Return the value for a list of nodes on variables given in vars_indexes
for each time step
@param nodes (list) list of nodes for which to extract data
@param vars_indexes (list) List of variables to extract data for
@param showbar (boolean) If True display a showbar for the progress
"""
f = self.file['hook']
endian = self.file['endian']
ftype, fsize = self.file['float']
if vars_indexes is None:
vars_indexes = self.varindex
# ~~ Ordering the nodes ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# This assumes that nodes starts at 1
onodes = np.sort(np.array(list(zip(range(len(nodes)), nodes)),
dtype=[('0', int), ('1', int)]),
order='1')
# ~~ Extract time profiles ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
if fsize == 4:
z = np.zeros((len(vars_indexes), len(nodes),
len(self.tags['cores'])),
dtype=np.float32)
else:
z = np.zeros((len(vars_indexes), len(nodes),
len(self.tags['cores'])),
dtype=np.float64)
f.seek(self.tags['cores'][0])
if showbar:
pbar = ProgressBar(maxval=len(self.tags['cores'])).start()
for time in range(len(self.tags['cores'])):
f.seek(self.tags['cores'][time])
f.seek(4+fsize+4, 1)
if showbar:
pbar.update(time)
for ivar in range(self.nvar):
f.seek(4, 1)
if ivar in vars_indexes:
jnod = onodes[0]
f.seek(fsize*(jnod[1]-1), 1)
z[vars_indexes.index(ivar), jnod[0], time] = \
unpack(endian+ftype, f.read(fsize))[0]
for inod in onodes[1:]:
f.seek(fsize*(inod[1]-jnod[1]-1), 1)
z[vars_indexes.index(ivar), inod[0], time] = \
unpack(endian+ftype, f.read(fsize))[0]
jnod = inod
f.seek(fsize*self.npoin3-fsize*jnod[1], 1)
else:
f.seek(fsize*self.npoin3, 1)
f.seek(4, 1)
if showbar:
pbar.finish()
return z
[docs] def set_kd_tree(self, reset=False):
"""
Builds a KDTree (impoves search of neighbors)
@param reset (boolean) Force reset of tree
"""
if reset or self.tree is None:
isoxy = np.column_stack((np.sum(self.meshx[self.ikle2],
axis=1)/3.0,
np.sum(self.meshy[self.ikle2],
axis=1)/3.0))
self.tree = cKDTree(isoxy)
[docs] def set_mpl_tri(self, reset=False):
"""
Build neighbours from matplotlib
@param reset (boolean) Force computing neighbours
"""
if reset or self.neighbours is None or self.edges is None:
# from matplotlib.tri import Triangulation
mpltri = Triangulation(self.meshx, self.meshy, self.ikle2)\
.get_cpp_triangulation()
self.neighbours = mpltri.get_neighbors()
self.edges = mpltri.get_edges()
def __del__(self):
"""
Deleting the object
"""
if self.file['name'] != '':
self.file['hook'].close()