# coding: utf8
# !/usr/env/python
"""Base class for common functions of all terrainbento erosion models."""
import os
import sys
import time as tm
import warnings
import numpy as np
import xarray as xr
import yaml
from landlab import ModelGrid, create_grid
from landlab.components import FlowAccumulator, NormalFault
from terrainbento.boundary_handlers import (
CaptureNodeBaselevelHandler,
GenericFuncBaselevelHandler,
NotCoreNodeBaselevelHandler,
PrecipChanger,
SingleNodeBaselevelHandler,
)
from terrainbento.clock import Clock
from terrainbento.output_writers import (
GenericOutputWriter,
OWSimpleNetCDF,
StaticIntervalOutputClassAdapter,
StaticIntervalOutputFunctionAdapter,
StaticIntervalOutputWriter,
)
from terrainbento.precipitators import RandomPrecipitator, UniformPrecipitator
from terrainbento.runoff_generators import SimpleRunoff
_SUPPORTED_PRECIPITATORS = {
"UniformPrecipitator": UniformPrecipitator,
"RandomPrecipitator": RandomPrecipitator,
}
_SUPPORTED_RUNOFF_GENERATORS = {"SimpleRunoff": SimpleRunoff}
_VALID_PRECIPITATORS = (UniformPrecipitator, RandomPrecipitator)
_VALID_RUNOFF_GENERATORS = SimpleRunoff
_DEFAULT_PRECIPITATOR = {"UniformPrecipitator": {}}
_DEFAULT_RUNOFF_GENERATOR = {"SimpleRunoff": {}}
_SUPPORTED_BOUNDARY_HANDLERS = [
"NormalFault",
"PrecipChanger",
"CaptureNodeBaselevelHandler",
"NotCoreNodeBaselevelHandler",
"SingleNodeBaselevelHandler",
"GenericFuncBaselevelHandler",
]
_HANDLER_METHODS = {
"NormalFault": NormalFault,
"PrecipChanger": PrecipChanger,
"CaptureNodeBaselevelHandler": CaptureNodeBaselevelHandler,
"NotCoreNodeBaselevelHandler": NotCoreNodeBaselevelHandler,
"SingleNodeBaselevelHandler": SingleNodeBaselevelHandler,
"GenericFuncBaselevelHandler": GenericFuncBaselevelHandler,
}
_DEFAULT_OUTPUT_DIR = os.path.join(os.curdir, "output")
def _verify_boundary_handler(handler):
bad_name = False
bad_instance = False
if isinstance(handler, str):
if handler not in _SUPPORTED_BOUNDARY_HANDLERS:
bad_name = True
else: # if a dictionary {name, handler}
for key in handler:
name = handler[key].__class__.__name__
if name not in _SUPPORTED_BOUNDARY_HANDLERS:
bad_instance = True
if bad_name:
raise ValueError(
(
"Only supported boundary condition handlers are "
"permitted. These include: {valid}".format(
valid="\n".join(_SUPPORTED_BOUNDARY_HANDLERS)
)
)
)
if bad_instance:
raise ValueError(
(
"An invalid instance of "
+ name
+ " was passed as a boundary handler."
+ str(handler)
)
)
def _setup_precipitator_or_runoff(grid, params, supported):
""""""
if len(params) != 1:
raise ValueError(
"Too many values provided to set up precipitator or runoff_generator"
)
for name in params:
constructor = supported[name]
instance = constructor(grid, **params[name])
return instance
def _setup_boundary_handlers(grid, name, params):
"""Setup BoundaryHandlers for use by a terrainbento model.
A boundary condition handler is a class with a **run_one_step** method
that takes the parameter ``step``. Permitted boundary condition handlers
include the Landlab Component **NormalFault** as well as the following
options from terrainbento: **PrecipChanger**,
**CaptureNodeBaselevelHandler**, **NotCoreNodeBaselevelHandler**,
**SingleNodeBaselevelHandler**.
Parameters
----------
handler : str
Name of a supported boundary condition handler.
"""
_verify_boundary_handler(name)
# Instantiate handler
handler_func = _HANDLER_METHODS[name]
boundary_handler = handler_func(grid, **params)
return boundary_handler
[docs]class ErosionModel(object):
"""Base class providing common functionality for terrainbento models.
An **ErosionModel** is the skeleton for the models of terrain evolution in
terrainbento.
This is a base class that does not implement any processes, but rather
simply handles I/O and setup. Derived classes are meant to include
Landlab components to model actual erosion processes.
It is expected that a derived model will define an **__init__** and a
**run_one_step** method. If desired, the derived model can overwrite the
existing **run_for**, **run**, and **finalize** methods.
The following at-node fields must be specified in the grid:
- ``topographic__elevation``
"""
_required_fields = ["topographic__elevation"]
# Setup
[docs] @classmethod
def from_file(cls, file_like):
"""Construct a terrainbento model from a file.
Parameters
----------
file_like : file_like or str
Contents of a parameter file, a file-like object, or the path to
a parameter file.
Examples
--------
>>> from io import StringIO
>>> filelike = StringIO('''
... grid:
... RasterModelGrid:
... - [4, 5]
... - fields:
... node:
... topographic__elevation:
... constant:
... - value: 0.0
... clock:
... step: 1
... stop: 200
... ''')
>>> model = ErosionModel.from_file(filelike)
>>> model.clock.step
1.0
>>> model.clock.stop
200.0
>>> model.grid.shape
(4, 5)
"""
# first get contents.
try:
contents = file_like.read()
except AttributeError: # was a str
if os.path.isfile(file_like):
with open(file_like, "r") as fp:
contents = fp.read()
else:
contents = file_like # not tested
# then parse contents.
params = yaml.safe_load(contents)
# construct instance
return cls.from_dict(params)
[docs] @classmethod
def from_dict(cls, params, output_writers=None):
"""Construct a terrainbento model from an input parameter dictionary.
The input parameter dictionary portion associated with the "grid"
keword will be passed directly to the Landlab
`create_grid <https://landlab.readthedocs.io/en/master/reference/grid/create.html#landlab.grid.create.create_grid>`_.
function.
Parameters
----------
params : dict
Dictionary of input parameters.
output_writers : dictionary of output writers.
Classes or functions used to write incremental output (e.g. make a
diagnostic plot). There are two formats for the dictionary entries:
1) Items can have a key of "class" or "function" and a value of
a list of simple output classes (uninstantiated) or
functions, respectively. All output writers defined this way
will use the `output_interval` provided to the ErosionModel
constructor.
2) Items can have a key with any unique string representing the
output writer's name and a value containing a dict with the
uninstantiated class and arguments. The value follows the
format:
.. code-block:: python
{
'class' : MyWriter,
'args' : [], # optional
'kwargs' : {}, # optional
}
where `args` and `kwargs` are passed to the constructor for
`MyWriter`. `MyWriter` must be a child class of
GenericOutputWriter.
The two formats can be present simultaneously. See the Jupyter
notebook examples for more details.
Examples
--------
>>> params = {
... "grid": {
... "RasterModelGrid": [
... (4, 5),
... {
... "fields": {
... "node": {
... "topographic__elevation": {
... "constant": [{"value": 0.0}]
... }
... }
... }
... },
... ]
... },
... "clock": {"step": 1, "stop": 200},
... }
>>> model = ErosionModel.from_dict(params)
>>> model.clock.step
1.0
>>> model.clock.stop
200.0
>>> model.grid.shape
(4, 5)
"""
cls._validate(params)
# grid, clock
grid = create_grid(params.pop("grid"))
clock = Clock.from_dict(params.pop("clock"))
# precipitator
precip_params = params.pop("precipitator", _DEFAULT_PRECIPITATOR)
precipitator = _setup_precipitator_or_runoff(
grid, precip_params, _SUPPORTED_PRECIPITATORS
)
# runoff_generator
runoff_params = params.pop(
"runoff_generator", _DEFAULT_RUNOFF_GENERATOR
)
runoff_generator = _setup_precipitator_or_runoff(
grid, runoff_params, _SUPPORTED_RUNOFF_GENERATORS
)
# boundary_handlers
boundary_handlers = params.pop("boundary_handlers", {})
bh_dict = {}
for name in boundary_handlers:
bh_params = boundary_handlers[name]
bh_dict[name] = _setup_boundary_handlers(grid, name, bh_params)
# create instance
return cls(
clock,
grid,
precipitator=precipitator,
runoff_generator=runoff_generator,
boundary_handlers=bh_dict,
output_writers=output_writers,
**params,
)
@classmethod
def _validate(cls, params):
"""Make sure necessary things for a model grid and a clock are here."""
if "grid" not in params:
raise ValueError("No grid provided as part of input parameters")
if "clock" not in params:
raise ValueError("No clock provided as part of input parameters")
[docs] def __init__(
self,
clock,
grid,
precipitator=None,
runoff_generator=None,
flow_director="FlowDirectorSteepest",
depression_finder=None,
flow_accumulator_kwargs=None,
boundary_handlers=None,
output_writers=None,
output_default_netcdf=True,
output_interval=None,
save_first_timestep=True,
save_last_timestep=True,
output_prefix="terrainbento-output",
output_dir=_DEFAULT_OUTPUT_DIR,
fields=None,
):
"""
Parameters
----------
clock : terrainbento Clock instance
grid : landlab model grid instance
The grid must have all required fields.
precipitator : terrainbento precipitator, optional
An instantiated version of a valid precipitator. See the
:py:mod:`precipitator <terrainbento.precipitator>` module for
valid options. The precipitator creates rain. Default value is the
:py:class:`UniformPrecipitator` with a rainfall flux of 1.0.
runoff_generator : terrainbento runoff_generator, optional
An instantiated version of a valid runoff generator. See the
:py:mod:`runoff generator <terrainbento.runoff_generator>` module
for valid options. The runoff generator converts rain into runoff.
This runoff is then accumulated into surface water discharge
(:math:`Q`) and used by channel erosion components. Default value
is :py:class:`SimpleRunoff` in which all rainfall turns into
runoff. For the drainage area version of the stream power law use
the default precipitator and runoff_generator.
If the default values of both the precipitator and
runoff_generator are used, then :math:`Q` will be equal to drainage
area.
flow_director : str, optional
String name of a
`Landlab FlowDirector <https://landlab.readthedocs.io/en/master/reference/components/flow_director.html>`_.
Default is "FlowDirectorSteepest".
depression_finder : str, optional
String name of a Landlab depression finder. Default is None.
flow_accumulator_kwargs : dictionary, optional
Dictionary of any additional keyword arguments to pass to the
`Landlab FlowAccumulator <https://landlab.readthedocs.io/en/master/reference/components/flow_accum.html>`_.
Default is an empty dictionary.
boundary_handlers : dictionary, optional
Dictionary with ``name: instance`` key-value pairs. Each entry
must be a valid instance of a terrainbento boundary handler. See
the :py:mod:`boundary handlers <terrainbento.boundary_handlers>`
module for valid options.
output_writers : dictionary of output writers.
Classes or functions used to write incremental output (e.g. make a
diagnostic plot). There are two formats for the dictionary entries:
1. ("Old style") Items can have a key of "class" or "function"
and a value that is a list of uninstantiated output classes
or a list of output functions, respectively. All output
writers defined this way will use the **output_interval**
argument provided to the ErosionModel constructor.
2. ("New style") Items can have a key of any unique string
representing the output writer's name and a value that is a
dictionary containing the uninstantiated class and any
arguments. The dictionary follows the format:
.. code-block:: python
{
'class' : MyWriter,
'args' : [], # optional
'kwargs' : {}, # optional
}
where `args` and `kwargs` are passed to the constructor for
`MyWriter`. All new style output writers must be a child
class of GenericOutputWriter. The ErosionModel reference is
automatically prepended to args.
The two formats can be present simultaneously. See the Jupyter
notebook examples for more details.
output_default_netcdf : bool, optional
Indicates whether the erosion model should automatically create a
simple netcdf output writer which behaves identical to the built-in
netcdf writer from older terrainbento versions. Uses the
'output_interval' argument as the output interval. Defaults to True.
output_interval : float, optional
The time between output calls for old-style output writers and the
default netcdf writer. Default is the Clock's stop time.
save_first_timestep : bool, optional
Indicates whether model output should be saved at time zero (the
initial conditions). This affects old and new style output writers.
Default is True.
save_last_timestep : bool, optional
Indicates that the last output time must be at the clock stop time.
This affects old and new style output writers. Defaults to True.
output_prefix : str, optional
String prefix for names of all output files. Default is
``"terrainbento-output"``.
output_dir : string, optional
Directory that output should be saved to. Defaults to an "output"
directory in the current directory.
fields : list, optional
List of field names to write as netCDF output. Default is to only
write out "topographic__elevation".
Returns
-------
ErosionModel: object
Examples
--------
This model is a base class and is not designed to be run on its own. We
recommend that you look at the terrainbento tutorials for examples of
usage.
"""
flow_accumulator_kwargs = flow_accumulator_kwargs or {}
boundary_handlers = boundary_handlers or {}
output_writers = output_writers or {}
fields = fields or ["topographic__elevation"]
# type checking
if isinstance(clock, Clock) is False:
raise ValueError("Provided Clock is not valid.")
if isinstance(grid, ModelGrid) is False:
raise ValueError("Provided Grid is not valid.")
# save the grid, clock, and parameters.
self.grid = grid
self.clock = clock
# first pass of verifying fields
self._verify_fields(self._required_fields)
# save reference to elevation
self.z = grid.at_node["topographic__elevation"]
self.grid.add_zeros("node", "cumulative_elevation_change")
self.grid.add_field(
"node", "initial_topographic__elevation", self.z.copy()
)
# save output_information
self.save_first_timestep = save_first_timestep
self.save_last_timestep = save_last_timestep
self._output_prefix = output_prefix
self.output_dir = output_dir
self.output_fields = fields
self._output_files = []
if output_interval is None:
output_interval = clock.stop
self.output_interval = output_interval
# instantiate model time.
self._model_time = 0.0
# instantiate container for computational timestep:
self._compute_time = [tm.time()]
###################################################################
# address Precipitator and RUNOFF_GENERATOR
###################################################################
# verify that precipitator is valid
if precipitator is None:
precipitator = UniformPrecipitator(self.grid)
else:
if isinstance(precipitator, _VALID_PRECIPITATORS) is False:
raise ValueError("Provided value for precipitator not valid.")
self.precipitator = precipitator
# verify that runoff_generator is valid
if runoff_generator is None:
runoff_generator = SimpleRunoff(self.grid)
else:
if isinstance(runoff_generator, _VALID_RUNOFF_GENERATORS) is False:
raise ValueError(
"Provide value for runoff_generator not valid."
)
self.runoff_generator = runoff_generator
###################################################################
# instantiate flow direction and accumulation
###################################################################
# Instantiate a FlowAccumulator, if DepressionFinder is provided
# AND director = Steepest, then we need routing to be D4,
# otherwise, just passing params should be sufficient.
if (depression_finder is not None) and (
flow_director == "FlowDirectorSteepest"
):
self.flow_accumulator = FlowAccumulator(
self.grid,
routing="D4",
depression_finder=depression_finder,
**flow_accumulator_kwargs,
)
else:
self.flow_accumulator = FlowAccumulator(
self.grid,
flow_director=flow_director,
depression_finder=depression_finder,
**flow_accumulator_kwargs,
)
if self.flow_accumulator.depression_finder is None:
self._erode_flooded_nodes = True
else:
self._erode_flooded_nodes = False
###################################################################
# Boundary Conditions and Output Writers
###################################################################
_verify_boundary_handler(boundary_handlers)
self.boundary_handlers = boundary_handlers
# Instantiate all the output writers and store in a list
self.all_output_writers = self._setup_output_writers(
output_writers,
output_default_netcdf,
)
# Keep track of when each writer needs to write next
self.active_output_times = {} # {next time : [writers]}
self.sorted_output_times = [] # sorted list of the next output times
for ow_writer in self.all_output_writers:
first_time = ow_writer.advance_iter()
self._update_output_times(ow_writer, first_time, None)
def _verify_fields(self, required_fields):
"""Verify all required fields are present."""
for field in required_fields:
if field not in self.grid.at_node:
raise ValueError(
"Required field {field} not present.".format(field=field)
)
def _setup_output_writers(self, output_writers, output_default_netcdf):
"""Convert all output writers to the new style and instantiate output
writer classes.
Parameters
----------
output_writers : dictionary of output writers.
Classes or functions used to write incremental output (e.g. make a
diagnostic plot). There are two formats for the dictionary entries:
1) ("Old style") Items can have a key of "class" or "function"
and a value that is a list of uninstantiated output classes
or a list of output functions, respectively. All output
writers defined this way will use the **output_interval**
argument provided to the ErosionModel constructor.
2) ("New style") Items can have a key of any unique string
representing the output writer's name and a value that is a
dictionary containing the uninstantiated class and any
arguments. The dictionary follows the format: {
'class' : MyWriter,
'args' : [], # optional
'kwargs' : {}, # optional
}
where `args` and `kwargs` are passed to the constructor for
`MyWriter`. All new style output writers must be a child
class of GenericOutputWriter. The ErosionModel reference is
automatically prepended to args.
The two formats can be present simultaneously. See the Jupyter
notebook examples for more details.
output_default_netcdf : bool, optional
Indicates whether the erosion model should automatically create a
simple netcdf output writer which behaves identical to the built-in
netcdf writer from older terrainbento versions. Uses the
'output_interval' argument as the output interval. Defaults to True.
Returns
-------
instantiated_output_writers : list of GenericOutputWriter objects
A list of instantiated output writers all based on the
GenericOutputWriter.
Notes
-----
All classes and functions provided in the 'class' and 'function'
entries in the output_writer dictionary will be given to an adapter
class for StaticIntervalOutputWriter so that they can be used with the
new framework. All of theses writers will use `output_interval`.
"""
# Note: I can't guarantee that the names will stay unique. I need to
# convert the 'class' and 'function' writers to the new style and there
# is a non-zero chance the user happens to use a name for the new style
# that has the same name that I give the converted writers. These names
# are used for output filenames, so I don't want to use anything ugly.
# Hence why I return a list instead of another dictionary.
# Add a default netcdf writer if desired.
assert isinstance(output_default_netcdf, bool)
if output_default_netcdf:
output_writers["simple-netcdf"] = {
"class": OWSimpleNetCDF,
"args": [self.output_fields],
"kwargs": {
"intervals": self.output_interval,
"add_id": True,
"output_dir": self.output_dir,
},
}
instantiated_writers = []
for name in output_writers:
if name == "class":
# Old style class output writers. Give information to an
# adapter for instantiating as a static interval writer.
for ow_class in output_writers["class"]:
new_writer = StaticIntervalOutputClassAdapter(
model=self,
output_interval=self.output_interval,
ow_class=ow_class,
save_first_timestep=self.save_first_timestep,
save_last_timestep=self.save_last_timestep,
output_dir=self.output_dir,
)
# new_name = new_writer.name
# assert new_name not in instantiated_writers, \
# f"Output writer '{name}' already exists"
instantiated_writers.append(new_writer)
elif name == "function":
# Old style function output writers. Give information to an
# adapter for instantiating as a static interval writer.
for ow_function in output_writers["function"]:
new_writer = StaticIntervalOutputFunctionAdapter(
model=self,
output_interval=self.output_interval,
ow_function=ow_function,
save_first_timestep=self.save_first_timestep,
save_last_timestep=self.save_last_timestep,
output_dir=self.output_dir,
)
instantiated_writers.append(new_writer)
else:
# New style output writer class
writer_dict = output_writers[name]
assert isinstance(
writer_dict, dict
), "The new style output writer entry must be a dictionary"
assert "class" in writer_dict, "".join(
[
f"New style output writer {name} must have a 'class'",
"entry",
]
)
ow_class = writer_dict["class"]
ow_args = writer_dict.get("args", [self])
ow_kwargs = writer_dict.get("kwargs", {})
# Prepend a reference to the model to the args (if not there)
if ow_args and ow_args[0] is not self:
ow_args = [self] + ow_args
elif ow_args is None: # pragma: no cover
ow_args = [self]
# Add some kwargs if they were not already provided
defaults = {
"name": name,
"save_first_timestep": self.save_first_timestep,
"save_last_timestep": self.save_last_timestep,
"output_dir": self.output_dir,
}
if issubclass(ow_class, StaticIntervalOutputWriter):
if "times" not in ow_kwargs:
# Using a static interval writer and no times provided,
# use the output_interval as a default interval.
defaults["intervals"] = self.output_interval
defaults.update(ow_kwargs)
ow_kwargs = defaults
new_writer = ow_class(*ow_args, **ow_kwargs)
instantiated_writers.append(new_writer)
return instantiated_writers
# Attributes
@property
def model_time(self):
"""Return current time of model integration in model time units."""
return self._model_time
@property
def next_output_time(self):
"""Return the next output time in model time units. If there are no
more active output writers, return np.inf instead."""
if self.sorted_output_times:
return self.sorted_output_times[0]
else:
return np.inf
@property
def output_prefix(self):
""" Model prefix for output filenames. """
return self._output_prefix
@property
def _out_file_name(self):
"""(Deprecated) Get the filename model prefix. Used to get the netcdf
filename base."""
warnings.warn(
" ".join(
[
"ErosionModel's _out_file_name is no longer available.",
"Getting _output_prefix instead, but may not behave as expected.",
"Please use the 'output_prefix' argument in the constructor.",
]
),
DeprecationWarning,
)
return self._output_prefix
@_out_file_name.setter
def _out_file_name(self, prefix):
"""(Deprecated) Set the filename model prefix. Used to set the netcdf
filename base."""
warnings.warn(
" ".join(
[
"ErosionModel's _out_file_name is no longer available.",
"Setting _output_prefix instead, but may not behave as expected.",
"Please use the 'output_prefix' argument in the constructor.",
]
),
DeprecationWarning,
)
self._output_prefix = prefix
# Model run methods
[docs] def calculate_cumulative_change(self):
"""Calculate cumulative node-by-node changes in elevation."""
self.grid.at_node["cumulative_elevation_change"][:] = (
self.grid.at_node["topographic__elevation"]
- self.grid.at_node["initial_topographic__elevation"]
)
[docs] def create_and_move_water(self, step):
"""Create and move water.
Run the precipitator, the runoff generator, and the flow
accumulator, in that order.
"""
self.precipitator.run_one_step(step)
self.runoff_generator.run_one_step(step)
self.flow_accumulator.run_one_step()
[docs] def finalize__run_one_step(self, step):
"""Finalize run_one_step method.
This base-class method increments model time and updates
boundary conditions.
"""
# calculate model time
self._model_time += step
# Update boundary conditions
self.update_boundary_conditions(step)
[docs] def finalize(self):
"""Finalize model.
This base-class method does nothing. Derived classes can
override it to run any required finalization steps.
"""
pass
[docs] def run_for(self, step, runtime):
"""Run model without interruption for a specified time period.
``run_for`` runs the model for the duration ``runtime`` with model time
steps of ``step``.
Parameters
----------
step : float
Model run timestep.
runtime : float
Total duration for which to run model.
"""
elapsed_time = 0.0
keep_running = True
while keep_running:
if elapsed_time + step >= runtime:
step = runtime - elapsed_time
keep_running = False
self.run_one_step(step)
elapsed_time += step
[docs] def run(self):
"""Run the model until complete.
The model will run for the duration indicated by the input file
or dictionary parameter ``"stop"``, at a time step specified by
the parameter ``"step"``, and create ouput at intervals specified by
the individual output writers.
"""
self._itters = []
if self.save_first_timestep:
self.iteration = 0
self._itters.append(0)
self.calculate_cumulative_change()
self.write_output()
self.iteration = 1
time_now = self._model_time
while time_now < self.clock.stop:
next_run_pause = min(
# time_now + self.output_interval, self.clock.stop,
self.next_output_time,
self.clock.stop,
)
assert next_run_pause > time_now
self.run_for(self.clock.step, next_run_pause - time_now)
time_now = self._model_time
self._itters.append(self.iteration)
self.calculate_cumulative_change()
self.write_output()
self.iteration += 1
# now that the model is finished running, execute finalize.
self.finalize()
def _ensure_precip_runoff_are_vanilla(self, vsa_precip=False):
"""Ensure only default versions of precipitator/runoff are used.
Some models only work when the precipitator and runoff generator
are the default versions.
"""
if isinstance(self.precipitator, UniformPrecipitator) is False:
raise ValueError(
"This model must be run with a UniformPrecipitator."
)
if vsa_precip is False:
if self.precipitator._rainfall_flux != 1:
raise ValueError(
"This model must use a rainfall__flux value of 1.0."
)
# if isinstance(self.runoff_generator, SimpleRunoff) is False:
# raise ValueError("This model must be run with SimpleRunoff.")
if self.runoff_generator.runoff_proportion != 1.0:
raise ValueError("The model must use a runoff_proportion of 1.0.")
[docs] def update_boundary_conditions(self, step):
"""Run all boundary handlers forward by step.
Parameters
----------
step : float
Timestep in unit of model time.
"""
# Run each of the baselevel handlers.
for name in self.boundary_handlers:
self.boundary_handlers[name].run_one_step(step)
# Output methods
[docs] def write_output(self):
"""Run output writers if it is the correct model time. """
# assert that the model has not passed the next output time.
assert self._model_time <= self.next_output_time, "".join(
[
f"Model time (t={self._model_time}) has passed the next ",
f"output time (t={self.next_output_time})",
]
)
if self._model_time == self.next_output_time:
# The current model time matches the next output time
current_time = self.sorted_output_times.pop(0)
current_writers = self.active_output_times.pop(current_time)
for ow_writer in current_writers:
# Run all the output writers associated with this time.
ow_writer.run_one_step()
next_time = ow_writer.advance_iter()
self._update_output_times(ow_writer, next_time, current_time)
def _update_output_times(self, ow_writer, new_time, current_time):
"""Private method to update the dictionary of active output writers
and the sorted list of next output times.
Parameters
----------
ow_writer : GenericOutputWriter object
The output writer that has just finished writing output and advanced
it's time iterator.
new_time : float
The next time that the output writer will need to write output.
current_time : float
The current model time.
Notes
-----
This function enforces that output times align with model steps. If an
output writer returns a next_time that is in between model steps, then
the output time is delayed to the following step and a warning is
generated. This function may generate skip warnings if the subsequent
next times are less than the delayed step time.
"""
if new_time is None:
# The output writer has exhausted all of it's output times.
# Do not add it back to the active dict/list
return
model_step = self.clock.step
if current_time is not None:
try:
assert new_time > current_time
except AssertionError:
warnings.warn(
"".join(
[
f"The output writer {ow_writer.name} is providing a ",
"next time that is less than or equal to the current ",
"time. Possibly because the previous time was in ",
"between steps, delaying the output until now. ",
"Skipping ahead.",
]
)
)
for n_skips in range(10):
# Allow 10 attempts to skip
new_time = ow_writer.advance_iter()
if new_time is None:
# iterator exhausted. No more processing needed
return
elif new_time > current_time:
break
else:
# Could not find a suitable next_time
raise AssertionError(
"".join(
[
"Output writer failed to return a next time greater ",
"than the current time after several attempts.",
]
)
)
# See if the new output time aligns with the model step.
if (new_time % model_step) != 0.0:
warnings.warn(
"".join(
[
f"Output writer {ow_writer.name} is requesting a ",
"time that is not divisible by the model step. ",
"Delaying output to the following step.\n",
f"Output time = {new_time}\n",
f"Model step = {model_step}\n",
f"Remainder = {new_time % model_step}\n\n",
]
)
)
new_time = np.ceil(new_time / model_step) * model_step
# Add the writer to the active_output_times dict
if new_time in self.active_output_times:
# New time is already in the active_output_times dictionary
self.active_output_times[new_time].append(ow_writer)
else:
# New time is not in the active_output_times dictionary
# Add it to the dict and resort the output times list
self.active_output_times[new_time] = [ow_writer]
self.sorted_output_times = sorted(self.active_output_times)
[docs] def to_xarray_dataset(
self,
time_unit="time units",
reference_time="model start",
space_unit="space units",
):
"""Convert model output to an xarray dataset.
If you would like to have CF compliant NetCDF make sure that your time
and space units and reference times will work with standard decoding.
The default time unit and reference time will give the time dimention a
value of "time units since model start". The default space unit will
give a value of "space unit".
Parameters
----------
time_unit: str, optional
Name of time unit. Default is "time units".
reference time: str, optional
Reference tim. Default is "model start".
space_unit: str, optional
Name of space unit. Default is "space unit".
"""
# open all files as a xarray dataset
ds = xr.open_mfdataset(
self.get_output(extension="nc"),
concat_dim="nt",
engine="netcdf4",
combine="nested",
data_vars=self.output_fields,
)
# add a time dimension
time_array = np.asarray(self._itters) * self.output_interval
time = xr.DataArray(
time_array,
dims=("nt"),
attrs={
"units": time_unit + " since " + reference_time,
"standard_name": "time",
},
)
ds["time"] = time
# set x and y to coordinates
ds = ds.set_coords(["x", "y", "time"])
# rename dimensions
ds = ds.rename(name_dict={"ni": "x", "nj": "y", "nt": "time"})
# set x and y units
ds["x"] = xr.DataArray(ds.x, dims=("x"), attrs={"units": space_unit})
ds["y"] = xr.DataArray(ds.y, dims=("y"), attrs={"units": space_unit})
return ds
[docs] def save_to_xarray_dataset(
self,
filename="terrainbento.nc",
time_unit="time units",
reference_time="model start",
space_unit="space units",
):
"""Save model output to xarray dataset.
If you would like to have CF compliant NetCDF make sure that your time
and space units and reference times will work with standard decoding.
The default time unit and reference time will give the time dimention a
value of "time units since model start". The default space unit will
give a value of "space unit".
Parameters
----------
filename: str, optional
The file path where the file should be saved. The default value is
"terrainbento.nc".
time_unit: str, optional
Name of time unit. Default is "time units".
reference time: str, optional
Reference tim. Default is "model start".
space_unit: str, optional
Name of space unit. Default is "space unit".
"""
ds = self.to_xarray_dataset(
time_unit=time_unit,
space_unit=space_unit,
reference_time=reference_time,
)
ds.to_netcdf(filename, engine="netcdf4", format="NETCDF4")
ds.close()
def _format_extension_and_writer_args(self, extension, writer):
"""Private method to parse the extension and writer arguments for the
**remove_output** and **get_output** functions.
Parameters
----------
extension : string or list of strings or None
Specify the type(s) of files to look for.
writer : GenericOutputWriter instance or list of instances or string or list of strings or None
Specify which output writers to look at either by the writer's
handle or by the writer's name.
Returns
-------
extension_list : list of strings
List of strings representing the types of files to look for. Can
return [None] indicating all files.
writer_list : list of GenericOutputWriters
List of GenericOutputWriter instances to look at. Can
return [None] indicating all output writers should be looked at.
"""
extension_list = None
writer_list = None
if isinstance(writer, GenericOutputWriter):
# Writer argument is an object, convert to a list
writer_list = [writer]
elif isinstance(writer, str):
# Writer argument is the name of the writer, get object
writer_list = self.get_output_writer(writer)
elif isinstance(writer, list):
# Writer argument is a list
writer_list = []
# Check what is in the list
for i, w in enumerate(writer):
if isinstance(w, GenericOutputWriter):
writer_list.append(w)
elif isinstance(w, str):
# Item is a name, replace with the object
found_writers = self.get_output_writer(w)
writer_list += found_writers
else: # pragma: no cover
raise TypeError(f"Unrecognized writer argument. {w}")
elif writer is None:
# Default to all writers
writer_list = self.all_output_writers
else: # pragma: no cover
raise TypeError(f"Unrecognized writer argument. {writer}")
if isinstance(extension, str):
# Extension argument is a string
extension_list = [extension]
elif isinstance(extension, list):
# Extension argument is a list of strings
extension_list = extension
assert all([isinstance(e, str) for e in extension_list])
elif extension is None:
# Default to all extensions
extension_list = [None]
else: # pragma: no cover
raise TypeError(f"Unrecognized extension argument. {extension}")
return extension_list, writer_list
[docs] def remove_output_netcdfs(self):
"""Remove netcdf output files written during a model run. Only works
for new style writers including the default netcdf writer."""
self.remove_output(extension="nc")
[docs] def remove_output(self, extension=None, writer=None):
"""Remove files written by new style writers during a model
run. Does not work for old style writers which have no way to report
what they have written. Can specify types of files and/or writers.
To do: allow 'writer' to be a string for the name of the writer?
Parameters
----------
extension : string or list of strings, optional
Specify what type(s) of files should be deleted. Defaults to None
which deletes all file types. Don't include a leading period.
writer : GenericOutputWriter instance or list of instances or string or list of strings or None
Specify if the files should come from certain output writers either
by the writer's handle or by the writer's name. Defaults to
deleting files from all writers.
"""
lists = self._format_extension_and_writer_args(extension, writer)
extension_list, writer_list = lists
for ow in writer_list:
assert ow is not None
for ext in extension_list:
assert ext is None or isinstance(ext, str)
if ext and ext[0] == ".":
ext = ext[1:] # ignore leading period if present
ow.delete_output_files(ext)
[docs] def get_output(self, extension=None, writer=None):
"""Get a list of filepaths for files written by new style writers
during a model run. Does not work for old style writers which have no
way to report what they have written. Can specify types of files
and/or writers.
Parameters
----------
extension : string or list of strings, optional
Specify what type(s) of files should be returned. Defaults to None
which returns all file types. Don't include a leading period.
writer : GenericOutputWriter instance or list of instances or string or list of strings or None
Specify if the files should come from certain output writers either
by the writer's handle or by the writer's name. Defaults to
returning files from all writers.
Returns
-------
filepaths : list of strings
A list of filepath strings that match the desired extensions and
writers.
"""
lists = self._format_extension_and_writer_args(extension, writer)
extension_list, writer_list = lists
output_list = []
for ow in writer_list:
assert ow is not None
for ext in extension_list:
assert ext is None or isinstance(ext, str)
output_list += ow.get_output_filepaths(ext)
return output_list
[docs] def get_output_writer(self, name):
"""Get the references for object writer(s) from the writer's name.
Parameters
----------
name : string
The name of the output writer to look for. Can match multiple
writers.
Returns
-------
matches : list of GenericOutputWriter objects
The list of any GenericOutputWriter whose name contains the
argument name string. Will return an empty list if there are no
matches.
"""
matches = []
for ow in self.all_output_writers:
if name in ow.name:
matches.append(ow)
return matches
[docs]def main(): # pragma: no cover
"""Executes model."""
try:
infile = sys.argv[1]
except IndexError:
print("Must include input file name on command line")
sys.exit(1)
erosion_model = ErosionModel.from_file(infile)
erosion_model.run()
if __name__ == "__main__": # pragma: no cover
main()