Source code for fastwater.otm_gpl.files

r"""
    @note ... this work is based on a collaborative effort
    of the Telemac-Mascaret consortium

    @history 15/11/2011 -- Sebastien E. Bourban
            Addition of diff_text_files, a differential tool, which is
            called in the main.

    @history 15/11/2011 -- Sebastien E. Bourban
            Addition of move_file.

    @history 15/11/2011 -- Sebastien E. Bourban
            Addition of a progress bar to the put_file_content and
            add_file_content methods -- had to write by line, instead of just
            one.

    @history 04/06/2012 -- Fabien Decung
            Extension of get_these_files to include subdirectories of source/.

    @history 31/05/2012 -- Sebastien E. Bourban
            Addition of a simple unzipping method (unzip)

    @history 15/04/20124-- Sebastien E. Bourban
            function is_newer now processes files within a directory also.

    @brief
"""
from __future__ import print_function

# _____          ___________________________________________________
# ____/ Imports /__________________________________________________/
#
# ~~> dependencies towards standard python
import shutil
import time
import difflib
import codecs
from os import path, walk, mkdir, getcwd, chdir, remove, rmdir, listdir,\
                    stat, makedirs
from fastwater.otm_gpl.exceptions import TelemacException
try:
    from os import symlink
    SYMLINK_AVAIL = True
except ImportError:
    SYMLINK_AVAIL = False
import errno
from fnmatch import filter, fnmatch
import zipfile
from distutils.archive_util import make_archive
from distutils.dep_util import newer
from urllib.request import urlopen
from urllib.error import HTTPError

# ~~> dependencies towards other modules
from fastwater.otm_gpl.progressbar import ProgressBar

# _____                   __________________________________________
# ____/ Global Variables /_________________________________________/
#





# _____                  ___________________________________________
# ____/ General Toolbox /__________________________________________/
#

