Source code for pyrodeo.simulation

# -*- coding: utf-8 -*-
"""Defines the Simulation class holding a pyrodeo simulation
"""

from __future__ import print_function

import numpy as np
import h5py
from pyrodeo.state import State
from pyrodeo.coords import Coordinates
from pyrodeo.param import Param
from pyrodeo.hydro import Hydro

[docs]class Simulation(object): """Construct pyrodeo simulation. Construct simulation from existing instances of :class:`.Param`, :class:`.Coordinates` and :class:`.State` and given simulation time. Args: param (:class:`.Param`): Valid Param object, containing simulation parameters. coords (:class:`.Coordinates`): Valid Coordinates object, containing x, y and z coordinates state (:class:`.State`): Valid State object, containing current density, velocity and sound speed. t (float): Simulation time. direc (:obj:`str`, optional): Output directory. Note: While it is possible to set up a simulation using the basic constructor, no checks are performed whether the instances of :class:`.Param`, :class:`.Coordinates` and :class:`.State` are valid. It is safer to use :meth:`.from_geom` which will set up the necessary valid instances. The following attributes and methods are available: Attributes: state (:class:`.State`): State object holding density, velocity and sound speed. coords (:class:`.Coordinates`): Coordinates object holding x, y and z coordinates. param (:class:`.Param`): Param object holding simulation parameters. t (float): Current simulation time. direc (string): Output directory. """ def __init__(self, param, coords, state, t, direc='./'): self.state = State(state.dens, state.velx, state.vely, state.velz, state.soundspeed) self.coords = Coordinates(coords.x, coords.y, coords.z, log_radial=coords.log_radial) self.param = Param(param) self.t = t self.direc = direc
[docs] @classmethod def from_geom(cls, geometry, dimensions=(100, 1, 1), domain=([-0.5, 0.5], [], []), log_radial=False, direc='./'): """Construct simulation from geometry. Construct simulation class from geometry, grid dimensions and domain size. All other parameters will be set to defaults. Simulation time is set to zero. Args: geometry (string): 'cart' (Cartesian coordinates), 'sheet' (shearing sheet), 'cyl' (cylindrical coordinates) or 'sph' (spherical coordinates). dimensions (:obj:`(int,int,int)`,optional): Grid dimensions in x, y and z. domain (:obj:`[(float,float),(float,float),(float,float)]`,optional): Domain boundaries in x, y and z. log_radial (bool): Flag whether to use logarithmic radial coordinate direc (:obj:`str`,optional): Output directory, defaults to current directory. """ dt, param = Param.from_geom(geometry, log_radial).to_list() coords = Coordinates.from_dims(dimensions, domain, log_radial=log_radial) state = State.from_dims((len(coords.x[:,0,0]), len(coords.x[0,:,0]), len(coords.x[0,0,:]))) t = 0.0 return cls(param[0], coords, state, t, direc=direc)
[docs] @classmethod def from_checkpoint(cls, direc, n=None): """Constructor for simulation class from checkpoint. Construct simulation class from previously saved checkpoint. Args: direc (str): Path to 'rodeo.h5'. This will be the output directory as well. n (:obj:`int`,optional): Index of checkpoint to restore. If left None, restore last saved checkpoint. """ with h5py.File(direc + '/rodeo.h5', "r") as hf: # Restart from last checkpoint if n is None: g = hf.keys() n = len(g) - 3 gc = hf.get("coords") x = np.array(gc.get('x')) y = np.array(gc.get('y')) z = np.array(gc.get('z')) param = hf.get("param") g = hf.get("checkpoint{}".format(n)) dens = np.array(g.get('dens')) velx = np.array(g.get('velx')) vely = np.array(g.get('vely')) velz = np.array(g.get('velz')) soundspeed = np.array(g.get('soundspeed')) coords = Coordinates(x, y, z, log_radial=param[0]['log_radial']) state = State(dens, velx, vely, velz, soundspeed) t = g.attrs['time'] print("Restoring from checkpoint at t = {}".format(t)) return cls(param[0], coords, state, t, direc)
def __str__(self): """Information on simulation.""" return 'Simulation class, part of pyrodeo.\n \ Current time: {}\n'.format(self.t) + str(self.coords) + str(self.param)
[docs] def evolve(self, checkpoints, source_func = None, source_param = None, new_file=False): """Evolve simulation over a list of checkpoints. Args: checkpoints (ndarray): List of times to save checkpoints. source_func (:obj:`callable`, optional): Integrate extra source terms. Must be of the form f(t, dt, coords, state, source_param). source_param (:obj:`ndarray`, optional): Parameters for extra source term. Will be passed to `source_func`. new_file (:obj:`bool`,optional): If true, create new output file 'rodeo.h5', otherwise append to file if it exists. """ solver = Hydro(self.param, self.coords) if new_file is True: self.checkpoint(new_file=True) for tmax in checkpoints: if self.t < tmax: self.t, self.state = \ solver.evolve(self.t, tmax, self.coords, self.param, self.state, source_func, source_param) self.checkpoint() else: print("Skipping checkpoint t = {}".format(tmax))
[docs] def checkpoint(self, new_file=False): """Save checkpoint in 'rodeo.h5' Args: new_file (:obj:`bool`,optional): If true, create new output file 'rodeo.h5', otherwise append to file if it exists. """ mode = "a" if new_file is True: mode = "w" print("Checkpoint at t = {}".format(self.t)) with h5py.File(self.direc + '/rodeo.h5', mode) as hf: n = 0 # Write coordinates and parameters if new_file is True: gc = hf.create_group("coords") gc.create_dataset("x", data=self.coords.x) gc.create_dataset("y", data=self.coords.y) gc.create_dataset("z", data=self.coords.z) param_dtype, param = self.param.to_list() param_array = np.array(param, dtype=param_dtype) param_dset = \ hf.create_dataset("param", (len(param),), dtype=param_dtype) param_dset[...] = param_array else: # Number of previous saves + 1 g = hf.keys() n = len(g) - 2 # Write current data g1 = hf.create_group("checkpoint{}".format(n)) g1.attrs['time'] = self.t g1.create_dataset("dens", data=self.state.dens) g1.create_dataset('velx', data=self.state.velx) g1.create_dataset('vely', data=self.state.vely) g1.create_dataset('velz', data=self.state.velz) g1.create_dataset('soundspeed', data=self.state.soundspeed)