#!/usr/bin/env python3
import itertools
import os
import warnings
[docs]class OutputIteratorSkipWarning(UserWarning):
"""
A UserWarning child class raised when the advancing iterator skips a
time between zero and the stop time.
"""
[docs] def get_message(next_time, prev_time):
return "".join(
[
f"Next output time {next_time} is less than or equal to the ",
f"previous output time {prev_time}. Skipping...",
]
)
[docs]class GenericOutputWriter:
r"""Base class for all new style output writers or converted old style
output writers.
The derived class defines when output occurs via an iterator and what is
actually produced. This base class handles the interfacing with the model
loop.
At minimum, derived classes must define **run_one_step** for generating the
actual output and must provide an iterator of output times via either the
constructor or **register_times_iter**. Calling
**register_output_filepath** from the derived class allows for some
optional file management features.
See constructor for more details.
"""
# Generate unique output writer ID numbers
_id_iter = itertools.count()
[docs] def __init__(
self,
model,
name=None,
add_id=True,
save_first_timestep=False,
save_last_timestep=True,
output_dir=None,
times_iter=None,
verbose=False,
):
r"""Base class for all new style output writers.
Parameters
----------
model : terrainbento ErosionModel instance
name : string, optional
The name of the output writer used for identifying the writer and
generating file names. Defaults to "output-writer" or
"output-writer-id{id}" depending on **add_id** argument.
add_id : bool, optional
Indicates whether the output writer ID number should be appended to
the name using the format "-id{id}". Useful if there are
multiple output writers of the same type with non-unique names.
Defaults to True.
save_first_timestep : bool, optional
Indicates that the first output time must be at time zero
regardless of whether or not the output time iterator generates
zero. Defaults to False.
save_last_timestep : bool, optional
Indicates that the last output time must be at the clock stop time
regardless of whether or not the output time iterator would
normally generate the stop time. Defaults to True.
output_dir : string, optional
Directory where output files will be saved. Default value is None,
which creates an 'output' directory in the current directory.
times_iter : iterator of floats, optional
The user can provide an iterator of floats representing output
times here instead of registering one later using
**register_times_iter**. The user must ensure that the times
implied by `times_iter` align with the model timesteps used by the
Clock. If a timestep is skipped a warning is raised and if more
than five timesteps are skipped an error is raised.
Returns
-------
GenericOutputWriter: object
Examples
--------
GenericOutputWriter is a base class that should not be run by itself.
Please see the terrainbento tutorial for output examples.
"""
self._model = model
self._save_first_timestep = save_first_timestep
self._save_last_timestep = save_last_timestep
self._verbose = verbose
# Make sure the model has a clock. All models should have clock, but
# just in case...
assert (
hasattr(self.model, "clock") and self.model.clock is not None
), "Output writers require that the model has a clock."
# Generate the id number for this instance
self._id = next(GenericOutputWriter._id_iter)
# Generate a default name if necessary
self._name = name or "output-writer"
self._name += f"-id{self._id}" if add_id else ""
# Generate an iterator of output times
# Needs to be set by register_times_iter
self._times_iter = None
# Some variables to track the state of the iterator
self._next_output_time = None
self._prev_output_time = None
self._is_exhausted = False
# File management
if output_dir is None:
# Make a subdir with a some kind of model run identifier?
# e.g. time stamp for model start time
output_dir = os.path.join(os.curdir, "output")
if not os.path.isdir(output_dir): # pragma: no cover
self.vprint("Making output directory at {output_dir}")
os.mkdir(output_dir)
self._output_dir = output_dir
self._output_filepaths = []
# Register the times_iter if one was provided.
if times_iter is not None:
self.register_times_iter(times_iter)
# Attributes
@property
def model(self):
""" The model reference. """
return self._model
@property
def id(self):
""" The output writer's unique id number. """
return self._id
@property
def name(self):
""" The output writer's name. """
return self._name
@property
def filename_prefix(self):
"""Generate a filename prefix based on the model prefix, writer's
name, and model time. e.g. model-prefix_ow-name_time-0000000001.0"""
# Note, model iteration is NOT the number of steps... It is the number
# of times the run_for loop is executed. Can't use it here even though
# an integer would be cleaner.
model_prefix = self.model.output_prefix
# iteration_str = f"iter-{self.model.iteration:05d}"
time_str = f"time-{self.model.model_time:012.1f}" # .replace('.', 'x')
if model_prefix:
# prefix = '_'.join([model_prefix, self._name, iteration_str])
prefix = "_".join([model_prefix, self._name, time_str])
else:
# prefix = '_'.join([self._name, iteration_str])
prefix = "_".join([self._name, time_str])
return prefix
@property
def output_dir(self):
""" Output directory """
return self._output_dir
@property
def next_output_time(self):
r"""Return when this object is next supposed to write output. Does NOT
advance the iterator."""
return self._next_output_time
@property
def prev_output_time(self):
r"""Returns the previous valid output time. Does not change after the
time iterator is exhausted."""
return self._prev_output_time
@property
def output_filepaths(self):
"""Return a list of all output filepaths that have been written by
this writer and registered with **register_output_filepath**."""
return self._output_filepaths
# Time iterator methods
[docs] def register_times_iter(self, times_iter):
"""Function for registering an iterator of output times.
The inheriting class must call this function or provide the iterator to
the constructor (which then calls this function). This function does
not check the values in the iterator, but **advance_iter** will.
Parameters
----------
times_iter : iterator of floats
An iterator of floats representing model times when the output
writer should create output. The iterator values should be
monotonically increasing and non-negative, but there is some
flexibility in **advance_iter** to skip bad values.
"""
self._times_iter = times_iter
[docs] def advance_iter(self):
r"""Public-facing function for advancing the output times iterator.
The advancing iterator accounts for forced saving on the first/last
steps and accounts for short sequences where the generated times are
smaller than the previous value.
Warnings are thrown when a time between zero and the stop time is
skipped and a RecursionError is thrown if too many values are skipped
(default is 5 skips max).
Returns
-------
next_output_time : float or None
A float value for the next model time when this output writer needs
to write output. None indicates that this writer has finished
writing output for the rest of the model run.
"""
# Assert that the iterator exists and has the next function
assert (
self._times_iter is not None
), "An output time iterator has not been registered!."
assert hasattr(
self._times_iter, "__next__"
), "The output time iterator needs a __next__ function."
# Check if the writer is already in an exhausted state
if self._is_exhausted:
# Already exhausted. Always return None
assert self._next_output_time is None
return None
# Writer is not exhausted yet
had_next = self._next_output_time is not None
had_prev = self._prev_output_time is not None
save_first = self._save_first_timestep
model_stop_time = self.model.clock.stop
# Update the previous value before advancing the iterator
if had_next:
# Only updates the previous time while the iterator is running.
# (eventually becomes the final valid output time)
assert self._next_output_time <= model_stop_time
self._prev_output_time = self._next_output_time
# Check if the last output time was the stop time.
if had_next and self._next_output_time == model_stop_time:
# Previous time was the final step and output was forced by
# _save_last_timestep. The times iterator was still advanced during
# the last step and might be returning garbage if used again.
# e.g. [1,2,3,40,15] with stop time of 20 and save_last_step = True
# might attempt to write output at t=15.
next_time = None
elif save_first and not had_prev and not had_next:
# First time advancing the iterator (both prev and next are None),
# but the first output time needs to be at time zero. Set the next
# time to zero.
next_time = 0.0
else:
# Advance the iterator
next_time = self._advance_iter_recursive()
# Check if the writer has become exhausted
if next_time is None:
self._is_exhausted = True
# Save and return the next time
self._next_output_time = next_time
return next_time
def _advance_iter_recursive(self, recursion_counter=5):
r"""Private function for advancing the output times iterator.
This function accounts for iterator exhaustion, saving the last time
step, and values that are smaller than the previous value.
Recursion is used to skip times that are too small compared to the
previous output time. Warnings are thrown whenever a time between zero
and the stop time is skipped and a RecursionError is thrown if too many
values are skipped (default is 5 skips in a row).
Parameters
----------
recursion_counter : int, optional
A counter to track the depth of recursion when skipping values less
than or equal to the previous value. Defaults to a max depth of 5.
Returns
-------
next_output_time : float or None
A float value for the next model time when this output writer needs
to write output. None indicates that this writer has finished
writing output for the rest of the model run.
"""
# Advance the time iterator to get the next time value
next_time = next(self._times_iter, None)
prev_time = self._prev_output_time # Already updated by advance_iter()
model_stop_time = self.model.clock.stop
if next_time is None:
# The iterator returned None and is therefore exhausted.
if self._save_last_timestep:
# Make sure the last output time will be at the end of the
# model run.
if prev_time is None or prev_time < model_stop_time:
# The iterator either had no entries or the previous output
# time is before model stop time. Either way, make sure the
# next output time is the model stop time.
return model_stop_time
# else prev_time >= stop_time -> already output at stop time
# Output at the model stop time was not required or already
# occurred. No further times necessary.
return None
# For the following code, we know next_time is not None
# Check that the iterator returned a proper value
assert isinstance(
next_time, float
), "The output time iterator needs to generate float values."
if next_time > model_stop_time:
# The next time is greater than the model end time and should be
# exhausted. The iterator is too long (most likely infinite) and
# the interval either jumped over the model stop time or this is
# the final time step.
if self._save_last_timestep:
# Make sure the last output time will be at the end of the
# model run.
if prev_time is None or prev_time < model_stop_time:
# The iterator jumped past the end time from either the
# first advance (i.e. output interval > model duration) or
# from a normal advance. Either way, make sure the next
# output time is the model stop time.
return model_stop_time
# else prev_time >= stop_time -> already output at stop time
# Output at the model stop time was not required or already
# occurred. No further times necessary.
return None
elif (prev_time is not None) and (prev_time >= next_time):
# Next time is smaller than previous time. Ignore this value and
# try advancing again until a larger value is found or the
# recursion_counter runs out.
if recursion_counter > 0:
if not (prev_time == 0 and next_time == 0):
# Warn the user that there are issues with the iterator.
# Ignore when time == zero because that may be common when
# trying to save the first time step.
warning_cls = OutputIteratorSkipWarning
warning_msg = warning_cls.get_message(next_time, prev_time)
warnings.warn(warning_msg, warning_cls)
return self._advance_iter_recursive(recursion_counter - 1)
else:
raise RecursionError("Too many output times skipped.")
else:
# Normal value. Return as is.
return next_time
# Methods to override
[docs] def run_one_step(self):
r""" The function which actually writes data to files or the screen. """
raise NotImplementedError(
"The inheriting class needs to implement this function."
)
# File management
[docs] def make_filepath(self, filename):
""" Join the output directory to a filename. """
return os.path.join(self.output_dir, filename)
[docs] def is_file_registered(self, filepath):
"""Check if an output filepath has already been registered with
this writer.
Parameters
----------
filepath : string
Filepath to check.
Returns
-------
is_registered : bool
True means that the file is already registered. False means file is
not registered yet.
"""
return filepath in self._output_filepaths
[docs] def register_output_filepath(self, filepath):
"""Register the filepath to a newly created file.
Does not throw any errors or warnings if the file is already registered
or exists. (Should it? User could be intentionally overwriting a file.)
NOTE: Old style output writers do not have the ability to register
files. Therefore file registering/management can't be a required
feature.
Parameters
----------
filepath : string
Filepath to a new file that will be registered.
"""
if not self.is_file_registered(filepath):
self.vprint(f"Registering a new filepath {filepath}")
self._output_filepaths.append(filepath)
[docs] def delete_output_files(self, only_extension=None):
"""Delete output files generated by this writer that have been
registered. Primarily for testing cleanup.
Parameters
----------
only_extension : string, optional
Specify what type of files to delete. Defaults to None, which will
delete all file types generated by this writer that have been
registered.
"""
output_filepaths = self._output_filepaths
keep_filepaths = []
self.vprint("Deleting files...")
self.vprint(f"{self.name} wrote: {output_filepaths}")
for filepath in output_filepaths:
# Note: ''[1:] will return '' (i.e. does not crash if no extension)
file_ext = os.path.splitext(filepath)[1][1:]
if only_extension is None or file_ext in only_extension:
# Deleting all files or just the target extension type
self.vprint(f"Deleting {filepath}")
try:
os.remove(filepath)
except WindowsError: # pragma: no cover
print(
"The Windows OS is picky about file-locks and did "
"not permit terrainbento to remove the netcdf files."
)
keep_filepaths.append(filepath) # could not delete
else:
self.vprint(f"Keeping {filepath}")
# Not deleting this file
keep_filepaths.append(filepath)
self._output_filepaths = keep_filepaths
[docs] def get_output_filepaths(self, only_extension=None):
"""Get a list of all output files created by this writer that have
been registered.
Parameters
----------
only_extension : string, optional
Specify what type of files to return. Defaults to None, which will
return all file types generated by this writer that have been
registered.
Returns
-------
filepaths : list of strings
List of filepath strings that match extension requirements and were
registered.
"""
output_filepaths = self._output_filepaths
return_filepaths = []
for filepath in output_filepaths:
# Note: ''[1:] will return '' (i.e. does not crash if no extension)
file_ext = os.path.splitext(filepath)[1][1:]
if only_extension is None or file_ext == only_extension:
return_filepaths.append(filepath)
return return_filepaths
[docs] def vprint(self, msg):
""" Print output to the standard output stream if in verbose mode. """
if self._verbose:
print(msg)