Source code for terrainbento.base_class.erosion_model

# coding: utf8
# !/usr/env/python
"""Base class for common functions of all terrainbento erosion models."""

import os
import sys
import time as tm

import numpy as np
import xarray as xr
import yaml

from landlab import ModelGrid, RasterModelGrid, create_grid
from landlab.components import FlowAccumulator, NormalFault
from landlab.io.netcdf import to_netcdf, write_raster_netcdf
from terrainbento.boundary_handlers import (
    CaptureNodeBaselevelHandler,
    GenericFuncBaselevelHandler,
    NotCoreNodeBaselevelHandler,
    PrecipChanger,
    SingleNodeBaselevelHandler,
)
from terrainbento.clock import Clock
from terrainbento.precipitators import RandomPrecipitator, UniformPrecipitator
from terrainbento.runoff_generators import SimpleRunoff

_SUPPORTED_PRECIPITATORS = {
    "UniformPrecipitator": UniformPrecipitator,
    "RandomPrecipitator": RandomPrecipitator,
}
_SUPPORTED_RUNOFF_GENERATORS = {"SimpleRunoff": SimpleRunoff}

_VALID_PRECIPITATORS = (UniformPrecipitator, RandomPrecipitator)
_VALID_RUNOFF_GENERATORS = SimpleRunoff

_DEFAULT_PRECIPITATOR = {"UniformPrecipitator": {}}
_DEFAULT_RUNOFF_GENERATOR = {"SimpleRunoff": {}}


_SUPPORTED_BOUNDARY_HANDLERS = [
    "NormalFault",
    "PrecipChanger",
    "CaptureNodeBaselevelHandler",
    "NotCoreNodeBaselevelHandler",
    "SingleNodeBaselevelHandler",
    "GenericFuncBaselevelHandler",
]

_HANDLER_METHODS = {
    "NormalFault": NormalFault,
    "PrecipChanger": PrecipChanger,
    "CaptureNodeBaselevelHandler": CaptureNodeBaselevelHandler,
    "NotCoreNodeBaselevelHandler": NotCoreNodeBaselevelHandler,
    "SingleNodeBaselevelHandler": SingleNodeBaselevelHandler,
    "GenericFuncBaselevelHandler": GenericFuncBaselevelHandler,
}


def _verify_boundary_handler(handler):
    bad_name = False
    bad_instance = False
    if isinstance(handler, str):
        if handler not in _SUPPORTED_BOUNDARY_HANDLERS:
            bad_name = True
    else:  # if a dictionary {name, handler}
        for key in handler:
            name = handler[key].__class__.__name__
            if name not in _SUPPORTED_BOUNDARY_HANDLERS:
                bad_instance = True

    if bad_name:
        raise ValueError(
            (
                "Only supported boundary condition handlers are "
                "permitted. These include: {valid}".format(
                    valid="\n".join(_SUPPORTED_BOUNDARY_HANDLERS)
                )
            )
        )

    if bad_instance:
        raise ValueError(
            (
                "An invalid instance of "
                + name
                + " was passed as a boundary handler."
                + str(handler)
            )
        )


def _setup_precipitator_or_runoff(grid, params, supported):
    """"""
    if len(params) != 1:
        raise ValueError(
            "Too many values provided to set up precipitator or runoff_generator"
        )
    for name in params:
        constructor = supported[name]
        instance = constructor(grid, **params[name])
    return instance


def _setup_boundary_handlers(grid, name, params):
    """Setup BoundaryHandlers for use by a terrainbento model.

    A boundary condition handler is a class with a **run_one_step** method
    that takes the parameter ``step``. Permitted boundary condition handlers
    include the Landlab Component **NormalFault** as well as the following
    options from terrainbento: **PrecipChanger**,
    **CaptureNodeBaselevelHandler**, **NotCoreNodeBaselevelHandler**,
    **SingleNodeBaselevelHandler**.

    Parameters
    ----------
    handler : str
        Name of a supported boundary condition handler.
    """
    _verify_boundary_handler(name)
    # Instantiate handler
    handler_func = _HANDLER_METHODS[name]
    boundary_handler = handler_func(grid, **params)

    return boundary_handler