[docs]def recursive_glob(treeroot, pattern): """ Returns list of files matching pattern in all subdirectories of treeroot (to avoid usage of glob.glob with recursive argument which is not suppoted by Python >3.5) @param treeroot (str) Path of folder @param pattern (str) Pattern to search for """ results = [] for base, dirs, files in walk(treeroot): goodfiles = filter(files, pattern) results.extend(path.join(base, f) for f in goodfiles) return results
[docs]def add_to_list(lst, name, value): """ Add item to dictionary @param lst (dictionnary) list @param name (any) key @param value (any) adding value @return (dictionary) updating lst """ if lst.get(name) is not None: if value not in lst[name]: lst[name].append(value) else: lst.update({name: [value]}) return lst
[docs]def get_these_files(root, exts): """@brief Make a list of all files in root that have the extension in [ext] -- Return the list @param root (string) path root @param exts (string) extension file @return (list) files list """ root = root.strip() files = [] if path.exists(root): # @note FD@EDF : allow scan of subdirectories... # @note SEB@HRW : there must be aother way -- walk is too long dirpath, _, filenames = next(walk(root)) for fle in filenames: for ext in exts: _, tail = path.splitext(fle) if tail.lower() == ext.lower(): files.append(path.join(dirpath, fle)) return files
[docs]def is_newer(nfile, ofile): """ Evaluate whether one file is more recent than the other Return 1 is ofile exists and is more recent than nfile, 0 otherwise > newer(ofile,nfile) is True if ofile exists and is more recent than nfile @param nfile (string) new file @param ofile (string) old file or directory @return (integer) Return 1 is ofile exists and is more recent than nfile, 0 otherwise """ # TODO: use of newer_group for the second part of the test. i = 1 if path.isfile(ofile): if newer(ofile, nfile): i = 0 elif path.isdir(ofile): for fle in listdir(ofile): if path.isfile(path.join(ofile, fle)): if newer(path.join(ofile, fle), nfile): i *= 0 return 1 - min(i, 1)
[docs]def get_file_content(fle): """ Read fle file @param fle (string) file @return ilines (list) content line file """ ilines = [] src_file = codecs.open(fle, 'r', encoding='utf-8') for line in src_file: ilines.append(line) src_file.close() return ilines
[docs]def put_file_content(fle, lines): """ put line to file @param fle (string) file @param lines (string) adding line """ if path.exists(fle): remove(fle) src_file = open(fle, 'wb') if len(lines) > 0: ibar = 0 pbar = ProgressBar(maxval=len(lines)).start() src_file.write(bytes((lines[0].rstrip()).replace('\r', '')\ .replace('\n\n', '\n'), 'utf-8')) for line in lines[1:]: pbar.update(ibar) ibar += 1 src_file.write(bytes('\n'+(line.rstrip()).replace('\r', '')\ .replace('\n\n', '\n'), 'utf-8')) pbar.finish() src_file.close() return
[docs]def add_file_content(fle, lines): """ Add line to file @param fle (string) file @param lines (string) adding line """ src_file = open(fle, 'ab') ibar = 0 pbar = ProgressBar(maxval=len(lines)).start() for line in lines[0:]: ibar += 1 pbar.update(ibar) src_file.write(bytes('\n'+(line.rstrip()).replace('\r', '')\ .replace('\n\n', '\n'), 'utf-8')) pbar.finish() src_file.close() return
[docs]def create_directories(p_o): """ create directories tree @param p_o (string) directory """ p_r = p_o p_d = [] while not path.isdir(p_r): p_d.append(path.basename(p_r)) p_r = path.dirname(p_r) while p_d != []: p_r = path.join(p_r, p_d.pop()) mkdir(p_r) return
[docs]def copy_files(src, dest): """ Copy all the files within src @param src (string) source directory @param dest (string) target directory """ l_d = listdir(src) ibar = 0 pbar = ProgressBar(maxval=len(l_d)).start() for f in l_d: if path.isfile(path.join(src, f)): shutil.copy(path.join(src, f), dest) pbar.update(ibar) ibar += 1 pbar.finish()
[docs]def copy_file(src, dest): """ Copy one file to directory @param src (string) source file @param dest (string) target directory """ if path.exists(path.join(dest, path.basename(src))): remove(path.join(dest, path.basename(src))) if path.isfile(src): shutil.copy(src, dest)
[docs]def copy_file2file(src, dest): """ Copy one file to file @param src (string) source file @param dest (string) target file """ if path.exists(path.join(path.dirname(dest), path.basename(src))): remove(path.join(path.dirname(dest), path.basename(src))) if path.isfile(src): shutil.copy(src, dest)
[docs]def move_file(src, dest): """ Move file to directory @param src (string) source file @param dest (string) target directory """ if path.exists(path.join(dest, path.basename(src))): try: remove(path.join(dest, path.basename(src))) except BaseException: time.sleep(5) # /!\ addition for windows operating system try: remove(path.join(dest, path.basename(src))) except BaseException as excpt: raise TelemacException(\ 'I could not remove your existing file: '+src) if path.exists(src): try: shutil.move(src, dest) except BaseException: time.sleep(5) # /!\ addition for windows operating system try: shutil.move(src, dest) except BaseException as excpt: raise TelemacException(\ 'I could not move your file: ' + src + '\n ... maybe the detination exists?')
[docs]def move_file2file(src, dest): """ Move file to file @param src (string) source file @param dest (string) target file """ if dest == src: return if path.exists(dest): try: remove(dest) except BaseException: time.sleep(5) # /!\ addition for windows operating system try: remove(dest) except BaseException as excpt: raise TelemacException( 'I could not remove your existing file: '+dest) if path.exists(src): try: shutil.move(src, dest) except BaseException: time.sleep(5) # /!\ addition for windows operating system try: shutil.move(src, dest) except BaseException as excpt: raise TelemacException( 'I could not move your file: ' + src + '\n ... maybe the detination exists?')
[docs]def remove_directories(root): """ Walk through the directory structure available from the root and removes everything in it, including the root @param root (string) directory structure to remove """ for path_dir, pdirs, pfiles in walk(root, topdown=False): for fle in pfiles: remove(path.join(path_dir, fle)) for idir in pdirs: try: rmdir(path.join(path_dir, idir)) except BaseException: time.sleep(5) # /!\ addition for windows operating system rmdir(path.join(path_dir, idir)) try: rmdir(root) except BaseException: time.sleep(5) # /!\ addition for windows operating system rmdir(root) return
[docs]def check_safe(src, safe, c_k): """ Check @param src (string) file to check @param safe (string) directory to check @param c_k (integer) ???? todo @return (boolean) True if file exists """ dest = path.join(safe, path.basename(src)) if not path.exists(dest): return True if is_newer(src, dest) == 1 and c_k < 2: return False remove(dest) return True
[docs]def match_safe(src, ex, safe, c_k): """ ???????? todo @param src (string) file to check @param ex (string) pattern file to check @param safe (string) directory to check @param c_k (integer) ???? todo @return (boolean) True if file exists """ # ~~> list all entries dir_p, _, filenames = next(walk(safe)) if filenames == []: return True # ~~> match expression exnames = [] for dest in filenames: if fnmatch(dest, ex): if c_k > 1: # TODO: try except if file access not granted try: remove(path.join(dir_p, dest)) except BaseException: time.sleep(5) # /!\ addition for windows operating system try: remove(path.join(dir_p, dest)) except Exception as excpt: raise TelemacException( 'I could not remove your existing file: ' + path.join(dir_p, dest)) continue exnames.append(path.join(dir_p, dest)) if exnames == []: return True # ~~> check if newer files found = False for dest in exnames: if is_newer(src, dest) == 0: found = True if not found: return False # ~~> remove all if one older for dest in exnames: remove(dest) return True
# _____ ___________________________________________ # ____/ Archive Toolbox /__________________________________________/ #
[docs]def tel_zip(zname, bname, form): """ bname is a the root directory to be archived -- Return the name of the archive, zname, with its full path -- form can be either 'zip', 'gztar' ... read from the config file @param zname (string) archive name @param bname (string) file or directory to archive @param form (string) archive format @return zipfile (string) name of the archive, zname, with its full path """ cpath = getcwd() chdir(path.dirname(bname)) zip_file = make_archive(zname, form, base_dir=path.basename(bname)) chdir(cpath) return zip_file
[docs]def zipsortie(sortie): """ zip files and remove virtually all of them ! @param sortie (string) output to archive @return zname (string) name of the archive """ head, tail = path.splitext(path.basename(sortie)) zname = head + '.zip' if path.dirname(sortie) != '': for dirname, _, filenames in walk(path.dirname(sortie)): break else: for dirname, _, filenames in walk('.'): break cpath = getcwd() chdir(dirname) z = zipfile.ZipFile(zname, 'a', compression=zipfile.ZIP_DEFLATED, allowZip64=True) for filename in filenames: if head == filename[:len(head)] and tail == path.splitext(filename)[1]: z.write(filename) if filename != path.basename(sortie): remove(filename) chdir(cpath) return zname
[docs]def unzip(zip_name, bname): """ bname is a the root directory where the archive is to be extracted -- @param zip_name (string) archive file @param bname (string) target directory """ z = zipfile.ZipFile(path.realpath(zip_name), 'r') cpath = getcwd() chdir(bname) for f in z.namelist(): if f.endswith('/'): if not path.exists(f): makedirs(f) else: z.extract(f) chdir(cpath)
# _____ ______________________________________________ # ____/ Diff Toolbox /_____________________________________________/ #
[docs]def diff_text_files(f_file, t_file, options): """ Command line interface to provide diffs in four formats: * ndiff: lists every line and highlights interline changes. * context: highlights clusters of changes in a before/after format. * unified: highlights clusters of changes in an inline format. * html: generates side by side comparison with change highlights. @param f_file (string) @param t_file (string) @param options (string) @return (str) """ # we're passing these as arguments to the diff function f_date = time.ctime(stat(f_file).st_mtime) t_date = time.ctime(stat(t_file).st_mtime) f_lines = get_file_content(f_file) t_lines = get_file_content(t_file) if options.unified: return difflib.unified_diff(f_lines, t_lines, f_file, t_file, f_date, t_date, n=options.ablines) if options.ndiff: return difflib.ndiff(f_lines, t_lines) if options.html: return difflib.HtmlDiff().make_file(f_lines, t_lines, f_file, t_file, context=options.context, numlines=options.ablines) return difflib.context_diff(f_lines, t_lines, f_file, t_file, f_date, t_date, n=options.ablines)
# _____ _______________________________________ # ____/ Internet connection /______________________________________/ #
[docs]def is_online(url='http://www.opentelemac.org/', timeout=5): """ Check url @param url (string) url @param timeout (integer) timeout @return (bool) """ try: urlopen(url, timeout=timeout) return True except HTTPError: print("... could not connect through to the internet.") return False
# _____ ________________________________________________ # ____/ MAIN CALL /_______________________________________________/ # __author__ = "Sebastien Bourban" __date__ = "$19-Jul-2010 08:51:29$"