r"""@author Christopher J. Cawthorn and Sebastien E. Bourban
@brief Tools for handling SELAFIN files and
TELEMAC binary related in python
@details Contains read/write functions for
binary (big-endian) SELAFIN files
"""
from __future__ import print_function
# _____ ___________________________________________________
# ____/ Imports /__________________________________________________/
#
# ~~> dependencies towards standard python
from struct import unpack
import numpy as np
# ~~> dependencies towards other modules
# ~~> dependencies towards other pytel/modules
from fastwater.otm_gpl.progressbar import ProgressBar
from fastwater.otm_gpl.exceptions import TelemacException
# _____ __________________________________________
# ____/ Global Variables /_________________________________________/
#
# _____ ___________________________________________
# ____/ General Toolbox /__________________________________________/
#
[docs]def subset_variables_slf(vrs, all_vars):
"""
Take a string in the format "var1:object;var2:object;var3;var4" and returns
two list one of index and one of values of all the variables in all_vars
that match var.
@param vrs (string) String contain ; separated var:object values
@param all_vars (list) List of the variables to match with
@return (list) list of index of the matching variables
@return (list) list of names of the matching variables
"""
ids = []
names = []
# vrs has the form "var1:object;var2:object;var3;var4"
# /!\ the ; separator might be a problem for command line action
variable = vrs.replace(',', ';').split(';')
for var in variable:
var_name = var.split(':')[0]
# Loop on variables in file
for jvar in range(len(all_vars)):
# Filling varname with spaces
full_var_name = all_vars[jvar].lower() + \
" "*(16-len(all_vars[jvar]))
# if varnme is in the variable name adding it
if var_name.lower() in full_var_name:
ids.append(jvar)
names.append(all_vars[jvar].strip())
if len(ids) < len(variable):
raise TelemacException(
"... Could not find {} in {}"
" +> may be you forgot to switch name spaces into "
"underscores in your command ?"
"".format(variable, str(all_vars)))
return ids, names
[docs]def get_value_history(slf, times, support, vrs):
"""
Extraction of time series at points.
A point could be:
(a) A point could be a node 2D associated with one or more plan number
(b) A pair (x,y) associated with one or more plan number
Warning: Vertical interpolation has not been implemented yet.
@param slf (TelemacFile) Serafin file structure
@param times (list) the discrete list of time frame
to extract from the time history
@param support (list) the list of points
@param vrs (list) the index in the nvar-list to the variable to
extract
"""
(vars_indexes, var_names) = vrs
# ~~ Total size of support ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
lens = 0
for _, zep in support:
lens += len(zep)
# ~~ Extract time profiles ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
z = np.zeros((len(vars_indexes), lens, len(times)), dtype=np.float64)
for itime, time in enumerate(times): # it is from 0 to len(time)-1
for jvar, var_name in enumerate(var_names): # ivar is from 0 to nvar-1
varsor = slf.get_data_value(var_name, time)
# ipt is from 0 to lens-1
# (= all points extracted and all plans extracted)
ipt = 0
for x_y, zep in support:
# xp is a pair (x,y) and you need interpolation
if isinstance(x_y, tuple):
# /!\ only list of plans is allowed for now
for plan in zep:
z[jvar][ipt][itime] = 0.
l_n, b_n = x_y
for inod in range(len(b_n)):
ipoin = l_n[inod]+plan*slf.npoin3//slf.nplan
z[jvar][ipt][itime] += b_n[inod]*varsor[ipoin]
ipt += 1 # ipt advances to keep on track
else:
# /!\ only list of plans is allowed for now
for plan in zep:
z[jvar][ipt][itime] = \
varsor[x_y+plan*slf.npoin3//slf.nplan]
ipt += 1 # ipt advances to keep on track
return z
[docs]def get_value_history_slf(hook, tags, time, support, nvar, npoin3, nplan,
t1):
r"""
Extraction of time series at points.
A point could be:
(a) A point could be a node 2D associated with one or more plan number
(b) A pair (x,y) associated with one or more plan number
/!\ Vertical interpolation has not been implemented yet.
Arguments:
- time: the discrete list of time frame to extract
from the time history
- support: the list of points
- vars_indexes: the index in the nvar-list to the variable to extract
"""
(vars_indexes, _) = t1
f = hook['hook']
endian = hook['endian']
ftype, fsize = hook['float']
# ~~ Total size of support ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
lens = 0
for _, zep in support:
lens += len(zep)
# ~~ Extract time profiles ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
z = np.zeros((len(vars_indexes), lens, len(time)))
if fsize == 4:
z = np.zeros((len(vars_indexes), lens, len(time)), dtype=np.float32)
else:
z = np.zeros((len(vars_indexes), lens, len(time)), dtype=np.float64)
for itime in range(len(time)): # it is from 0 to len(time)-1
# time[itime] is the frame to be extracted
f.seek(tags['cores'][time[itime]])
f.seek(4+fsize+4, 1) # the file pointer is initialised
for ivar in range(nvar): # ivar is from 0 to nvar-1
# the file pointer advances through all records to keep on track
f.seek(4, 1)
if ivar in vars_indexes: # extraction of a particular variable
varsor = unpack(endian+str(npoin3)+ftype, f.read(fsize*npoin3))
jvar = vars_indexes.index(ivar)
# ipt is from 0 to lens-1
# (= all points extracted and all plans extracted)
ipt = 0
for x_y, zep in support:
# xp is a pair (x,y) and you need interpolation
t2 = type(())
if isinstance(x_y, t2):
# /!\ only list of plans is allowed for now
for plan in zep:
z[jvar][ipt][itime] = 0.
l_n, b_n = x_y
for inod in range(len(b_n)):
z[jvar][ipt][itime] += \
b_n[inod] *\
varsor[l_n[inod]+plan*npoin3//nplan]
ipt += 1 # ipt advances to keep on track
else:
# /!\ only list of plans is allowed for now
for plan in zep:
z[jvar][ipt][itime] = \
varsor[x_y+plan*npoin3//nplan]
ipt += 1 # ipt advances to keep on track
else:
# the file pointer advances through all
# records to keep on track
f.seek(fsize*npoin3, 1)
f.seek(4, 1)
return z
[docs]def get_edges_slf(ikle, meshx, meshy, showbar=True):
"""
Returns the list of edges of the mesh
@param ikle (np.array) Connectivity table
@param meshx (np.array) X coordinates of the mesh points
@param meshy (np.array) Y coordinates of the mesh points
@param showbar (boolean) If True display a progress bar
@returns (list) The list of edges
"""
try:
from matplotlib.tri import Triangulation
edges = Triangulation(meshx, meshy, ikle).get_cpp_triangulation()\
.get_edges()
except ImportError:
edges = []
ibar = 0
if showbar:
pbar = ProgressBar(maxval=len(ikle)).start()
for elem in ikle:
ibar += 1
if showbar:
pbar.update(ibar)
if [elem[0], elem[1]] not in edges:
edges.append([elem[1], elem[0]])
if [elem[1], elem[2]] not in edges:
edges.append([elem[2], elem[1]])
if [elem[2], elem[0]] not in edges:
edges.append([elem[0], elem[2]])
if showbar:
pbar.finish()
return edges
[docs]def get_neighbours_slf(ikle, meshx, meshy, showbar=True):
"""
Return a list containing for each element the list of elements that are
neighbours to that element
@param ikle (np.array) Connectivity table
@param meshx (np.array) X coordinates of the mesh points
@param meshy (np.array) Y coordinates of the mesh points
@param showbar (boolean) If True display a progress bar
@returns (list) The list of neighbours
"""
try:
from matplotlib.tri import Triangulation
neighbours = Triangulation(meshx, meshy, ikle).get_cpp_triangulation()\
.get_neighbors()
except ImportError:
insiders = {}
bounders = {}
ibar = 0
if showbar:
pbar = ProgressBar(maxval=(3*len(ikle))).start()
for elem, i in zip(ikle, range(len(ikle))):
n_k = bounders.keys()
for k in [0, 1, 2]:
ibar += 1
if showbar:
pbar.update(ibar)
if (elem[k], elem[(k+1) % 3]) not in n_k:
bounders.update({(elem[(k+1) % 3], elem[k]): i})
else:
j = bounders[(elem[k], elem[(k+1) % 3])]
insiders.update({(elem[k], elem[(k+1) % 3]): [i, j]})
del bounders[(elem[k], elem[(k+1) % 3])]
ibar = 0
neighbours = - np.ones((len(ikle), 3), dtype=np.int)
for elem, i in zip(ikle, range(len(ikle))):
for k in [0, 1, 2]:
ibar += 1
if showbar:
pbar.update(ibar)
if (elem[k], elem[(k+1) % 3]) in insiders:
elem_a, elem_b = insiders[(elem[k], elem[(k+1) % 3])]
if elem_a == i:
neighbours[i][k] = elem_b
if elem_b == i:
neighbours[i][k] = elem_a
if (elem[(k+1) % 3], elem[k]) in insiders:
elem_a, elem_b = insiders[(elem[(k+1) % 3], elem[k])]
if elem_a == i:
neighbours[i][k] = elem_b
if elem_b == i:
neighbours[i][k] = elem_a
if showbar:
pbar.finish()
return neighbours
[docs]def get_value_polyline(slf, times, support, vrs):
"""
Extraction of longitudinal profiles along lines.
A line is made of points extracted from slice_mesh:
A point is a pair (x,y) associated with one or more plan number
Warning: Vertical interpolation has not been implemented yet.
@param slf (TelemacFile) Telemac file class
@param times (list) the discrete list of time frame to extract
from the time
history
@param support (list): the list of points intersecting th mesh
@param vrs (tuple of list): the index, names in the
nvar-list to the variable to extract
"""
(vars_indexes, var_names) = vrs
# ~~ Total size of support ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
lens = len(support[0][1])
# ~~ Extract time profiles ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
z = np.zeros((len(vars_indexes), len(times), lens, len(support)),
dtype=np.float64)
# ~~ Extract data along line ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
for itime, time in enumerate(times): # it is from 0 to len(time)-1
# time[itime] is the frame to be extracted
for jvar, var_name in enumerate(var_names): # ivar is from 0 to nvar-1
# the file pointer advances through all records to keep on track
varsor = slf.get_data_value(var_name, time)
# ipt is from 0 to lens-1
# (= all points extracted and all plans extracted)
for ipt in range(len(support)):
x_y, zep = support[ipt]
# /!\ only list of plans is allowed for now
for ipl in range(len(zep)):
z[jvar][itime][ipl][ipt] = 0.
l_n, b_n = x_y
for inod in range(len(b_n)):
ipoin = l_n[inod]+zep[ipl]*slf.npoin3//slf.nplan
z[jvar][itime][ipl][ipt] += b_n[inod]*varsor[ipoin]
return z
[docs]def get_value_polyline_slf(hook, tags, time, support, nvar, npoin3, nplan,
t1):
r"""
Extraction of longitudinal profiles along lines.
A line is made of points extracted from slice_mesh:
A point is a pair (x,y) associated with one or more plan number
/!\ Vertical interpolation has not been implemented yet.
Arguments:
- time: the discrete list of time frame to extract
from the time history
- support: the list of points intersecting th mesh
- vars_indexes: the index in the nvar-list to the variable to extract
"""
(vars_indexes, _) = t1
f = hook['hook']
endian = hook['endian']
ftype, fsize = hook['float']
# ~~ Total size of support ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
lens = len(support[0][1])
# ~~ Extract time profiles ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
if fsize == 4:
z = np.zeros((len(vars_indexes), len(time), lens, len(support)),
dtype=np.float32)
else:
z = np.zeros((len(vars_indexes), len(time), lens, len(support)),
dtype=np.float64)
# ~~ Extract data along line ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
for itime in range(len(time)): # it is from 0 to len(time)-1
# time[itime] is the frame to be extracted
f.seek(tags['cores'][time[itime]])
f.read(4+fsize+4) # the file pointer is initialised
for ivar in range(nvar): # ivar is from 0 to nvar-1
# the file pointer advances through all records to keep on track
f.read(4)
if ivar in vars_indexes: # extraction of a particular variable
varsor = unpack(endian+str(npoin3)+ftype, f.read(fsize*npoin3))
# ipt is from 0 to lens-1
# (= all points extracted and all plans extracted)
for ipt in range(len(support)):
x_y, zep = support[ipt]
# /!\ only list of plans is allowed for now
for ipl in range(len(zep)):
z[vars_indexes.index(ivar)][itime][ipl][ipt] = 0.
l_n, b_n = x_y
for inod in range(len(b_n)):
z[vars_indexes.index(ivar)][itime][ipl][ipt] += \
b_n[inod] *\
varsor[l_n[inod]+zep[ipl]*npoin3//nplan]
else:
# the file pointer advances through
# all records to keep on track
f.read(fsize*npoin3)
f.read(4)
return z
[docs]def get_value_polyplan(slf, times, support, vrs):
"""
Extraction of variables at a list of times on a list of planes.
A plane is an integer
Warning: Vertical interpolation has not been implemented yet.
Arguments:
- time: the discrete list of time frame to extract from the time history
- support: the list of planes
- vars_indexes: the index in the nvar-list to the variable to extract
"""
(vars_indexes, var_names) = vrs
# ~~ Extract time profiles ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
z = np.zeros((len(vars_indexes), len(times), len(support),
slf.npoin3//slf.nplan),
dtype=np.float64)
# ~~ Extract data along line ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
for itime, time in enumerate(times): # it is from 0 to len(time)-1
# time[itime] is the frame to be extracted
for jvar, var_name in enumerate(var_names): # ivar is from 0 to nvar-1
# the file pointer advances through all records to keep on track
varsor = slf.get_data_value(var_name, time)
# ipt is from 0 to lens-1
# (= all points extracted and all plans extracted)
for ipl in range(len(support)):
z[jvar][itime][ipl] = \
varsor[support[ipl]*slf.npoin3//slf.nplan:
(support[ipl]+1)*slf.npoin3//slf.nplan]
return z
[docs]def get_value_polyplan_slf(hook, tags, time, support, nvar, npoin3, nplan,
t1):
r"""
Extraction of variables at a list of times on a list of planes.
A plane is an integer
/!\ Vertical interpolation has not been implemented yet.
Arguments:
- time: the discrete list of time frame to extract
from the time history
- support: the list of planes
- vars_indexes: the index in the nvar-list to the variable to extract
"""
(vars_indexes, _) = t1
f = hook['hook']
endian = hook['endian']
ftype, fsize = hook['float']
# ~~ Extract planes ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
if fsize == 4:
z = np.zeros((len(vars_indexes), len(time),
len(support), npoin3//nplan),
dtype=np.float32)
else:
z = np.zeros((len(vars_indexes), len(time),
len(support), npoin3//nplan),
dtype=np.float64)
# ~~ Extract data on several planes ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
for itime in range(len(time)): # it is from 0 to len(time)-1
# time[itime] is the frame to be extracted
f.seek(tags['cores'][time[itime]])
f.read(4+fsize+4) # the file pointer is initialised
for ivar in range(nvar): # ivar is from 0 to nvar-1
# the file pointer advances through all records to keep on track
f.read(4)
if ivar in vars_indexes: # extraction of a particular variable
varsor = unpack(endian+str(npoin3)+ftype, f.read(fsize*npoin3))
# ipt is from 0 to len(support) (= all plans extracted)
for ipl in range(len(support)):
z[vars_indexes.index(ivar)][itime][ipl] = \
varsor[support[ipl]*npoin3//nplan:
(support[ipl]+1)*npoin3//nplan]
else:
# the file pointer advances through all
# records to keep on track
f.read(fsize*npoin3)
f.read(4)
return z
[docs]def get_endian_from_char(f, nchar):
"""
Returns endianess of the file by trying to read a value in the file and
comparing it to nchar
@param f (file descriptor) File descriptor
@param nchar (string) String to compare to
@returns (string) String for endianess ("<" for little ">" for big)
"""
pointer = f.tell()
endian = ">" # "<" means little-endian, ">" means big-endian
ll, _, chk = unpack(endian+'i'+str(nchar)+'si', f.read(4+nchar+4))
if chk != nchar:
endian = "<"
f.seek(pointer)
ll, _, chk = unpack(endian+'i'+str(nchar)+'si', f.read(4+nchar+4))
if ll != chk:
raise TelemacException(
'... Cannot read {} characters from your binary file'
' +> Maybe it is the wrong file format ?'
''.format(str(nchar)))
f.seek(pointer)
return endian
[docs]def get_float_type_from_float(f, endian, nfloat):
"""
Identifies float precision from the file (single or double)
@param f (file descriptor) File descriptor
@param endian (string) Endianess type ("<" for little ">" for big)
@param nfloat (float) Float to compare to
@return (string, integer) Returns the string to be used for readinf ans the
number of byte on which the float is encoded ('f', 4) for single ('d',8)
for double precision
"""
pointer = f.tell()
ifloat = 4
cfloat = 'f'
ll = unpack(endian+'i', f.read(4))
if ll[0] != ifloat*nfloat:
ifloat = 8
cfloat = 'd'
_ = unpack(endian+str(nfloat)+cfloat, f.read(ifloat*nfloat))
chk = unpack(endian+'i', f.read(4))
if ll != chk:
raise TelemacException(
'... Cannot read {} floats from your binary file'
' +> Maybe it is the wrong file format ?'
''.format(str(nfloat)))
f.seek(pointer)
return cfloat, ifloat
# _____ ________________________________________________
# ____/ MAIN CALL /_______________________________________________/
#
__author__ = "Sebastien E. Bourban"
__date__ = "$09-Sep-2011 08:51:29$"