[docs]class ErosionModel(object): """Base class providing common functionality for terrainbento models. An **ErosionModel** is the skeleton for the models of terrain evolution in terrainbento. This is a base class that does not implement any processes, but rather simply handles I/O and setup. Derived classes are meant to include Landlab components to model actual erosion processes. It is expected that a derived model will define an **__init__** and a **run_one_step** method. If desired, the derived model can overwrite the existing **run_for**, **run**, and **finalize** methods. The following at-node fields must be specified in the grid: - ``topographic__elevation`` """ _required_fields = ["topographic__elevation"]
[docs] @classmethod def from_file(cls, file_like): """Construct a terrainbento model from a file. Parameters ---------- file_like : file_like or str Contents of a parameter file, a file-like object, or the path to a parameter file. Examples -------- >>> from io import StringIO >>> filelike = StringIO(''' ... grid: ... RasterModelGrid: ... - [4, 5] ... - fields: ... node: ... topographic__elevation: ... constant: ... - value: 0.0 ... clock: ... step: 1 ... stop: 200 ... ''') >>> model = ErosionModel.from_file(filelike) >>> model.clock.step 1.0 >>> model.clock.stop 200.0 >>> model.grid.shape (4, 5) """ # first get contents. try: contents = file_like.read() except AttributeError: # was a str if os.path.isfile(file_like): with open(file_like, "r") as fp: contents = fp.read() else: contents = file_like # not tested # then parse contents. params = yaml.safe_load(contents) # construct instance return cls.from_dict(params)
[docs] @classmethod def from_dict(cls, params, output_writers=None): """Construct a terrainbento model from an input parameter dictionary. The input parameter dictionary portion associated with the "grid" keword will be passed directly to the Landlab `create_grid <https://landlab.readthedocs.io/en/master/reference/grid/create.html#landlab.grid.create.create_grid>`_. function. Parameters ---------- params : dict Dictionary of input parameters. output_writers : dictionary of output writers. Classes or functions used to write incremental output (e.g. make a diagnostic plot). These should be passed in a dictionary with two keys: "class" and "function". The value associated with each of these should be a list containing the uninstantiated output writers. See the Jupyter notebook examples for more details. Examples -------- >>> params = { ... "grid": { ... "RasterModelGrid": [ ... (4, 5), ... { ... "fields": { ... "node": { ... "topographic__elevation": { ... "constant": [{"value": 0.0}] ... } ... } ... } ... }, ... ] ... }, ... "clock": {"step": 1, "stop": 200}, ... } >>> model = ErosionModel.from_dict(params) >>> model.clock.step 1.0 >>> model.clock.stop 200.0 >>> model.grid.shape (4, 5) """ cls._validate(params) # grid, clock grid = create_grid(params.pop("grid")) clock = Clock.from_dict(params.pop("clock")) # precipitator precip_params = params.pop("precipitator", _DEFAULT_PRECIPITATOR) precipitator = _setup_precipitator_or_runoff( grid, precip_params, _SUPPORTED_PRECIPITATORS ) # runoff_generator runoff_params = params.pop( "runoff_generator", _DEFAULT_RUNOFF_GENERATOR ) runoff_generator = _setup_precipitator_or_runoff( grid, runoff_params, _SUPPORTED_RUNOFF_GENERATORS ) # boundary_handlers boundary_handlers = params.pop("boundary_handlers", {}) bh_dict = {} for name in boundary_handlers: bh_params = boundary_handlers[name] bh_dict[name] = _setup_boundary_handlers(grid, name, bh_params) # create instance return cls( clock, grid, precipitator=precipitator, runoff_generator=runoff_generator, boundary_handlers=bh_dict, output_writers=output_writers, **params )
@classmethod def _validate(cls, params): """Make sure necessary things for a model grid and a clock are here.""" if "grid" not in params: raise ValueError("No grid provided as part of input parameters") if "clock" not in params: raise ValueError("No clock provided as part of input parameters")
[docs] def __init__( self, clock, grid, precipitator=None, runoff_generator=None, flow_director="FlowDirectorSteepest", depression_finder=None, flow_accumulator_kwargs=None, boundary_handlers=None, output_writers=None, output_interval=None, save_first_timestep=True, output_prefix="terrainbento_output", fields=None, ): """ Parameters ---------- clock : terrainbento Clock instance grid : landlab model grid instance The grid must have all required fields. precipitator : terrainbento precipitator, optional An instantiated version of a valid precipitator. See the :py:mod:`precipitator <terrainbento.precipitator>` module for valid options. The precipitator creates rain. Default value is the :py:class:`UniformPrecipitator` with a rainfall flux of 1.0. runoff_generator : terrainbento runoff_generator, optional An instantiated version of a valid runoff generator. See the :py:mod:`runoff generator <terrainbento.runoff_generator>` module for valid options. The runoff generator converts rain into runoff. This runoff is then accumulated into surface water discharge (:math:`Q`) and used by channel erosion components. Default value is :py:class:`SimpleRunoff` in which all rainfall turns into runoff. For the drainage area version of the stream power law use the default precipitator and runoff_generator. If the default values of both the precipitator and runoff_generator are used, then :math:`Q` will be equal to drainage area. flow_director : str, optional String name of a `Landlab FlowDirector <https://landlab.readthedocs.io/en/master/reference/components/flow_director.html>`_. Default is "FlowDirectorSteepest". depression_finder : str, optional String name of a Landlab depression finder. Default is None. flow_accumulator_kwargs : dictionary, optional Dictionary of any additional keyword arguments to pass to the `Landlab FlowAccumulator <https://landlab.readthedocs.io/en/master/reference/components/flow_accum.html>`_. Default is an empty dictionary. boundary_handlers : dictionary, optional Dictionary with ``name: instance`` key-value pairs. Each entry must be a valid instance of a terrainbento boundary handler. See the :py:mod:`boundary handlers <terrainbento.boundary_handlers>` module for valid options. output_writers : dictionary of output writers. Classes or functions used to write incremental output (e.g. make a diagnostic plot). These should be passed in a dictionary with two keys: "class" and "function". The value associated with each of these should be a list containing the uninstantiated output writers. See the Jupyter notebook examples for more details. output_interval : float, optional Default is the Clock's stop time. save_first_timestep : bool, optional Indicates whether model output should be saved at time zero. Default is True. output_prefix : str, optional String prefix for names of output netCDF files. Default is ``"terrainbento_output"``. fields : list, optional List of field names to write as netCDF output. Default is to only write out "topographic__elevation". Returns ------- ErosionModel: object Examples -------- This model is a base class and is not designed to be run on its own. We recommend that you look at the terrainbento tutorials for examples of usage. """ flow_accumulator_kwargs = flow_accumulator_kwargs or {} boundary_handlers = boundary_handlers or {} output_writers = output_writers or {} fields = fields or ["topographic__elevation"] # type checking if isinstance(clock, Clock) is False: raise ValueError("Provided Clock is not valid.") if isinstance(grid, ModelGrid) is False: raise ValueError("Provided Grid is not valid.") # save the grid, clock, and parameters. self.grid = grid self.clock = clock # first pass of verifying fields self._verify_fields(self._required_fields) # save reference to elevation self.z = grid.at_node["topographic__elevation"] self.grid.add_zeros("node", "cumulative_elevation_change") self.grid.add_field( "node", "initial_topographic__elevation", self.z.copy() ) # save output_information self.save_first_timestep = save_first_timestep self._out_file_name = output_prefix self.output_fields = fields self._output_files = [] if output_interval is None: output_interval = clock.stop self.output_interval = output_interval # instantiate model time. self._model_time = 0.0 # instantiate container for computational timestep: self._compute_time = [tm.time()] ################################################################### # address Precipitator and RUNOFF_GENERATOR ################################################################### # verify that precipitator is valid if precipitator is None: precipitator = UniformPrecipitator(self.grid) else: if isinstance(precipitator, _VALID_PRECIPITATORS) is False: raise ValueError("Provided value for precipitator not valid.") self.precipitator = precipitator # verify that runoff_generator is valid if runoff_generator is None: runoff_generator = SimpleRunoff(self.grid) else: if isinstance(runoff_generator, _VALID_RUNOFF_GENERATORS) is False: raise ValueError( "Provide value for runoff_generator not valid." ) self.runoff_generator = runoff_generator ################################################################### # instantiate flow direction and accumulation ################################################################### # Instantiate a FlowAccumulator, if DepressionFinder is provided # AND director = Steepest, then we need routing to be D4, # otherwise, just passing params should be sufficient. if (depression_finder is not None) and ( flow_director == "FlowDirectorSteepest" ): self.flow_accumulator = FlowAccumulator( self.grid, routing="D4", depression_finder=depression_finder, **flow_accumulator_kwargs ) else: self.flow_accumulator = FlowAccumulator( self.grid, flow_director=flow_director, depression_finder=depression_finder, **flow_accumulator_kwargs ) if self.flow_accumulator.depression_finder is None: self._erode_flooded_nodes = True else: self._erode_flooded_nodes = False ################################################################### # Boundary Conditions and Output Writers ################################################################### _verify_boundary_handler(boundary_handlers) self.boundary_handlers = boundary_handlers if "class" in output_writers: instantiated_classes = [] for ow_class in output_writers["class"]: instantiated_classes.append(ow_class(self)) output_writers["class"] = instantiated_classes self.output_writers = output_writers
def _verify_fields(self, required_fields): """Verify all required fields are present.""" for field in required_fields: if field not in self.grid.at_node: raise ValueError( "Required field {field} not present.".format(field=field) ) @property def model_time(self): """Return current time of model integration in model time units.""" return self._model_time
[docs] def calculate_cumulative_change(self): """Calculate cumulative node-by-node changes in elevation.""" self.grid.at_node["cumulative_elevation_change"][:] = ( self.grid.at_node["topographic__elevation"] - self.grid.at_node["initial_topographic__elevation"] )
[docs] def create_and_move_water(self, step): """Create and move water. Run the precipitator, the runoff generator, and the flow accumulator, in that order. """ self.precipitator.run_one_step(step) self.runoff_generator.run_one_step(step) self.flow_accumulator.run_one_step()
[docs] def write_output(self): """Write output to file as a netCDF. Filenames will have the value of ``"output_filename"`` from the input file or parameter dictionary as the first part of the file name and the model run iteration as the second part of the filename. """ self.calculate_cumulative_change() filename = self._out_file_name + str(self.iteration).zfill(4) + ".nc" self._output_files.append(filename) if isinstance(self.grid, RasterModelGrid): write_raster_netcdf( filename, self.grid, names=self.output_fields, format="NETCDF4" ) else: to_netcdf(self.grid, filename, format="NETCDF4") self.run_output_writers()
[docs] def finalize__run_one_step(self, step): """Finalize run_one_step method. This base-class method increments model time and updates boundary conditions. """ # calculate model time self._model_time += step # Update boundary conditions self.update_boundary_conditions(step)
[docs] def finalize(self): """Finalize model. This base-class method does nothing. Derived classes can override it to run any required finalization steps. """ pass
[docs] def run_for(self, step, runtime): """Run model without interruption for a specified time period. ``run_for`` runs the model for the duration ``runtime`` with model time steps of ``step``. Parameters ---------- step : float Model run timestep. runtime : float Total duration for which to run model. """ elapsed_time = 0.0 keep_running = True while keep_running: if elapsed_time + step >= runtime: step = runtime - elapsed_time keep_running = False self.run_one_step(step) elapsed_time += step
[docs] def run(self): """Run the model until complete. The model will run for the duration indicated by the input file or dictionary parameter ``"stop"``, at a time step specified by the parameter ``"step"``, and create ouput at intervals of ``"output_duration"``. """ self._itters = [] if self.save_first_timestep: self.iteration = 0 self._itters.append(0) self.write_output() self.iteration = 1 time_now = self._model_time while time_now < self.clock.stop: next_run_pause = min( time_now + self.output_interval, self.clock.stop ) self.run_for(self.clock.step, next_run_pause - time_now) time_now = self._model_time self._itters.append(self.iteration) self.write_output() self.iteration += 1 # now that the model is finished running, execute finalize. self.finalize()
def _ensure_precip_runoff_are_vanilla(self, vsa_precip=False): """Ensure only default versions of precipitator/runoff are used. Some models only work when the precipitator and runoff generator are the default versions. """ if isinstance(self.precipitator, UniformPrecipitator) is False: raise ValueError( "This model must be run with a UniformPrecipitator." ) if vsa_precip is False: if self.precipitator._rainfall_flux != 1: raise ValueError( "This model must use a rainfall__flux value of 1.0." ) # if isinstance(self.runoff_generator, SimpleRunoff) is False: # raise ValueError("This model must be run with SimpleRunoff.") if self.runoff_generator.runoff_proportion != 1.0: raise ValueError("The model must use a runoff_proportion of 1.0.")
[docs] def run_output_writers(self): """Run all output writers.""" if "class" in self.output_writers: for ow_class in self.output_writers["class"]: ow_class.run_one_step() if "function" in self.output_writers: for ow_function in self.output_writers["function"]: ow_function(self)
[docs] def update_boundary_conditions(self, step): """Run all boundary handlers forward by step. Parameters ---------- step : float Timestep in unit of model time. """ # Run each of the baselevel handlers. for name in self.boundary_handlers: self.boundary_handlers[name].run_one_step(step)
[docs] def to_xarray_dataset( self, time_unit="time units", reference_time="model start", space_unit="space units", ): """Convert model output to an xarray dataset. If you would like to have CF compliant NetCDF make sure that your time and space units and reference times will work with standard decoding. The default time unit and reference time will give the time dimention a value of "time units since model start". The default space unit will give a value of "space unit". Parameters ---------- time_unit: str, optional Name of time unit. Default is "time units". reference time: str, optional Reference tim. Default is "model start". space_unit: str, optional Name of space unit. Default is "space unit". """ # open all files as a xarray dataset ds = xr.open_mfdataset( self._output_files, concat_dim="nt", engine="netcdf4", combine="nested", data_vars=self.output_fields, ) # add a time dimension time_array = np.asarray(self._itters) * self.output_interval time = xr.DataArray( time_array, dims=("nt"), attrs={ "units": time_unit + " since " + reference_time, "standard_name": "time", }, ) ds["time"] = time # set x and y to coordinates ds = ds.set_coords(["x", "y", "time"]) # rename dimensions ds = ds.rename(name_dict={"ni": "x", "nj": "y", "nt": "time"}) # set x and y units ds["x"] = xr.DataArray(ds.x, dims=("x"), attrs={"units": space_unit}) ds["y"] = xr.DataArray(ds.y, dims=("y"), attrs={"units": space_unit}) return ds
[docs] def save_to_xarray_dataset( self, filename="terrainbento.nc", time_unit="time units", reference_time="model start", space_unit="space units", ): """Save model output to xarray dataset. If you would like to have CF compliant NetCDF make sure that your time and space units and reference times will work with standard decoding. The default time unit and reference time will give the time dimention a value of "time units since model start". The default space unit will give a value of "space unit". Parameters ---------- filename: str, optional The file path where the file should be saved. The default value is "terrainbento.nc". time_unit: str, optional Name of time unit. Default is "time units". reference time: str, optional Reference tim. Default is "model start". space_unit: str, optional Name of space unit. Default is "space unit". """ ds = self.to_xarray_dataset( time_unit=time_unit, space_unit=space_unit, reference_time=reference_time, ) ds.to_netcdf(filename, engine="netcdf4", format="NETCDF4") ds.close()
[docs] def remove_output_netcdfs(self): """Remove all netCDF files written by a model run.""" try: for f in self._output_files: os.remove(f) except WindowsError: # pragma: no cover print( "The Windows OS is picky about file-locks and did not permit " "terrainbento to remove the netcdf files." ) # pragma: no cover
[docs]def main(): # pragma: no cover """Executes model.""" try: infile = sys.argv[1] except IndexError: print("Must include input file name on command line") sys.exit(1) erosion_model = ErosionModel.from_file(infile) erosion_model.run()
if __name__ == "__main__": main()